본문 바로가기
Apache/Spark

[4장] 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개 [1]

by 잼있는잼니 2023. 8. 1.

스파크 SQL의 특징

 

  • 3장에서 살펴본 상위 수준의 정형화 API가 엔진으로 제공된다.
  • 다양한 정형 데이터를 읽거나 쓸 수 있다.(JSON, 하이브 테이블, Parquet, Avro, ORC, CSV)
  • 태블로, 파워BI, 탈렌드와 같은 외부 비즈니스 인텔리전스의 데이터 소스나 MySQL 및 PostgreSQL과 같은 RDBMS의 데이터를 JDBC/ODBC 커넥터를 사용하여 쿼리할 수 있다.
  • 스파크 애플리케이션에서 데이터베이스 안에 테이블 또는 뷰로 저장되어 있는 정형 데이터와 소통할 수 있도록 프로그래밍 인터페이스를 제공한다.
  • SQL 쿼리를 정형 데이터에 대해 실행할 수 있는 대화형 셸을 제공한다.
  • ANSI SQL:2003 호환 명령 및 HiveQL을 지원한다.

 

스파크 애플리케이션에서 스파크 SQL 사용하기

SQL 쿼리를 실행하기 위해 spark라고 선언된 SparkSession 인스턴스에서 spark.sql("SELECT * FROM myTableName")과 같은 sql() 함수를 사용한다.

기본 쿼리 예제

# 파이썬 예제

from pyspark.sql import SparkSession

# SparkSession 생성

spark = (SparkSession
    .builder
    .appName("SparkSQLExampleApp")
    .getOrCreate())
    
# 데이터세트 경로
csv_file = "데이터세트 경로"

# 읽고 임시뷰를 생성
# 스키마 추론

df = (spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(csv_file))

df.createOrReplaceTempView("us_delay_flights_tbl")

위 코드를 작성후 쿼리문을 적용하면 다음과 같이 나온다.

아래는 1000마일 이상인 모든 항공편이다.

spark.sql("""
SELECT distance, origin, destination 
FROM us_delay_flights_tbl 
WHERE distance > 1000 
ORDER BY distance DESC
""").show(10)

아래는 샌프란시스코(SFO)와 시카고(ORD)간 2시간 이상 지연이 있던 모든 항공편이다.

spark.sql("""
SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC
""").show(10)

 

아래는 출발지와 목적지에 관계없이 모든 미국 항공편에 매우 긴 지연(>6시간), 긴 지연(2~6시간) 등의 지연에 대한 표시를 레이블로 지정하기 위해 CASE 절을 사용하였다.

spark.sql("""SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay > 120 AND delay < 360 THEN  'Long Delays '
                  WHEN delay > 60 AND delay < 120 THEN  'Short Delays'
                  WHEN delay > 0 and delay < 60  THEN   'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'No Delays'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10)

 

spark.sql 인터페이스를 이용하면 일반적인 데이터 분석 작업을 수행할 수 있다.


데이터 프레임 API를 사용하여 쿼리를 작성한 코드이다.

스파크 SQL 인터페이스를 사용해 데이터를 쿼리하는 것은 관계형 데이터베이스 테이블에 일반 SQL 쿼리를 작성하는 것과 유사하며, 데이터 프레임 API를 사용한 것과 결과는 유사하다.

df.select("distance", "origin", "destination")
  .where("distance > 1000")
  .orderBy("distance", ascending=False).show(10)


 

댓글