Xcom이란?
Airflow의 태스크(Operator)들간에 데이터를 주고 받기 위한 방식
DAG내에서 한 Task의 리턴값을 다른 Task가 사용할 수 있게 전달하는 형태로 사용됩니다.
이 값들은 Airflow 메타 데이터 DB에 저장 되기 때문에 큰 데이터를 주고 받는데는 좋지 않습니다.
데이터가 큰 경우에는 S3등에 로드하고 그 위치를 넘기는 형태로 사용합니다.
Xcom은 key-value 형태로 저장하고, Xcom_push, Xcom_pull, Xcom_pull 과 같은 기능을 제공합니다.
Xcom_push(key="식별자", value="전달하려는데이터")
Xcom_pop(key="식별자", task_ids="태스크명")
Xcom_pull(key="식별자", task_ids="태스크명")
Xcom_pull과 Xcom_pop의 차이는 Xcom_pull은 값을 꺼내오는데 사용하고, Xcom_pop은 값을 가져오고 동시에 삭제합니다.
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:]
records = []
for l in lines:
(name, gender) = l.split(",")
records.append([name, gender])
logging.info("Transform ended")
return records
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'myschema',
'table': 'name_gender'
},
dag = dag)
extract >> transform >> load
'데브코스 데이터엔지니어링' 카테고리의 다른 글
[Hadoop] 맵리듀스 프로그래밍 (0) | 2024.12.31 |
---|---|
[Airflow] PostgreSQL 데이터를 Redshift로 적재하는 Airflow DAG 코드 (2) | 2024.12.28 |
[Airflow] Airflow Connections와 Variables (1) | 2024.12.28 |
[SQL] 테이블 레코드 삭제 delete from vs. truncate 차이 (0) | 2024.12.28 |
[Airflow] Full Refresh ETL 작성시 고려사항 (0) | 2024.12.28 |