🔙 이전에 배운 내용 복습하기 [ETL] 도커 Airflow에서 태스크 실행 - DAG 코드 기본 구조
[ETL] 도커 Aiflow에서 태스크 실행-DAG 코드 기본 구조
📌 Airflow DAG 코드 기본 구조1. DAG 대표하는 객체를 먼저 만들기- DAG 이름, 실행주기, 실행날짜, 오너 등등2. DAG를 구성하는 태스크 만들기- 태스크 별로 적합한 오퍼레이터 선택- 태스크 ID를 부여
ourjune.tistory.com
🔼 BashOperator를 사용한 예시
Apache Airflow에서 제공하는 Operator 중에 하나인 PythonOperator를 사용하여 실습을 했습니다.
PythonOperator로 만든 DAG의 기본적인 구조
자유도가 높은 general한 Task 구현에 유리하고 Airflow decorators(@task) 지정시 좀 더 단순하게 작성 가능
from airflow operators.python import PythonOperator
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table':'delighted_nps',
'schema':'raw_data'
},
)
# 실행할 함수
def python_func(**cxt):
table=cxt["params"]["table"]
schema=cxt["params"]["schema"]
ex_date=cxt["execution_date"]
# 원하는 기능 자유롭게 추가 가능
- python_callable: 호출할 python 함수명
- params: python 함수 호출 시 넘겨줄 인자
- *cxt: 딕셔너리 형태로 parmas 키 밑에 operator에서 정의한 딕셔너리 저장
PythonOperator 사용 예시
2개의 Task로 구성된 데이터 파이프라인(DAG)를 작성하였습니다.
- print_hello: PythonOperator로 구성되어 있으며 먼저 실행
- print_goodbye: PythonOperator로 구성되어 있으며 두번째로 실행
- 실행 순서 print_hello >> print_goodbye
DAG 설정
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *')
default_args=default_args
)
- schedule: 하루에 한번 2시 0분에 실행
파이썬 함수 설정
print_hello() print_goodbye() 함수 선언
- print_hello: "hello!" 출력 파이썬 함수
- print_goodbye: "hello!" 출력 파이썬 함수
PythonOperator 사용
print_hello = PythonOperator(
dag = dag
task_id='print_hello',
python_callable=print_hello,
)
...
실행 순서 지정
print_hello >> print_goodbye
Aiflow Decorators
python operator에 @task을 붙여 어노테이션 해줍니다.
- python operator와 operator의 entry함수를 따로 정의해야 했는데 한번에 정의할 수 있어 보다 간단합니다.
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# 함수 이름이 Task ID가 됨
print_hello() >> print_goodbye()
참고
PythonOperator — Airflow Documentation
airflow.apache.org
'데브코스 데이터엔지니어링' 카테고리의 다른 글
[Airflow] Full Refresh ETL 작성시 고려사항 (0) | 2024.12.28 |
---|---|
Python 라이브러리 psycopg2로 PostgreSQL 데이터베이스 연결 (1) | 2024.12.28 |
[Airflow] 도커 airflow 웹서버 포트 에러 (0) | 2024.12.27 |
[Aiflow] 도커 기반 Aiflow에서 태스크 실행-DAG 코드 기본 구조 (2) | 2024.12.27 |
[Airflow] Mac에서 도커로 Airflow 설치하기 (2) | 2024.12.27 |