[2장] 아파치 스파크 시작
아파치 스파크 다운로드
pip isntall pyspark
를 통해서 pyspark를 설치해준 상태에서 로컬로 실시했다.
파이스파크 셸 사용
pyspark, spark-shell, spark-sql, sparkR의 네 가지 인터프리터들이 포함되어 있어 일회성 데이터 분석이 가능하며, 파이썬, 스칼라, R, SQL이나 bash 같은 유닉스 셸을 써봤으면 대화형 셸들이 익숙할 것이다.
이 셸들은 클러스터에 연결하고 분산 데이터를 스파크 워커 노드의 메모리에 로드할 수 있도록 확장 되어왔다.
로컬 머신에서 사용하기
ㅇ스파크 연산들은 작업으로 표현되는데, 작업들은 태스크라는 저수준 RDD 바이트 코드로 변환되며 실행을 위해 스파크의 이그제큐터들에 분산된다.
이후 빠져나가기 위해서 Ctrl-D를 누르면 된다. 스파크 셸을 쓰면 빠른 대화형 작업으로 빠르게 배우는 것뿐 아니라 빠르게 프로토타이핑 하기에도 아주 좋다.
위의 예제는 RDD를 쓰지 않고 상위 수준 정형화 API를 써서 텍스트 파일을 스파크 데이터 프레임에 읽어 들였다는 점에 주목하기.
스파크 애플리케이션 개념의 이해
애플리케이션
- API를 써서 스파크 위에서 돌아가는 사용자 프로그램, 드라이버 프로그램과 클러스터의 실행기로 이루어진다.
SparkSession
- 스파크 코어 기능들과 상호 작용할 수 있는 진입점을 제공하며 그 API로 프로그래밍을 할 수 있게 해주는 객체이다. 스파크 셸에서 스파크 드라이버는 기본적으로 SparkSession을 제공하지만 스파크 애플리케이션에서 사용자가 SparkSession 객체를 생성해서 써야한다.
잡(job)
스파크 액션(ex: save(), collect())에 대한 응답으로 생성되는 여러 태스크로 이루어진 병렬 연산
스테이지(stage)
각 잡은 스테이지라 불리는 서로 의존성을 가지는 다수의 태스크 모음으로 나뉨
태스크(task)
스파크 이그제큐터로 보내지는 작업 실행의 가장 기본적인 단위
스파크 애플리케이션과 SparkSession
모든 스파크 애플리케이션의 핵심에는 스파크 드라이버 프로그램이 있고, 드라이버는 SparkSession 객체를 만든다. 스파크 셸을 써서 작업시 드라이버는 셸에 포함되어 있는 형태이고 SparkSession 객체가 미리 만들어진다.
스파크 셸을 로컬 실행했기에 모든 연산 또한 단일 JVM에서 로컬 실행된다. 하지만 마찬가지로 쉽게 분석을 위해서 클러스터에서도 스파크 셸을 실행할 수 있다.
spark-shell --help나 pyspark --help 명령 타이핑시 스파크 클러스터 매니저에 어떻게 연결해야 할지 보여준다.
SparkSession 객체를 만들었으면 그를 통해 스파크 연산을 수행하는 API를 써서 프로그래밍이 가능하다.
스파크 잡(job)
스파크 셸로 상호 작용하는 작업 동안, 드라이버는 스파크 애플리케이션을 하나 이상의 스파크 잡으로 변환하며, 이 잡들은 DAG로 변환한다. 본질적으로 이게 스파크의 실행 계획이 되고, DAG 그래프에서 각각의 노드는 하나 이상의 스파크 스테이지에 해당한다.
스파크 스테이지(Stage)
어떤 작업이 연속적으로 또는 병렬적으로 수행되는지에 맞춰 스테이지에 해당하는 DAG 노드가 생성된다. 모든 스파크 연산이 하나의 스테이지 안에서 실행될 수는 없으므로 여러 스테이지로 나뉘어야 한다. 종종 스파크 이그제큐터끼리의 데이터 전송이 이루어지는 연산 범위 경계 위에서 스테이지가 결정되기도 한다.
스파크 태스크
각 스테이지는 최소 실행 단위이며 스파크 이그제큐터들 위에서 연합 실행되는 스파크 태스크들로 이루어진다. 각 태스크는 개별 CPU 코어에 할당되고 데이터의 개별 파티션을 갖고 작업한다.
16코어 이그제큐터라면 16개 이상의 파티션을 갖는 16개 이상의 태스크를 할당받아 작업하게 되며 병렬 처리가 이뤄진다.
트랜스포메이션, 액션, 지연 평가
분산 데이터의 스파크 연산은 트랜스포메이션과 액션으로 구분되며, 트랜스포메이션은 이미 불변성의 특징을 가진 원본 데이터를 수정하지 않고 하나의 스파크 데이터 프레임을 새로운 데이터 프레임으로 변형한다.
즉, select()나 filter() 같은 연산은 원본 데이터 프레임을 수정하지 않고, 새로운 데이터 프레임으로 연산 결과를 만들어 낸다.
모든 트랜스포메이션은 뒤늦게 평가되는데 결과가 즉시 계산되는 게 아니라 계보(lineage)라 불리는 형태로 기록되기 때문이다. 기록된 계보는 실행 계획에서 후반쯤에 스파크가 확실한 트랜스포메이션들끼리 재배열하거나 합치거나 해서 더 효율적으로 실행할 수 있도록 최적화하도록 한다. 지연 평가는 액션이 실행되는 시점이나 데이터에 실제 접근하는 시점(디스크에서 읽거나 쓰는 시점)까지 실제 실행을 미루는 스파크의 전략이다.
하나의 액션은 모든 기록된 트랜스포메이션의 지연 연산을 발동시킨다. 지연 평가는 스파크가 사용자의 연계된 트랜스포메이션들을 살펴봄으로써 쿼리 최적화를 가능하게 하는 반면, 리니지와 데이터 불변성은 장애에 대한 데이터 내구성을 제공한다. 스파크는 계보에 각 트랜스포메이션을 기록해 놓고 데이터 프레임들은 트랜스포메이션을 거치는 동안 변하지 않기 때문에 단순히 기록된 계보를 재실행하는 것만으로도 원래 상태를 다시 만들어 낼 수 있으며 이 덕분에 장애 상황에도 유연성을 확보할 수 있다.
트랜스포메이션 | 액션 |
orderBy() | show() |
groupBy() | take() |
filter() | count() |
select() | collect() |
join() | save() |
좁은/넓은 트랜스포메이션
지연 연산 개념의 큰 이득은 스파크가 연산 쿼리를 분석하고 어디를 최적화할지 알 수 있다는 점이며, 최적화는 조인이나 파이프라이닝이 될 수도 있고 연산들을 한 스테이지로 합치거나 반대로 어떤 연산이 셔플이나 클러스터 데이터 교환이 필요한지 파악해 나누거나 하는 식으로 이루어질 수 있다.
트랜스포메이션은 좁은 의존성과 넓은 의존성으로 분류가 된다. 하나의 입력 파티션을 연산하여 하나의 결과 파티션을 내놓는 트랜스 포메이션은 좁은 트랜스포메이션이고, filter()와 contains()는 좁은 트랜스포메이션이고, groupBy()나 orderBy()는 넓은 트랜스포메이션이다.
첫 번째 단독 애플리케이션
쿠키 몬스터를 위한 M&M 세기
from __future__ import print_function
import sys
from pyspark.sql import SparkSession
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: mnmcount <file>", file=sys.stderr)
sys.exit(-1)
# SparkSession API를 써서 SparkSession 객체를 만든다.
# 객체가 존재하지 않으면 생성한다
# JVM마다 SparkSession은 하나만 존재할 수 있다.
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# M&M 데이터가 들어있는 파일 이름을 가져온다
mnm_file = sys.argv[1]
# 스키마랑 header를 지정해주고
# 파일을 csv 포맷으로 읽고 데이터 프레임으로 저장한다.
mnm_df = (spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnm_file))
mnm_df.show(n=5, truncate=False)
# 각 주별, 색별로 그룹화하고, 그룹화 된 주별, 색별 집계를 한다.
# orderBy로 역순으로 본다
count_mnm_df = (mnm_df.select("State", "Color", "Count")
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# 모든 주와 색별 결과를 보여준다.
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))
# 필터에 의한 캘리포니아 집계 결과 찾기
ca_count_mnm_df = (mnm_df.select("*")
.where(mnm_df.State == 'CA')
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# 캘리포니아를 기준으로 집계한 결과를 보여준다.
ca_count_mnm_df.show(n=10, truncate=False)
# spark 세션을 멈춘다
spark.stop()
여기서 INFO 메시지가 많이 출력되는데 없애고 싶으면, log4j.properties를 카피해두고, log4j.rootCategory=WARN을 지정해주면 된다고 한다.