[Airflow] Airflow Xcom

Xcom이란?

Airflow의 태스크(Operator)들간에 데이터를 주고 받기 위한 방식

DAG내에서 한 Task의 리턴값을 다른 Task가 사용할 수 있게 전달하는 형태로 사용됩니다. 

이 값들은 Airflow 메타 데이터 DB에 저장 되기 때문에 큰 데이터를 주고 받는데는 좋지 않습니다. 

데이터가 큰 경우에는 S3등에 로드하고 그 위치를 넘기는 형태로 사용합니다. 

 

Xcom은 key-value 형태로 저장하고, Xcom_push, Xcom_pull, Xcom_pull 과 같은 기능을 제공합니다. 

Xcom_push(key="식별자", value="전달하려는데이터")
Xcom_pop(key="식별자", task_ids="태스크명")
Xcom_pull(key="식별자", task_ids="태스크명")

 

Xcom_pull과 Xcom_pop의 차이는 Xcom_pull은 값을 꺼내오는데 사용하고, Xcom_pop은 값을 가져오고 동시에 삭제합니다. 

def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] 
    records = []
    for l in lines:
      (name, gender) = l.split(",")
      records.append([name, gender])
    logging.info("Transform ended")
    return records
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': 'myschema',
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load