[Spark] Spark 데이터 프레임 연산 Transformations, Actions, Job

spark.read.option("header", True).csv("test.csv").where("gender <> 'F'").select("name", "gender").groupby("gender").count().show()

 

다음 DataFrame의 조작 코드의 연산을 아래와 같이 구분해 볼 수 있습니다.

 

WHERETransformation, Narrow Dependency

SELECTTransformation, Narrow Dependency

GROUPBY  Transformation, Wide Dependency (셔플링 발생)

COUNT  Transformation, Narrow Dependency

SHOWAction

 

Jobs, Stages, Tasks

Action ▶️ Job ▶️ Stage ▶️ Transformation ▶️ Task
  • ActionJob을 하나 만들어 내고 코드가 실제로 실행됨
  • 하나의 Job은 하나 혹은 그 이상의 Stage로 구성이 되고 한 Stage는 하나 혹은 그 이상의 Transformation으로 구성
  • 한 Stage는 셔플링 없이 독립적으로 병렬적으로 가능한 연산 작업으로 shuffling이 발생하는 경우 새로 생김
  • 하나의 스테이지는 DAG의 형태로 구성된 다수의 Task 존재, 각 Task는 하나의 Narrow Dependency 작업(파티션 수만큼 병렬 실행)을 수행
  • 태스크는 가장 작은 실행 유닛으로 Executor에 의해 실행

Transformations and Actions

Transformations

  • Narrow Dependencies
    • 독립적인 Partition level 작업
    • select, filter, map 등등
  • Wide Dependencies
    • Shuffling이 필요한 작업
    • groupby, reduceby, partitionby, repartition, coalesce 등등

Actions

  • Read, Write, Show, Collect ➡️ Job을 실행시킴 (실제 코드가 실행됨)
  • Lazy Execution
    • 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음, 최적화 관점에서 SQL이 선호됨

연습문제1

spark = SparkSession.builder.master("local[3]").appName("SparkSchemaDemo").config("spark.sql.adaptive.enabled", False).config("spark.sql.shuffle.partitions", 3).getOrCreate()

df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()

df_count.show()
  • spark.sql.adaptive.enabled
  • spark.sql.shuffle.partitions

이 코드는 몇 개의 Job을 만들어 낼까?

1개의 Job. spark.read.text("shakespeare.txt")에서 header=True 옵션이 맨 처음 예시와는 달리 없기 때문에 그대로 불러오는 거라(스키마 추론 등 작업 필요없음) Action이 아니고 df_count.show()를 불렀을 때 하나의 Job이 생긴다. groupby 기준으로 2개의 stage를 만들어낸다.

show()가 없다면 이 코드는 몇 개의 Job을 만들어낼까?

0개의 Job을 생성한다. Laze Execution이 선호되는 이유.

연습문제2

spark = SparkSession.builder.master("local[3]").appName("SparkSchemaDemo").config("spark.sql.adaptive.enabled", False).config("spark.sql.shuffle.partitions", 3).getOrCreate()

df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")

join_expr = df_large.id == df_samll.id
join_df = df_large.join(df_small, join_expr, "inner")

join_df.show()

이 코드는 몇 개의 Job을 만들어 낼까?

3개의 Job이 만들어진다. 1) spark.read.json("large_data/"), 2) spark.read.json("small_data/") json 파일 읽어 올 때(스키마 추론 작업 필요), 3) join_df.show() 연산 실행될때 (shuffle 해싱 조인 사용)

df_small이 충분히 작다면?

두번째 프레임(df_small) 이 너무 작으면 오버헤드 발생

shuffle 해싱 조인 대신 브로드캐스팅 조인을 사용하기

 

방법1

join_df = df_large.join(broadcast(df_small), join_expr, "inner") # 명시적으로 broadcast 함수 호출

방법2

spark.sql.adaptive.autoBroadcastJoinThreshold # 옵션 세팅, spark가 자동으로 최적화