spark.read.option("header", True).csv("test.csv").where("gender <> 'F'").select("name", "gender").groupby("gender").count().show()
다음 DataFrame의 조작 코드의 연산을 아래와 같이 구분해 볼 수 있습니다.
WHERE ← Transformation, Narrow Dependency
SELECT ← Transformation, Narrow Dependency
GROUPBY ← Transformation, Wide Dependency (셔플링 발생)
COUNT ← Transformation, Narrow Dependency
SHOW ← Action
Jobs, Stages, Tasks
Action ▶️ Job ▶️ Stage ▶️ Transformation ▶️ Task
- Action은 Job을 하나 만들어 내고 코드가 실제로 실행됨
- 하나의 Job은 하나 혹은 그 이상의 Stage로 구성이 되고 한 Stage는 하나 혹은 그 이상의 Transformation으로 구성
- 한 Stage는 셔플링 없이 독립적으로 병렬적으로 가능한 연산 작업으로 shuffling이 발생하는 경우 새로 생김
- 하나의 스테이지는 DAG의 형태로 구성된 다수의 Task 존재, 각 Task는 하나의 Narrow Dependency 작업(파티션 수만큼 병렬 실행)을 수행
- 태스크는 가장 작은 실행 유닛으로 Executor에 의해 실행
Transformations and Actions
Transformations
- Narrow Dependencies
- 독립적인 Partition level 작업
- select, filter, map 등등
- Wide Dependencies
- Shuffling이 필요한 작업
- groupby, reduceby, partitionby, repartition, coalesce 등등
Actions
- Read, Write, Show, Collect ➡️ Job을 실행시킴 (실제 코드가 실행됨)
- Lazy Execution
- 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음, 최적화 관점에서 SQL이 선호됨
연습문제1
spark = SparkSession.builder.master("local[3]").appName("SparkSchemaDemo").config("spark.sql.adaptive.enabled", False).config("spark.sql.shuffle.partitions", 3).getOrCreate()
df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
- spark.sql.adaptive.enabled
- spark.sql.shuffle.partitions
이 코드는 몇 개의 Job을 만들어 낼까?
1개의 Job. spark.read.text("shakespeare.txt")에서 header=True 옵션이 맨 처음 예시와는 달리 없기 때문에 그대로 불러오는 거라(스키마 추론 등 작업 필요없음) Action이 아니고 df_count.show()를 불렀을 때 하나의 Job이 생긴다. groupby 기준으로 2개의 stage를 만들어낸다.
show()가 없다면 이 코드는 몇 개의 Job을 만들어낼까?
0개의 Job을 생성한다. Laze Execution이 선호되는 이유.
연습문제2
spark = SparkSession.builder.master("local[3]").appName("SparkSchemaDemo").config("spark.sql.adaptive.enabled", False).config("spark.sql.shuffle.partitions", 3).getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_samll.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show()
이 코드는 몇 개의 Job을 만들어 낼까?
3개의 Job이 만들어진다. 1) spark.read.json("large_data/"), 2) spark.read.json("small_data/") json 파일 읽어 올 때(스키마 추론 작업 필요), 3) join_df.show() 연산 실행될때 (shuffle 해싱 조인 사용)
df_small이 충분히 작다면?
두번째 프레임(df_small) 이 너무 작으면 오버헤드 발생
shuffle 해싱 조인 대신 브로드캐스팅 조인을 사용하기
방법1
join_df = df_large.join(broadcast(df_small), join_expr, "inner") # 명시적으로 broadcast 함수 호출
방법2
spark.sql.adaptive.autoBroadcastJoinThreshold # 옵션 세팅, spark가 자동으로 최적화
'데브코스 데이터엔지니어링' 카테고리의 다른 글
| [Hadoop] 하둡(Hapoop) 이란? (0) | 2025.01.02 |
|---|---|
| [Spark] AWS EMR 클라우드 기반 Spark 클러스터 실행 (0) | 2025.01.01 |
| [Spark] PySpark에서 Spark 세션 생성 (1) | 2024.12.31 |
| [Spark] YARN 기반 Spark Cluster 프로그램의 구조 (1) | 2024.12.31 |
| [Spark] 아파치 스파크 3.0이란? (0) | 2024.12.31 |