[Airflow] Apache Airflow PythonOperator 사용하기

🔙 이전에 배운 내용 복습하기 [ETL] 도커 Airflow에서 태스크 실행 - DAG 코드 기본 구조

 

[ETL] 도커 Aiflow에서 태스크 실행-DAG 코드 기본 구조

📌 Airflow DAG 코드 기본 구조1. DAG 대표하는 객체를 먼저 만들기- DAG 이름, 실행주기, 실행날짜, 오너 등등2. DAG를 구성하는 태스크 만들기- 태스크 별로 적합한 오퍼레이터 선택- 태스크 ID를 부여

ourjune.tistory.com

🔼 BashOperator를 사용한 예시


Apache Airflow에서 제공하는 Operator 중에 하나인 PythonOperator를 사용하여 실습을 했습니다. 

PythonOperator로 만든 DAG의 기본적인 구조

자유도가 높은 general한 Task 구현에 유리하고 Airflow decorators(@task) 지정시 좀 더 단순하게 작성 가능

from airflow operators.python import PythonOperator

load_nps = PythonOperator(
	dag=dag,
    task_id='task_id',
    python_callable=python_func,
    params={
    'table':'delighted_nps',
    'schema':'raw_data'
    },
)

# 실행할 함수
def python_func(**cxt):
	table=cxt["params"]["table"]
    schema=cxt["params"]["schema"]
    
    ex_date=cxt["execution_date"]
    
# 원하는 기능 자유롭게 추가 가능

- python_callable: 호출할 python 함수명

- params: python 함수 호출 시 넘겨줄 인자

- *cxt: 딕셔너리 형태로 parmas 키 밑에 operator에서 정의한 딕셔너리 저장

PythonOperator 사용 예시

더보기

2개의 Task로 구성된 데이터 파이프라인(DAG)를 작성하였습니다.

- print_hello: PythonOperator로 구성되어 있으며 먼저 실행

- print_goodbye: PythonOperator로 구성되어 있으며 두번째로 실행

- 실행 순서 print_hello >> print_goodbye

 

DAG 설정

dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *')
    default_args=default_args
 )

- schedule: 하루에 한번 2시 0분에 실행

 

파이썬 함수 설정

print_hello() print_goodbye() 함수 선언

- print_hello: "hello!" 출력 파이썬 함수

- print_goodbye: "hello!" 출력 파이썬 함수

PythonOperator 사용

print_hello = PythonOperator(
dag = dag
    task_id='print_hello',
    python_callable=print_hello,
)

...

실행 순서 지정

print_hello >> print_goodbye

Aiflow Decorators

python operator에 @task을 붙여 어노테이션 해줍니다. 

- python operator와 operator의 entry함수를 따로 정의해야 했는데 한번에 정의할 수 있어 보다 간단합니다.

from airflow.decorators import task

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:

    # 함수 이름이 Task ID가 됨
    print_hello() >> print_goodbye()

 

참고

 

PythonOperator — Airflow Documentation

 

airflow.apache.org