📌 Airflow DAG 코드 기본 구조
1. DAG 대표하는 객체를 먼저 만들기
- DAG 이름, 실행주기, 실행날짜, 오너 등등
2. DAG를 구성하는 태스크 만들기
- 태스크 별로 적합한 오퍼레이터 선택
- 태스크 ID를 부여하고 해야할 작업의 세부사항 지정
3. 최종적으로 태스크들간의 실행 순서를 결정
default_args 설정
모든 태스크들에 공통으로 적용되는 설정을 dictionary 형태로 만들어줍니다.
from datetime import datetime, timedelta
default_args = {
'owner':'ourjune',
'email':['214kcal@gmail.com'],
'retries':1,
'retry_delay':timedelta(minutes=3),
}
# 그외 태스크 성공/실패시에 적용할 함수 지정 가능
# on_failure_callback
# on_success_callback
- owner: 소유자
- email: 이메일
- retries task: 실패시 재시도 횟수
- retry_delay: 재시도 간의 시간 간격
- on_failure_callback: 태스크 실패시에 호출할 함수 지정
- on_success_callback: 태스크 성공시에 호출할 함수 지정
DAG 설정
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2024, 12, 25, hour=0, minute=00),
schedule="0 * * * *", # 매시 0분 @hourly
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
- DAG ID: DAG name(id)
- schedule
- * * * * *: 분/ 시간/ 일/ 월/ 요일
- None, @once: 실행을 주기적으로 하지 않을 경우, 필요에 따라 실행(트리거해서 사용)
- 또는 @hourly, @daily, @weekly, @monthly, @yearly 와 같이 스케줄링 가능
- start_date (end_date)
- tags: 기본 example
- catchup:
- (True) 과거의 데이터 backfill
- (false) start_date과 DAG 활성화 날짜 사이의 gap에 대해서 실행하지 않음
- full refresh인 경우 False 기본, incremental update로 작성된 DAG 인 경우 True 사용됨
- default_args: 앞서 만들어둔 태스크에 저장될 파라미터 (Task parameters)
- max_active_runs: DAG에만 적용, 한번에 동시에 실행될 수 있는 DAG의 수(backfill 때 유리)
- max_active_tasks: 이 DAG에 속한 Task가 동시에 실행 가능한 수
- max_active_로 지정을 한 값이 아무리 커도 airflow worker에 할당된 cpu의 합계가 실제 최대 가능한 값이 된다.
DAG parameters vs. Task parameters 구분하기
Bash Operator를 사용한 예제
- 3개의 Task로 구성
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner':'ourjune',
'email':['214kcal@gmail.com'],
'start_date':datetime(2024, 12, 25, hour=0, minite=00),
'retries':1,
'retry_delay':timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
# t1: 현재 시간 출력
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
# t2: 5초 대기 후 종료
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
# t3: 서버의 /tmp 디렉토리의 내용 출력
t3 = BashOperator(
task_id='ls',
bash_command='ls',
dag=test_dag)
# t1이 끝나면 t2와 t3를 병렬로 실행
t1 >> [t2, t3]
도커 Aiflow에서 DAG 태스크 실행
도커 컨테이너 접속
터미널에서 docker ps 명령어로 airflow-docker-airflow-webserver-1의 container id 확인하고
docker exec -it [컨테이너 ID] sh 로 접속
airflow dags 폴더 아래 DAG 파일 작성
(airflow)pwd
/opt/airflow
(airflow)ls -tl
total 120
drwxr-xr-x 4 default root 128 Dec 27 06:15 dags
drwxr-xr-x 4 default root 128 Dec 27 05:20 logs
-rw-r--r-- 1 default root 3 Dec 27 05:20 airflow-webserver.pid
-rw-rw-r-- 1 default root 4762 Dec 27 05:20 webserver_config.py
-rw------- 1 default root 110448 Dec 27 05:20 airflow.cfg
drwxr-xr-x 2 root root 64 Dec 27 04:52 config
drwxr-xr-x 2 default root 64 Dec 27 04:34 plugins
(airflow)cd dags
(airflow)ls -tl
total 4
drwxr-xr-x 3 default root 96 Dec 27 06:15 __pycache__
-rw-r--r-- 1 default root 716 Dec 27 06:11 TestDAG.py
(airflow)
웹 UI에서 확인
저장한 DAG ID가 확인되고 토글을 활성화하면 파란색으로 변경되고 오른쪽에 trigger start를 누르면
Task가 실행되는 것을 모니터링 할 수 있습니다.
dag_v1의 세부정보를 확인합니다.
graph, code 등 확인이 가능하고 Task 별로 확인도 가능합니다.
터미널에서 실행
airflow dags list # DAG 목록 확인
airflow tasks list DAG이름 # Task 목록 확인
(airflow)airflow tasks list dag_v1
ls
print_date
sleep
airflow tasks test DAG이름 Task이름 날짜 # DAG 실행
- test vs. run
- 실행 결과가 메타데이터 DB에 기록이 되는지 유무에 따라 test(저장x) vs.run(저장o)
- 날짜는 YYYY-MM-DD 형식
output이 출력된 것 확인
(airflow)airflow tasks test dag_v1 ls 2024-12-26
[2024-12-27T06:46:04.636+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/***/dags
[2024-12-27T06:46:05.942+0000] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_v1.ls __***_temporary_run_2024-12-27T06:46:05.918734+00:00__ [None]>
[2024-12-27T06:46:05.946+0000] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_v1.ls __***_temporary_run_2024-12-27T06:46:05.918734+00:00__ [None]>
[2024-12-27T06:46:05.946+0000] {taskinstance.py:2866} INFO - Starting attempt 0 of 2
[2024-12-27T06:46:05.947+0000] {taskinstance.py:2947} WARNING - cannot record queued_duration for task ls because previous state change time has not been saved
[2024-12-27T06:46:05.948+0000] {taskinstance.py:2889} INFO - Executing <Task(BashOperator): ls> on 2024-12-26 00:00:00+00:00
[2024-12-27T06:46:06.088+0000] {taskinstance.py:3132} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='keeyonghan@hotmail.com' AIRFLOW_CTX_DAG_OWNER='keeyong' AIRFLOW_CTX_DAG_ID='dag_v1' AIRFLOW_CTX_TASK_ID='ls' AIRFLOW_CTX_EXECUTION_DATE='2024-12-26T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__***_temporary_run_2024-12-27T06:46:05.918734+00:00__'
Task instance is in running state
Previous state of the Task instance: queued
Current task name:ls state:None start_date:None
Dag name:dag_v1 and current dag run status:queued
[2024-12-27T06:46:06.094+0000] {taskinstance.py:731} INFO - ::endgroup::
[2024-12-27T06:46:06.095+0000] {subprocess.py:78} INFO - Tmp dir root location: /tmp
[2024-12-27T06:46:06.096+0000] {subprocess.py:88} INFO - Running command: ['/usr/bin/bash', '-c', 'ls /tmp']
[2024-12-27T06:46:06.108+0000] {subprocess.py:99} INFO - Output:
[2024-12-27T06:46:06.116+0000] {subprocess.py:106} INFO - 2029240f6d1128be89ddc32729463129
[2024-12-27T06:46:06.116+0000] {subprocess.py:106} INFO - ***tmpwhlq7wb0
[2024-12-27T06:46:06.117+0000] {subprocess.py:110} INFO - Command exited with return code 0
[2024-12-27T06:46:06.124+0000] {taskinstance.py:340} INFO - ::group::Post task execution logs
[2024-12-27T06:46:06.124+0000] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=dag_v1, task_id=ls, run_id=__***_temporary_run_2024-12-27T06:46:05.918734+00:00__, execution_date=20241226T000000, start_date=, end_date=20241227T064606
(airflow)
참고
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
DAGs — Airflow Documentation
airflow.apache.org
'데브코스 데이터엔지니어링' 카테고리의 다른 글
[Airflow] Apache Airflow PythonOperator 사용하기 (1) | 2024.12.27 |
---|---|
[Airflow] 도커 airflow 웹서버 포트 에러 (0) | 2024.12.27 |
[Airflow] Mac에서 도커로 Airflow 설치하기 (2) | 2024.12.27 |
[Airflow] The "AIRFLOW_UID" variable is not set. Defaulting to a blank string. 에러 해결 (0) | 2024.12.27 |
Superset (1) | 2024.12.27 |