-
[3장] 아파치 스파크의 정형화 API [2]데이터엔지니어링/Spark 2023. 7. 25. 02:24
자주 쓰이는 데이터 프레임 작업들
데이터 프레임에서 일반적인 데이터 작업을 수행하려면 구조화된 데이터를 갖고 있는 데이터 소스에서 데이터 프레임으로 로드를 해야하는데, 이를 위해 스파크는 DataFrameReader라는 이름의 인터페이스를 제공한다. 이 인터페이스는 JSON, CSV, Parquet, 텍스트, 에이브로, ORC 같은 다양한 포맷의 데이터 소스를 읽는것을 지원한다.
특정 포맷의 데이터소스에 데이터 프레임의 데이터를 써서 내보내기 위해서는 DataFrameWriter를 쓴다.
# 파이썬에서 스키마를 정의한다. from pyspark.sql.types import * # 프로그래밍적인 방법으로 스키마를 정의한다. fire_schema = StructType([StructField('CallNumber', IntegerType(), True), StructField('UnitID', StringType(), True), StructField('IncidentNumber', IntegerType(), True), StructField('CallType', StringType(), True), StructField('CallDate', StringType(), True), StructField('WatchDate', StringType(), True), StructField('CallFinalDisposition', StringType(), True), StructField('AvailableDtTm', StringType(), True), StructField('Address', StringType(), True), StructField('City', StringType(), True), StructField('Zipcode', IntegerType(), True), StructField('Battalion', StringType(), True), StructField('StationArea', StringType(), True), StructField('Box', StringType(), True), StructField('originalPriority', StringType(), True), StructField('Priority', StringType(), True), StructField('FinalPriority', IntegerType(), True), StructField('ASLUnit', BooleanType(), True), StructField('CallTypeGroup', StringType(), True), StructField('NumAlarms', IntegerType(), True), StructField('UnitType', StringType(), True), StructField('UnitSequenceInCallDispatch', IntegerType(), True), StructField('FirePreventionDistrict', StringType(), True), StructField('SupervisorDistrict', StringType(), True), StructField('Neighborhood', StringType(), True), StructField('Location', StringType(), True), StructField('RowID', StringType(), True), StructField('Delay', FloatType(), True)]) # DataFrameReader 인터페이스로 CSV 파일 읽기 sf_fire_file = "파일 위치" fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
spark.read.csv() 함수는 CSV 파일을 읽어서 row 객체와 스키마에 맞는 타입의 이름이 있는 칼럼들로 이루어진 데이터 프레임을 되돌려 준다.
데이터 프레임을 Parquet 파일이나 SQL 테이블로 저장하기
일반적으로 많이 하는 작업이다.
# 파이썬에서 파케이로 저장 parquet_path = "위치위치" fire_df.write.format("parquet").save(parquetPath) # 혹은 하이브 메타스토어에 메타데이터로 등록되는 테이블로 지정 가능 # 파이썬 예제 parquet_table = new_parquet #테이블 이름 fire_df.write.format("parquet").saveAsTable(parquet_table)
트랜스포메이션과 액션
프로젝션과 필터
프로젝션은 관계형 DB 식으로 말하면 필터를 이용해 특정 관계 상태와 매치되는 행들만 되돌려 주는 방법이다. 스파크에서 프로젝션은 select() 메서드로 수행하는 반면, 필터는 filter()나 where() 메서드로 표현된다.
# 파이썬 예제 few_fire_df = (fire_df.select("IncidentNumber", "AvailableDtTm", "CallType") .where(col("CallType" != "Medical Incident")) few_fire_df.show(5, truncate=False) # 파이썬 예제, countDistinct()를 써서 신고 타입의 개수를 되돌려 준다. from pyspark.sql.functions import * (fire_df .select("CallType") .where(col("CallType").isNotNull()) .agg(countDistinct("CallType").alias("DistinctCallTypes")) .show()) # 파이썬 예제, 모든 행에서 null이 아닌 개별 CallType을 추출한다. (fire_df .select("CallType") .where(col("CallType").isNotNull()) .distinct() .show(10, False))
칼럼의 이름 변경 및 추가 삭제
StructField를 써서 스키마 내에서 원하는 칼럼 이름들을 지정하면 결과 데이터 프레임에서 원하는 대로 칼럼 이름이 출력되며, withColumnRenamed() 함수를 써서 이름을 변경이 가능하다.
# 파이썬 예제 new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins") (new_fire_df.select("ResponseDelayedinMins") .where(col("ResopnseDelayedinMins") > 5) .show(5, False)) # 파이썬 예제 fire_ts_df = (new_fire_df .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")) .drop("CallDate") .withColumn("OnWatchDate", to_timestamp(col("watchDate"), "MM/dd/yyyy")) .drop("WatchDate") .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")) .drop("AvailableDtTm")) # 변환된 칼럼들을 가져온다. (fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS") .show(5, False))
spark.sql.functions에서 dayofmonth(), dayofyear(), dayofweek() 같은 함수들을 써서 질의할 수 있다.
# 파이썬 예제 (fire_ts_df.select(year('IncidentDate') .distinct() .orderBy(year('IncidentDate')) .show())
집계연산
groupBy(), orderBy(), count()와 같이 데이터 프레임에서 쓰는 일부 트랜스포메이션과 액션은 칼럼 이름으로 집계해서 각각 개수를 세어주는 기능 제공한다.
# 파이썬 예제 (fire_ts_df.select("CallType") .where(col("CallType").isNotNull()) .groupBy("CallType") .count() .orderBy("count", ascending=False) .show(n=10, truncate=False))
그 외 일반적인 데이터 프레임 연산들
데이터 프레임 API는 min(), max(), sum(), avg() 뿐만 아니라, stat(), describe(), correlation(), covariance(), sampleBy(), approxQuantile(), frequentItems() 등이 있다.
자주 쓰이는 데이터 프레임 작업들
데이터세트는 정적 타입 API와 동적 타입 API의 두 특성을 모두 가진다.
개념적으론 스칼라의 데이터 프레임은 공용 객체의 모음인 Dataset[Row]의 다른 이름이라고 생각할 수 있으며, Row는 서로 다른 타입의 값을 저장할 수 있는 포괄적 JVM 객체라고 보면 된다.
반면 데이터세트는 스칼라에서 엄격하게 타입이 정해진 JVM 객체의 집합이며, 이 객체는 자바에서는 클래스라고 볼 수 있다.
언어 정적/동적 타입 기본 추상화 객체 타입 여부 스칼라 Dataset[T]와 DataFrame(Dataset[Row]의 앨리어싱) 양쪽 모두 가능 자바 Dataset[T] 정적 타입 파이썬 DataFrame 포괄적 Row 객체를 사용한 동적 타입 R DataFrame 포괄적 Row 객체를 사용한 동적 타입 Row는 스파크의 포괄적 객체 타입이며, 인덱스를 사용해 접근할 수 있으며 다양한 타입의 값들을 담을 수 있다.
# 파이썬 예제 from pyspark.sql import Row row = Row(350, True, "Learning Spark 2E", None)
Row 객체에 공개되어 있는 게터(getter)류 함수들에 인덱스를 사용해 개별 필드에 접근할 수 있다.
정적 객체들은 JVM에서 실제 자바 클래스나 스칼라 클래스가 된다. 그래서 데이터 세트의 각 아이템들은 곧바로 하나의 JVM 객체가 되어 쓸 수 있다.
데이터 프레임 vs 데이터세트
어떤걸 쓸지 아래의 예시를 보면된다.
- 스파크에게 어떻게 하는지가 아니라 무엇을 해야 하는지 말하고 싶으면 데이터 프레임이나 데이터 세트를 사용한다.
- 풍부한 표현과 높은 수준의 추상화 및 DSL 연산을 원한다면 데이터 프레임이나 데이터세트를 사용한다.
- 파이썬 사용자는 데이터 프레임을 쓰되, 제어권을 더 갖고 싶다면 RDD로 바꿔 사용한다.
RDD를 사용할 때
RDD 사용 고려할 때
- RDD를 사용하도록 작성된 서드파티 패키지를 사용한다.
- 데이터 프레임과 데이터세트에서 얻을 수 있는 코드 최적화, 효과적인 공간 사용, 퍼포먼스의 이득을 포기할 수 있다.
- 스파크가 어떻게 질의를 수행할지 정확하게 지정해 주고 싶다.
스파크 SQL과 하부의 엔진
SQL 같은 질의를 수행하게 해 주는 것 외에도 스파크 SQL 엔진은 다음과 같은 일을 한다.
- 스파크 컴포넌트들을 통합하고 데이터 프레임/데이터 세트가 자바, 스칼라, 파이썬 R등으로 정형화 데이터 관련 작업을 단순화할 수 있도록 추상화를 해 준다.
- 아파치 하이브 메타스토어와 테이블에 접근한다
- 정형화된 파일 포맷등에서 스키마와 정형화 데이터를 읽고 쓰며 데이터를 임시 테이블로 변환한다.
- 빠른 데이터 탐색을 할 수 있도록 대화형 스파크 SQL 셸을 제공한다.
- 표준 데이터베이스 JDBC/ODBC 커넥터를 통해 외부의 도구들과 연결할 수 있는 중간 역할을 한다.
- 최종 실행을 위해 최적화된 질의 계획과 JVM을 위한 최적화된 코드를 생성한다.
카탈리스트 옵티마이저
스파크 연산의 4단계 분석, 논리적 최적화, 물리 계획 수립, 코드 생성
'데이터엔지니어링 > Spark' 카테고리의 다른 글
[4장] 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개 [2] (0) 2023.08.03 [4장] 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개 [1] (0) 2023.08.01 [3장] 아파치 스파크의 정형화 API [1] (0) 2023.07.20 [2장] 아파치 스파크 시작 (0) 2023.07.07 [1장] 아파치 스파크 소개 (0) 2023.06.28