[Aiflow] 도커 기반 Aiflow에서 태스크 실행-DAG 코드 기본 구조

📌 Airflow DAG 코드 기본 구조
1. DAG 대표하는 객체를 먼저 만들기
- DAG 이름, 실행주기, 실행날짜, 오너 등등
2. DAG를 구성하는 태스크 만들기
- 태스크 별로 적합한 오퍼레이터 선택
- 태스크 ID를 부여하고 해야할 작업의 세부사항 지정 
3. 최종적으로 태스크들간의 실행 순서를 결정

default_args 설정

모든 태스크들에 공통으로 적용되는 설정을 dictionary 형태로 만들어줍니다.

from datetime import datetime, timedelta

default_args = {
	'owner':'ourjune',
    'email':['214kcal@gmail.com'],
    'retries':1, 
    'retry_delay':timedelta(minutes=3), 
} 

# 그외 태스크 성공/실패시에 적용할 함수 지정 가능
# on_failure_callback
# on_success_callback

 

- owner: 소유자

- email: 이메일

- retries task: 실패시 재시도 횟수

- retry_delay: 재시도 간의 시간 간격

- on_failure_callback: 태스크 실패시에 호출할 함수 지정
- on_success_callback: 태스크 성공시에 호출할 함수 지정

DAG 설정

from airflow import DAG

dag = DAG(
	"dag_v1", # DAG name
    start_date=datetime(2024, 12, 25, hour=0, minute=00),
    schedule="0 * * * *", # 매시 0분 @hourly
    tags=["example"],
    catchup=False,
    # common settings
    default_args=default_args
)

 

- DAG ID:  DAG name(id)

- schedule

    - * * * * *: 분/ 시간/ 일/ 월/ 요일

    - None, @once:  실행을 주기적으로 하지 않을 경우, 필요에 따라 실행(트리거해서 사용)

    - 또는 @hourly, @daily, @weekly, @monthly, @yearly 와 같이 스케줄링 가능

- start_date (end_date) 

- tags: 기본 example

- catchup:

    - (True) 과거의 데이터 backfill

    - (false) start_date과 DAG 활성화 날짜 사이의 gap에 대해서 실행하지 않음

    - full refresh인 경우 False 기본, incremental update로 작성된 DAG 인 경우 True 사용됨

- default_args: 앞서 만들어둔 태스크에 저장될 파라미터 (Task parameters)

- max_active_runs: DAG에만 적용, 한번에 동시에 실행될 수 있는 DAG의 수(backfill 때 유리)

- max_active_tasks: 이 DAG에 속한 Task가 동시에 실행 가능한 수

    - max_active_로 지정을 한 값이 아무리 커도 airflow worker에 할당된 cpu의 합계가 실제 최대 가능한 값이 된다. 

더보기

DAG parameters vs. Task parameters 구분하기

 

 

더보기

Bash Operator를 사용한 예제

- 3개의 Task로 구성

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
	'owner':'ourjune',
    'email':['214kcal@gmail.com'],
    'start_date':datetime(2024, 12, 25, hour=0, minite=00),
    'retries':1, 
    'retry_delay':timedelta(minutes=3), 
} 

test_dag = DAG(
	"dag_v1", # DAG name
    schedule="0 9 * * *",
    tags=['test'],
    catchUp=False,
    default_args=default_args
)

# t1: 현재 시간 출력
t1 = BashOperator(
	task_id='print_date',
    bash_command='date',
    dag=test_dag)
  
# t2: 5초 대기 후 종료
t2 = BashOperator(
	task_id='sleep',
    bash_command='sleep 5',
    dag=test_dag)

# t3: 서버의 /tmp 디렉토리의 내용 출력
t3 = BashOperator(
	task_id='ls',
    bash_command='ls',
    dag=test_dag)
    
# t1이 끝나면 t2와 t3를 병렬로 실행
t1 >> [t2, t3]

도커 Aiflow에서 DAG 태스크 실행

도커 컨테이너 접속

터미널에서 docker ps 명령어로 airflow-docker-airflow-webserver-1의 container id 확인하고 

docker exec -it [컨테이너 ID] sh 로 접속

airflow dags 폴더 아래 DAG 파일 작성

