ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [1장] 아파치 스파크 소개
    데이터엔지니어링/Spark 2023. 6. 28. 03:46

    아파치 스파크란!!??

    데이터 센터나 클라우드에서 대규모 분산 데이터 처리를 하기 위해 설계된 통합형 엔진!

     

    스파크는 중간 연산을 위해 메모리 저장소를 지원하여 하둡 맵리듀스보다 훨씬 빠르게 동작할 수 있다.

    스파크는 머신러닝(MLlib), 대화형 질의를 위한 SQL(스파크 SQL), 실시간 데이터 처리를 위한 스트리밍 처리(스파크 스트리밍), 그래프 처리(GraphX) 등을 위해 쉽게 사용 가능한 API들로 이루어진 라이브러리를 갖고 있다.

     

    스파크의 네 가지 핵심 특성

     

    - 속도 -

     

    스파크는 질의 연산을 방향성 비순환 그래프(DAG)로 구성되며, 이 DAG의 스케줄러와 질의 최적화 모듈은 효율적인 연산 그래프를 만들어서 각각의 태스크로 분해하여 클러스터의 워커 노드 위에서 병렬 수행될 수 있도록 해 준다.

    그리고 세 번째로, 물리적 실행 엔진인 텅스텐은 전체적 코드 생성이라는 기법을 써서 실행을 위한 간결한 코드를 생성해 낸다.

     

    스파크의 모든 중간 결과는 메모리에 유지되며, 디스크 I/O를 제한적으로 사용하기 때문에 성능이 향상된다.

     

    - 사용 편리성 -

     

    데이터 프레임이나 데이터세트 같은 고수준 데이터 추상화 계층 아래에 유연한 분산 데이터세트(RDD)라 불리는 핵심적이면서도 단순한 논리 자료구조를 구축하여 단순성을 실현하였다.

    연산(Operation)의 종류로서 트랜스포메이션(Transformation)액션(Action)의 집합과 단순한 프로그래밍 모델을 제공함으로써 사용자들이 각자 편한 언어로 빅데이터 애플리케이션을 만들 수 있도록 하였다.

     

    - 모듈성 -

     

    스파크 연산은 다양한 타입의 워크로드에 적용 가능하며, 지원하는 모든 프로그래밍 언어(자바, 스칼라, 파이썬, SQL, R)로 표현할 수 있다. 스파크는 API들로 이루어진 통합 라이브러리를 제공한다. 

     

    - 확장성 -

     

    스파크는 저장보다는 빠른 병렬 연산 엔진에 초점이 맞추어져 있으며, 저장과 연산을 모두 포함하는 아파치 하둡과는 달리 스파크는 이 둘을 분리했다.

     

    스파크가 수많은 데이터 소스에서 데이터를 읽어 들여 메모리에서 처리 가능하다는 의미이다.


    아파치 컴포넌트

    스파크 SQL

     

    RDBMS 테이블이나 구조화된 데이터의 파일 포맷에서 데이터를 읽어 들일 수 있으며, 그  데이터로 스파크에서 영구적이거나 임시적인 테이블을 만들 수 있다. 스파크 정형화 API를 사용해 SQL 계통의 질의를 써서 데이터를 바로 데이터 프레임으로 읽을 수 있다.

    #스칼라 예제
    #아마존 S3 버킷에서 데이터를 읽어 들여 스파크 데이터 프레임으로 만든다
    
    spark.read.json("s3://apache_spark/data/committers.json").createOrReplaceTempView("committers")
    
    #SQL 질의 실행 후 결과를 스파크 데이터 프레임으로 받음
    val results = spark.sql("""SELECT name, org, module, release, num_commits
    	FROM committers WHERE module = 'mllib' AND num_commits > 10
        	ORDER BY num_commits DESC""")

     

    스파크 MLlib

     

    이 API는 특성들을 추출하고 변형하고 파이프라인을 구축하고 배포하는 동안 모델을 보존해 준다. 그 외의 추가적인 도구들은 일반적인 선형대수 연산을 사용하게 해 준다. MLlib은 경사 하강법 최적화를 포함한 다른 저수준 ML 기능을 포함한다.

    #파이썬 예제
    from pyspark.ml.classification import LogisticRegression
    ...
    training = spark.read.csv("s3://...")
    test = spark.read.csv("s3://...")
    
    #훈련 데이터 로드
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    
    #모델 적합화(fit)
    lrModel = lr.fit(training)
    
    #예측
    lrModel.transform(test)
    ...

     

    스파크 정형화 스트리밍

     

    정형화 스트리밍 모델의 하부에는 스파크 SQL 엔진이 장애 복구와 지연 데이터의 모든 측면을 관리하면서 개발자들에게는 상대적으로 쉽게 스트리밍 애플리케이션을 작성하게 해 준다. DStream 모델을 없애고, 아파치 카프카, 키네시스, HDFS 기반 저장소나 클라우드 저장소 등으로 스트리밍 데이터 범위도 확장했다.

    # 파이썬 예제
    # 로컬 호스트에서 스트림을 읽어 들인다.
    from pyspark.sql.functions import explode, split
    lines = (spark
    	.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load())
    
    # 트랜스포메이션 수행
    # 라인별로 읽어 단어별로 나눈다.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    
    # 단어 세기를 수행한다.
    word_counts = words.groupBy("word").count()
    
    # 결과 스트림을 카프카에 쓴다.
    query = (word_counts
    	.writeStream
        .format("kafka")
        .option("topic", "output"))​

     

    GraphX

     

    GraphX는 그래프를 조작하고 그래프 병렬 연산을 수행하기 위한 라이브러리다. 분석, 연결 탐색 등의 표준적인 그래프 알고리즘을 제공하며 커뮤니티의 사용자들이 기여한 페이지랭크, 연결 컴포넌트, 삼각 집게 등의 알고리즘도 포함하고 있다.

    # 스칼라 예제
    val graph = Graph(vertices, edges)
    messages = spark.textfile("hdfs://...")
    val graph2 =graph.joinVertices(messages) {(id, vertex, msg) => ...}

    아파치 스파크의 분산 실행

    스파크 드라이버

     

    SparkSession 객체를 초기화하는 책임을 가진 스파크 애플리케이션의 일부로서, 스파크 드라이버는 여러 가지 역할을 한다. 클러스터 매니저와 통신하며 스파크 이그제큐터들을 위해 필요한 자원을 요청하고(CPU, 메모리 등) 모든 스파크 작업을 DAG 연산 형태로 변환하고 스케줄링하며 각 실행 단위를 태스크로 나누어 스파크 이그제큐터들에게 분배해준다. 자원이 일단 할당된다면 그다음부터 드라이버는 이그제큐터와 직접 통신한다.

    SparkSession

     

    SparkSession은 모든 스파크 연산과 데이터에 대한 통합 연결 채널이 되었다. SparkContext, SQLContext, HiveContext, SparkConf, StreamingContext 등이 통합되어있고, 스파크 작업을 훨씬 간단하고 쉽게 만들어준다.

     

    # 스칼라 예제
    import org.apache.spark.sql.SparkSession
    
    # SparkSession 생성
    val spark = SparkSession
    	.builder
        .appName("LearnSpark")
        .config("spark.sql.shuffle.partitions", 6)
        .getOrCreate()
    ...
    
    # JSON 읽기에 스파크 세션 사용
    val people = spark.read.json("...")
    ...
    # SQL 실행에 스파크 세션 사용
    val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")

     

    클러스터 매니저

     

    클러스터 매니저는 스파크 애플리케이션이 실행되는 클러스터에서 자원을 관리 및 할당하는 책임을 지닌다. 현재 스파크는 네 종류의 클러스터 매니저를 지원하는데 이는 내장 단독 클러스터 매니저, 아파치 하둡 얀, 아파치 메소스, 쿠버네티스다.

     

    스파크 이그제큐터

     

    스파크 이그제큐터는 클러스터의 각 워커 노드에서 동작한다. 이그제큐터는 드라이버 프로그램과 통신하며 워커에서 태스크를 실행하는 역할을 하며, 대부분의 배포 모드에서 노드당 하나의 이그제큐터만 실행된다.

     

    배포 모드

     

    모드 스파크 드라이버 스파크 이그제큐터 클러스터 매니저
    로컬 랩톱이나 단일 서버 같은 머신에서 단일 JVM 위에서 실행 드라이버와 동일한 JVM 위에서 동작 동일한 호스트에서 실행
    단독 클러스터의 아무 노드에서나 실행 가능 클러스터의 각 노드가 자체적인 이그제큐터 JVM을 실행 클러스터의 아무 호스트에나 할당 가능
    얀(클라이언트) 클러스터 외부의 클라이언트에서 동작 얀의 노드매니저의 컨테이너 얀의 리소스 매니저가 얀의 애플리케이션 마스터와 연계하여 노드 매니저에 이그제큐터를 위한 컨테이너들을 할당
    얀(클러스터) 얀 애플리케이션 마스터에서 동작 얀(클러스터) 모드와 동일 얀(클러스터) 모드와 동일
    쿠버네티스 쿠버네티스 파드에서 동작 각 워커가 자신의 파드 내에서 실행 쿠버네티스 마스터

     

    분산 데이터와 파티션

     

    실제 물리적인 데이터는 HDFS나 클라우드 저장소에 존재하는 파티션이 되어 저장소 전체에 분산된다. 데이터가 파티션으로 되어 물리적으로 분산되면서, 스파크는 각 파티션을 고수준에서 논리적인 데이터 추상화, 즉 메모리의 데이터 프레임 객체로 바라본다. 스파크 이그제큐터는 가급적이면 데이터 지역성을 고려해 네트워크에서 가장 가까운 파티션을 읽어 들이도록 태스크를 할당한다.

     

    파티셔닝은 효과적인 병렬 처리를 가능하게 해 준다. 데이터를 조각내 청크나 파티션으로 분산해 저장하는 방식은 스파크 이그제큐터가 네트워크 사용을 최소화하며 가까이 있는 데이터만 처리할 수 있도록 한다.

     

    # 파이썬 예제
    # 클러스터에 나뉘어서 저장된 물리적 데이터들을 8개의 파티션으로 나누고, 
    # 각 이그제큐터는 하나 이상의 파티션을 메모리로 읽어 들이게 되는 코드
    log_df = spark.read.text("path_to_large_text_file").repartition(8)
    print(log_df.rdd.getNumPartitions())
    
    # 파이썬 예제
    # 1만 개의 정수로 구성된 데이터 프레임을 만들어서 8개의 파티션으로 메모리에 분산
    df = spark.range(0, 10000, 1, 8)
    print(df.rdd.getNumPartitions())
    
    # 둘다 8을 출력한다

     

Designed by Tistory.