본문 바로가기
Apache/Spark

[4장] 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개 [2]

by 잼있는잼니 2023. 8. 3.

SQL 테이블과 뷰

 

스파크는  각 테이블과 해당 데이터에 관련된 정보인 스키마, 설명, 테이블명, 데이터베이스명, 칼럼명, 파티션, 실제 데이터의 물리적 위치 등의 메타데이터를 가지고 있고, 이 모든 정보는 중앙 메타스토어에 저장된다.

 

관리형 테이블과 비관리형 테이블

스파크는 관리형과 비관리형이라는 두 가지 유형의 테이블을 만들 수 있다.

관리형 테이블의 경우 스파크는 메타데이터와 파일 저장소의 데이터를 모두 관리하며, 비관리형 테이블의 경우에는 오직 메타데이터만 관리하고 카산드라와 같은 외부 데이터 소스에서 데이터를 직접 관리한다.

 

SQL 데이터 베이스와 테이블 생성하기

테이블은 데이터베이스 안에 존재하기에 스파크에게 learn_spark_db라는 데이터베이스를 생성하고 스파크에게 해당 데이터베이스를 사용하겠다고 알려줘야한다.

# 파이썬 예제

spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

이제부턴 어떠한 명령어든 learn_spark_db 데이터베이스 안에서 생성되고 상주하게 된다.

 

관리형 테이블 생성하기

learn_spark_db 데이터베이스 안에 관리형 테이블을 생성하기 위한 코드는 아래와 같다.

# 파이썬 예제
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")

데이터 프레임 API를 아래와 같이 사용하여 같은 명령을 수행이 가능하다.

# 파이썬 예제
# 미국 항공편 지연 CSV 파일 경로

csv_file = "파일 위치"

# 앞의 예제에서 정의된 스키마

schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

 

비관리형 테이블 생성하기

 

스파크 애플리케이션에서 접근 가능한 파일 저장소에 있는 파케이, CSV 및 JSON 파일 포맷의 데이터 소스로부터 비관리형 테이블을 생성할 수 있다.

CSV 파일과 같은 데이터 소스로부터 비관리형 테이블을 생성하기 위해서는 SQL에서 다음과 같은 명령어를 사용한다.

spark.sql("""
	CREATE TABLE us_delay_flights_tbl (
    date STRING,
    delay INT,
    distance INT,
    origin STRING,
    destination STRING
)
USING csv OPTIONS (PATH '파일 위치')
""")
# 데이터 프레임 API에서 다음과 같은 명령어 사용

(flights_df
	.write
    .option("path", "파일 위치")
    .saveAsTable("us_delay_flights_tbl"))

뷰 생성하기

 

뷰는 전역 또는 세션 범위일 수 있으며 일시적으로 스파크 애플리케이션이 종료되면 사라진다.

뷰를 생성하고 난뒤 테이블처럼 쿼리할 수 있으며, 애플리케이션이 종료되면 테이블은 유지되나 뷰는 사라진다.

 

임시 뷰 VS 전역 임시 뷰

임시 뷰와 전역 임시 뷰는 크게 차이가 없다. 임시 뷰는 스파크 애플리케이션 내의 단일 SparkSession에 연결되는 반면에 전역 임시 뷰는 스파크 애플리케이션 내의 여러 SparkSession에서 볼 수 있다.

 

메타데이터 보기

스파크는 관리형 및 비관리형 테이블에 대해서 메타스토어에 저장하기 때문에, 메타데이터를 관리할 수 있다.

스파크 SQL의 상위 추상화 모듈인 카탈로그에 저장이 된다.

 

# 파이썬 예제

spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")

저장된 모든 메타데이터를 위의 함수를 통해서 액세스할 수 있다.


SQL 테이블 캐싱

 

데이터 프레임처럼 SQL 테이블과 뷰, 캐시 및 언캐싱을 할 수 있다. Spark 3.0 에서는 테이블을 LAZY로 지정 가능하며, 이는 테이블을 바로 캐싱하지 않고 처음 사용되는 시점에서 캐싱함을 의미한다.

 

# SQL 예제
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>

 

만약, 사용 준비가 된 데이터베이스 learn_spark_db 와 테이블 us_delay_flights_tbl이 있다 가정하자.

SQL을 이용하여 테이블을 쿼리하고 반환된 결과로 데이터 프레임에 저장할 수 있다.

 

# 파이썬 예제

us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")

 

기존 SQL 테이블에서 읽은 가공된 데이터 프레임을 갖게 된다.


데이터 프레임 및 SQL 테이블을 위한 데이터 소스

 

상위 수준 데이터 소스 API인 DataFrameReader 와 DataFrameWriter에 대해서 살펴보자.

 

DataFrameReader

정의된 형식과 권장되는 사용 패턴이 존재하며, 스파크에서 일반적으로 사용되고 가독성이 높다.

또한, SparkSession 인스턴스를 통해서만 액세스가 가능하다.

 

DataFrameReader.format(args).option("key","value").schema(args).load()

# 인스턴스 핸들을 위해서

SparkSession.read
# or
SparkSession.readStream
# 두가지중 한가지를 이용하여 핸들을 얻을 수 있다.
함수 인수 설명
format("ar") ar에 parquet, csv, txt, json, jdbc, orc, avro 등 기본값은 parquet 혹은 spark.sql.source.default에 지정된 항목
option() ("mode", {PERMISSIVE, FAILFAST, DROPMALFORMED}) ("inferSchema",{true, false}) ("path", "path_file_data_source") 기본 모드는 PERMISSIVE이다. "inferSchema" 및 "mode" 옵션은 JSON 및 CSV 파일 형식에만 적용된다.
schema() DDL 문자열 또는 StructType, 예: 'A INT, B STRING' 또는 StructType(...) JSON 또는 CSV 형식의 경우 option() 함수에서 스키마를 유추하도록 지정할 수 있다.
load() "/path/to/data/source" 데이터 소스 경로, option("path", "---")에 지정된 경우 비우기 가능

 

DataFrameWriter

지정된 내장 데이터 소스에 데이터를 저장하거나 쓰는 작업을 수행하며, DataFrameReader와 달리 SparkSession이 아닌 저장하려는 데이터 프레임에서 인스턴스에 액세스가 가능하다.

 

DataFrameWriter.format(args)
	.option(args)
    .bucketBy(args)
    .partitionBy(args)
    .save(path)

DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)

# 인스턴스 핸들을 가져오려면

DataFrame.write
# 혹은
DataFrame.writeStream
함수 인수 설명
format("arg") arg에 parquet, csv, txt, json, jdbc, orc, avro 등 기본값은 parquet 혹은 spark.sql.source.default에 지정된 항목
option() ("mode", {append, overwrite, ignore, error or errorifexists}) ("mode", {SaveMode.Overwrite, SaveMode.Append, SaveMode.Ignore,SaveMode.ErrorIfExists})
("path", "path_to_write_to")
기본 모드 옵션은 error 또는 errorifexists와 SaveMode이며, ErrorifExists는 데이터가 이미 있는 경우 런타임에서 예외 발생시킨다.
bucketBy() (numBuckets, col, col..., coln) 버킷 개수 및 버킷 기준 칼럼 이름이며, 하이브의 버킷팅 체계를 사용한다.
save() "/path/to/data/source" 데이터 소스 경로, option("path", "---")에 지정된 경우 비우기 가능
saveAsTable() "table_name" 저장할 테이블

 

댓글