(airflow)pwd
/opt/airflow
(airflow)ls -tl
total 120
drwxr-xr-x 4 default root    128 Dec 27 06:15 dags
drwxr-xr-x 4 default root    128 Dec 27 05:20 logs
-rw-r--r-- 1 default root      3 Dec 27 05:20 airflow-webserver.pid
-rw-rw-r-- 1 default root   4762 Dec 27 05:20 webserver_config.py
-rw------- 1 default root 110448 Dec 27 05:20 airflow.cfg
drwxr-xr-x 2 root    root     64 Dec 27 04:52 config
drwxr-xr-x 2 default root     64 Dec 27 04:34 plugins
(airflow)cd dags
(airflow)ls -tl
total 4
drwxr-xr-x 3 default root  96 Dec 27 06:15 __pycache__
-rw-r--r-- 1 default root 716 Dec 27 06:11 TestDAG.py
(airflow)

 

 

웹 UI에서 확인

 

저장한 DAG ID가 확인되고 토글을 활성화하면 파란색으로 변경되고 오른쪽에 trigger start를 누르면

Task가 실행되는 것을 모니터링 할 수 있습니다. 

실행 전
실행 후

dag_v1의 세부정보를 확인합니다. 

graph, code 등 확인이 가능하고 Task 별로 확인도 가능합니다.

상세 페이지

터미널에서 실행

airflow dags list # DAG 목록 확인

airflow tasks list DAG이름 # Task 목록 확인

(airflow)airflow tasks list dag_v1
ls
print_date
sleep

 

airflow tasks test DAG이름 Task이름 날짜 # DAG 실행

- test vs. run

    - 실행 결과가 메타데이터 DB에 기록이 되는지 유무에 따라 test(저장x) vs.run(저장o)

- 날짜는  YYYY-MM-DD 형식

 

output이 출력된 것 확인

(airflow)airflow tasks test dag_v1 ls 2024-12-26
[2024-12-27T06:46:04.636+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/***/dags
[2024-12-27T06:46:05.942+0000] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_v1.ls __***_temporary_run_2024-12-27T06:46:05.918734+00:00__ [None]>
[2024-12-27T06:46:05.946+0000] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_v1.ls __***_temporary_run_2024-12-27T06:46:05.918734+00:00__ [None]>
[2024-12-27T06:46:05.946+0000] {taskinstance.py:2866} INFO - Starting attempt 0 of 2
[2024-12-27T06:46:05.947+0000] {taskinstance.py:2947} WARNING - cannot record queued_duration for task ls because previous state change time has not been saved
[2024-12-27T06:46:05.948+0000] {taskinstance.py:2889} INFO - Executing <Task(BashOperator): ls> on 2024-12-26 00:00:00+00:00
[2024-12-27T06:46:06.088+0000] {taskinstance.py:3132} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='keeyonghan@hotmail.com' AIRFLOW_CTX_DAG_OWNER='keeyong' AIRFLOW_CTX_DAG_ID='dag_v1' AIRFLOW_CTX_TASK_ID='ls' AIRFLOW_CTX_EXECUTION_DATE='2024-12-26T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__***_temporary_run_2024-12-27T06:46:05.918734+00:00__'
Task instance is in running state
 Previous state of the Task instance: queued
Current task name:ls state:None start_date:None
Dag name:dag_v1 and current dag run status:queued
[2024-12-27T06:46:06.094+0000] {taskinstance.py:731} INFO - ::endgroup::
[2024-12-27T06:46:06.095+0000] {subprocess.py:78} INFO - Tmp dir root location: /tmp
[2024-12-27T06:46:06.096+0000] {subprocess.py:88} INFO - Running command: ['/usr/bin/bash', '-c', 'ls /tmp']
[2024-12-27T06:46:06.108+0000] {subprocess.py:99} INFO - Output:
[2024-12-27T06:46:06.116+0000] {subprocess.py:106} INFO - 2029240f6d1128be89ddc32729463129
[2024-12-27T06:46:06.116+0000] {subprocess.py:106} INFO - ***tmpwhlq7wb0
[2024-12-27T06:46:06.117+0000] {subprocess.py:110} INFO - Command exited with return code 0
[2024-12-27T06:46:06.124+0000] {taskinstance.py:340} INFO - ::group::Post task execution logs
[2024-12-27T06:46:06.124+0000] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=dag_v1, task_id=ls, run_id=__***_temporary_run_2024-12-27T06:46:05.918734+00:00__, execution_date=20241226T000000, start_date=, end_date=20241227T064606
(airflow)

 

 

 

참고

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html

 

DAGs — Airflow Documentation

 

airflow.apache.org