Spark 데이터 시스템 아키텍처 구성
- Spark은 별도의 파일 시스템을 가지고 있지 않음 ⇨ 분산 파일 시스템 필요
- HDFS, AWS S3, Azure Blob, GCP Cloud Storage (내부 데이터)
- Resource Manager(YARN, Kubernetes)
- 그 위에 Spark이 올라감
- 이외에도 Spark SQL, Spark Streaming, Spark GraphX, Spark ML과 같은 다양한 패키지가 있음
- 큰 데이터를 ETL이나 Adhoc형태로 인터렉티브하게 쿼리를 날리기 위해서는 Hive나 Presto를 사용하면 되지만 하나의 시스템(Spark)로 다양한 기능이 제공되기 떄문에 대부분 Spark 사용
- 외부 데이터(관계형 데이터베이스, NoSql 과 같은 프로덕션 DB)의 경우 주기적인 ETL을 통해서 내부데이터(HDFS로 로딩)와 같이 사용하는데 이 경우에 스케줄링을 위해 Airflow를 사용합니다.
- 또는 필요할 때마다 Spark에서 바로 처리(로딩)할 수 있습니다
- Spark Streaming이나 배치로 읽어서 Spark SQL로 처리
- 최종적인 결과물을 외부로 다시 writing할 때 외부 데이터 혹은 내부 데이터에서 사용될 수 있음
대용량 분산처리 시스템의 병렬처리
(장점) Data partitioning은 데이터 처리에 병렬성 줌 ↔️ (단점)데이터가 균등하게 분포하지 않는 경우(주로 데이터 셔플링 후) Data Skewness
셔플링을 최소화하고 파티션 최적화를 하는 것이 중요
- 하둡 map의 경우 HDFS 데이터 블록(128MB)이라는 단위로 나누어서 사용
- hdfs-site.xml에 있는 dfs.block.size에서 조정
- Spark에서는 이를 파티션(partition)이라고 부름. 파티션의 기본 크기도 128MB
- spark.sql.files.maxPartitionBytes에서 조정 가능: HDFS등에 있는 파일을 읽어올 때만 적용
- HDFS와 단위를 맞추는게 좋음
- 데이터 유실을 막기 위해서 replication factor의 수만큼 동일한 데이터 블록이 다수의 서버에 복제
- 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리 시 N개의 Map 태스크가 실행
- Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨
- 한 Executor에서 여러 개의 파티션을 처리할 수 있음
Spark의 데이터 처리 흐름
- 처리데이터가 파티션으로 나눠지고 각 파티션이 스파크에 의해서 Executor상에서 태스크로 실행
- 파티션의 수를 Executor수 x Executor당 CPU의 수로 지정하면 병렬성을 최대화
- 파티션은 데이터가 physical하게 나누어져 있고 RDD, DataFrame/Dataset 형태
예를 들어,
- 처리하려는 데이터 파일이 4개의 데이터 블락으로 나누어져 있으면 4개의 파티셔닝이 된다
- 파티셔닝 방법은 데이터 소스에 따라 달라짐(JDBC 소스는 기본으로 하나의 파티션만 만듦)
- Spark Cluster에 2개의 Executor가 있고, 각 Executor안에 cpu코어가 1개씩 있다면, 이 Spark Cluster의 동시에 실행가능한 최고 태스크의 수?
- 2개
- 4개의 파티션이 있지만 병렬처리 할 수 있는건 2개씩
Spark shuffling 셔플링 발생 이유
이론적으로 데이터 프레임이 파티션의 집합으로 구성되어 있고 각 파티션은 하나의 executor안에 태스크가 처리하면서 병렬성 유지 속도가 매우 빠를 것 같지만
파티션 간의 데이터 이동 없이 진행되지 않기 때문에 파티션간의 이동 필요(셔플링)
피티션은 네트워크를 통해 다른 서버로 이동(group by, Join)하면서 새로운 파티션이 만들어지는데 이 경우 데이터가 일치하지 않을 수 있다(데이터 불균형, Data Skew)
셔플링 발생하는 이유?
- 개발자가 명시적으로 파티션을 새롭게 하는 경우
- 시스템에 의해 이루어지는 셔플링
- 기존 파티션으로는 데이터 작업이 안되는 경우, groupby와 같은 aggregation이나 sorting
- 셔플링이 발생할 때 네트워크를 타고 데이터가 이동
- 셔플링이 발생 했을 때 새로 생기는 데이터 프레임은 몇개의 파티션을 가질까?
- spark.sql.shuffle.partitions이 결정
- 기본값은 200이며 최대 파티션의 수
- 오퍼레이션에 따라 파티션 수가 결정
- 파티셔닝 방법 random, hashing partiton, range partition 등
- Data Skew 발생 가능 🥲
'데브코스 데이터엔지니어링' 카테고리의 다른 글
[웹크롤링] 정적 웹크롤링 BeautifulSoup (0) | 2024.12.26 |
---|---|
[웹크롤링] 정적 웹 크롤링 requests 라이브러리 (1) | 2024.12.26 |
mysql 프로덕션DB= OLTP ->redshift DW= OLAP 로 복사하기 (0) | 2024.12.26 |
[til] 숙제 apple updatesymbol_v2 incremental update 방식바꾸기 (1) | 2024.12.26 |
Schedule cron tab 표현식 airflow (0) | 2024.12.26 |