[Spark] Apache Spark 파티션과 병렬처리 구조(셔플링)

Spark 데이터 시스템 아키텍처 구성

  • Spark은 별도의 파일 시스템을 가지고 있지 않음 ⇨ 분산 파일 시스템 필요
    • HDFS, AWS S3, Azure Blob, GCP Cloud Storage (내부 데이터)
  • Resource Manager(YARN, Kubernetes)
  • 그 위에 Spark이 올라감
    • 이외에도 Spark SQL, Spark Streaming, Spark GraphX, Spark ML과 같은 다양한 패키지가 있음
  • 큰 데이터를 ETL이나 Adhoc형태로 인터렉티브하게 쿼리를 날리기 위해서는 Hive나 Presto를 사용하면 되지만 하나의 시스템(Spark)로 다양한 기능이 제공되기 떄문에 대부분 Spark 사용
  • 외부 데이터(관계형 데이터베이스, NoSql 과 같은 프로덕션 DB)의 경우 주기적인 ETL을 통해서 내부데이터(HDFS로 로딩)와 같이 사용하는데 이 경우에 스케줄링을 위해 Airflow를 사용합니다. 
  • 또는 필요할 때마다 Spark에서 바로 처리(로딩)할 수 있습니다
    • Spark Streaming이나 배치로 읽어서 Spark SQL로 처리 
  • 최종적인 결과물을 외부로 다시 writing할 때 외부 데이터 혹은 내부 데이터에서 사용될 수 있음

대용량 분산처리 시스템의 병렬처리

(장점) Data partitioning은 데이터 처리에 병렬성 줌 ↔️ (단점)데이터가 균등하게 분포하지 않는 경우(주로 데이터 셔플링 후) Data Skewness

셔플링을 최소화하고 파티션 최적화를 하는 것이 중요

  • 하둡 map의 경우 HDFS 데이터 블록(128MB)이라는 단위로 나누어서 사용
    • hdfs-site.xml에 있는 dfs.block.size에서 조정
  • Spark에서는 이를 파티션(partition)이라고 부름. 파티션의 기본 크기도 128MB
    • spark.sql.files.maxPartitionBytes에서 조정 가능: HDFS등에 있는 파일을 읽어올 때만 적용
    • HDFS와 단위를 맞추는게 좋음
  • 데이터 유실을 막기 위해서 replication factor의 수만큼 동일한 데이터 블록이 다수의 서버에 복제
  • 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리 시 N개의 Map 태스크가 실행
  • Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨
    • Executor에서 여러 개의 파티션을 처리할 수 있음

Spark의 데이터 처리 흐름

  • 처리데이터가 파티션으로 나눠지고 각 파티션이 스파크에 의해서 Executor상에서 태스크로 실행
    • 파티션의 수를 Executor수 x Executor당 CPU의 수로 지정하면 병렬성을 최대화
    • 파티션은 데이터가 physical하게 나누어져 있고 RDD, DataFrame/Dataset 형태

예를 들어, 

  • 처리하려는 데이터 파일이 4개의 데이터 블락으로 나누어져 있으면 4개의 파티셔닝이 된다
    • 파티셔닝 방법은 데이터 소스에 따라 달라짐(JDBC 소스는 기본으로 하나의 파티션만 만듦)
  • Spark Cluster에 2개의 Executor가 있고, 각 Executor안에 cpu코어가 1개씩 있다면, 이 Spark Cluster의 동시에 실행가능한 최고 태스크의 수? 
    • 2개
  • 4개의 파티션이 있지만 병렬처리 할 수 있는건 2개씩

Spark shuffling 셔플링 발생 이유

이론적으로 데이터 프레임이 파티션의 집합으로 구성되어 있고 각 파티션은 하나의 executor안에 태스크가 처리하면서 병렬성 유지 속도가 매우 빠를 것 같지만

파티션 간의 데이터 이동 없이 진행되지 않기 때문에 파티션간의 이동 필요(셔플링)

피티션은 네트워크를 통해 다른 서버로 이동(group by, Join)하면서 새로운 파티션이 만들어지는데 이 경우 데이터가 일치하지 않을 수 있다(데이터 불균형, Data Skew)

 

셔플링 발생하는 이유?

  • 개발자가 명시적으로 파티션을 새롭게 하는 경우
  • 시스템에 의해 이루어지는 셔플링
    • 기존 파티션으로는 데이터 작업이 안되는 경우, groupby와 같은 aggregation이나 sorting
  • 셔플링이 발생할 때 네트워크를 타고 데이터가 이동
  • 셔플링이 발생 했을 때 새로 생기는 데이터 프레임은 몇개의 파티션을 가질까?
    • spark.sql.shuffle.partitions이 결정
    • 기본값은 200이며 최대 파티션의 수
    • 오퍼레이션에 따라 파티션 수가 결정
    • 파티셔닝 방법 random, hashing partiton, range partition 등
  • Data Skew 발생 가능 🥲