[Airflow] PostgreSQL 데이터를 Redshift로 적재하는 Airflow DAG 코드

이번에는 PostgreSQL 데이터를 Redshift로 적재하는 ETL 코드를 Airflow DAG로 작성해보겠습니다. 

 

지난 수업에서 

PostgreSQL 데이터베이스에 접속하여(psycopg2.connect 사용) csv 파일을 AWS Redshift에 로드하는 extract(), transform(), load()의 3개의 함수를 작성하였습니다.

 

🔽 이전 게시물 확인하기

 

Python 라이브러리 psycopg2로 PostgreSQL 데이터베이스 연결

psycopg2란?PostgreSQL 전용 Python과 연결을 지원하는 라이브러리다중 스레드와 대량 데이터 처리 지원 psycopg2 주요 기능트랜잭션 관리 commit(), rollback()SQL 실행 execute(), executemany()를 사용해 쿼리 실행

ourjune.tistory.com

 

 

Airflow DAG로 작성하기 위해 PythonOperator를 사용할 예정이고,

3개의 함수를 하나의 TASK로 처리할 지, 따로 처리할지 고민해봐야 하는데 버전1에서는 하나의 TASK로 처리해주었습니다. 

 

💡태스크를 분리하는 이유는 모듈화되어서 3번 task 에서 오류가 났을때, 1,2번 TASK는 성공했다면 3번 태스크만 다시 실행하면 되기 때문에 재실행을 쉽게 하기 위해서 사용합니다. 

단, 태스크를 많이 만들면 전체 DAG 실행이 오래 걸리고 스케줄러에 부하가 발생할 수 있습니다. 

장단점을 고려해서 태스크를 분리하는 것이 중요!

 

💡 하나의 PythonOperator는 하나의 TASK를 처리합니다. 

💡 DAG 선언은 스케줄러 개념이고 Operator로 TASK 선언하는 부분이 실제 DAG시 동작할 명령을 나타냅니다.

 

더보기
더보기

버전 1

def etl():
    link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)


dag_second_assignment = DAG(
	dag_id = 'name_gender',
	catchup = False,
	start_date = datetime(2023,4,6),
	schedule = '0 2 * * *') 

task = PythonOperator(
	task_id = 'perform_etl',
	python_callable = etl,
	dag = dag_second_assignment)

 

Airflow DAG 코드 개선하기

기본 코드 (버전1) 에서 Airflow DAG 코드 개선이 필요한 부분은 아래와 같습니다. 

✅ 하드코딩 되어있는 접속 정보를 configration 형태로 코드 밖으로 꺼내기 ⇨ connectionsvariables 사용

✅ csv 파일 경로를 환경변수로 빼내기 ⇨ python_callableparams 사용 or connections variables 사용

하나의 TASK로 작성한 함수를 개별의 TASK로 만들기  Xcom을 사용 or @task decorator 사용

TASK decorator로 코드 간단하게 변경하기   @task 사용

 

csv_url에 대해서 PythonOperator의 함수의 인자로 params를 지정해서 etl 함수에 직접적으로 사용하지 않는 방법이 있습니다.

python_Callable 과 params

def etl(**context):
    link = context["params"]["url"]
	task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)

    data = extract(link)
    lines = transform(data)
    load(lines)
    
dag = DAG(...)
    
task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl,
    params = {
        'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    },
    dag = dag)

이 경우에도 params로 지정해주는 부분에서 직접적으로 명시가 되기 때문에 코드 밖으로 꺼내서 처리하기 위해서 

Airflow에서 제공하는 connections와 variables를 이용하겠습니다. 

Connections Variables 

Variable을 이용해 CSV 파라미터를 넘기고 Connections에 Redshift 연결정보를 넘겨줬습니다.

 

[ETL] Airflow Connections와 Variables

ConnectionsRedshift와 연결을 해주는 객체 connection 생성시 접속 정보 등이 코드 상에 노출되는 이슈가 있는데 이를 해결하기 위해서 airflow connections를 사용할 수 있습니다. 환경설정 형태로 코드 밖으

ourjune.tistory.com

 

Variable 모듈은 get, set 메소드 사용할 수 있습니다.

get은 키를 주고 키에 해당하는 value 값 가져오기, set은 키와 value를 지정해 주는 역할을 합니다. 

from airflow.models import Variable
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

 

Redshift 접속 정보를 airflow connections에 저장해둔 것을 PostgresHook으로 지정해줍니다. 

from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

PostgresHook의 autocommit 디폴트 값은 False

Xcom 사용해서 태스크 나누기

Xcom_pull()을 이용해서 extract, transform의 리턴값을 transform과 load로 가져와줍니다. 

def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] 
    records = []
    for l in lines:
      (name, gender) = l.split(",")
      records.append([name, gender])
    logging.info("Transform ended")
    return records

하나의 Task를 3개의 Task를 나눴기 때문에 PythonOperator도 3개를 호출합니다. 

extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': 'myschema',
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load

 

 

[ETL] Airflow Xcom

Xcom이란?Airflow의 태스크(Operator)들간에 데이터를 주고 받기 위한 방식DAG내에서 한 Task의 리턴값을 다른 Task가 사용할 수 있게 전달하는 형태로 사용됩니다. 이 값들은 Airflow 메타 데이터 DB에 저

ourjune.tistory.com

 

하지만 코드를 보다 단순화하기 위해 task decorator를 사용하면 Xcom을 사용하지 않아도 엔트리 함수를 사용해서

지정이 가능합니다. 

@task Decorator로 코드 간단하게 만들기

from airflow.decorators import task

 

PythonOperator를 사용하지 않고 decorators를 사용합니다. 

extract, transform, load 함수에 @task로 어노테이션 해주면 task ID가 함수 이름으로 지정됩니다.

함수 각각이 별개의 태스크가 됩니다.

함수의 인자 또한 PythonOperator에서 사용하는 **context 가 아닌 파이썬 함수 인자로 사용할 수 있습니다. 

직관적인 코딩이 가능합니다. 

@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records
with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),
    schedule='0 2 * * *',
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    url = Variable.get("csv_url")
    schema = "myschema"
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

 

이로서 개선이 필요한 부분을 모두 수정해보았습니다.