ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] jinja_template을 활용한 날짜 동적 변수 활용 하는 법(동적 datetime, ds변수 UTC안되는 것 해결법)
    데이터 엔지니어링/AirFlow 2022. 9. 18. 18:15
    반응형

    안녕하세요 오늘은 회사에서 airflow를 사용하고 있는데 쿼리를 execution_date별로 날짜를 동적으로 설정 해야하는 상황이 발생하여 동적 date변수를 조작해 보는 airflow 코드를 만들어 볼 예정입니다. datetime 동적 변수 만드는데 삽질을 좀 많이해서 삽질 과정도 좀 같이 넣어 볼게요. 먼저, 동적 변수를 만드는 첫 번째 과정은 Jinja template을 이용하는 것입니다.

    Jinja template에 있는 동적 변수 이용하기

    먼저 airflow는 python 코드로 되어있기 때문에 jinja template을 내장하고 있습니다. airflow에서는 Jinja_template에 미리 정의 되어있는 변수를 제공합니다. 홈페이지에 가보시면 자세히 나와있습니다.

    사용법은 아래 코드를 보시면 이해를 하실겁니다.

    import pendulum
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    
    kst = pendulum.timezone("Asia/Seoul")
    
    default_args = {
        'owner' : 'hsjoo',
        'email' : ['airflow@airflow.com'],
        'email_on_failure': False,
    }
    
    with DAG(
        dag_id='smaple_jinja_template_datetime',
        default_args=default_args,
        start_date=datetime(2022, 9, 14, 5, tzinfo=kst),
        schedule_interval='0 * * * *',
        tags=['dev'],
        catchup=True,
    ) as dag:
    
        template = '''
        echo {{ data_interval_start }}
        echo {{ data_interval_end }}
        echo {{ ds }}
        echo {{ ds_nodash }}
        echo {{ ts }}
        echo {{ ts_nodash_with_kz }}
        echo {{ macros }}
        '''
    
        t1 = BashOperator(
            task_id="exec_bash",
            bash_command=template,
        )
    
        t1
    

    이것을 실행 시켜 로그를 확인하면 아래와 같이 날짜 형태의 값을 볼 수 있습니다.

    [2022-09-18, 17:43:37 KST] {subprocess.py:85} INFO - Output:
    [2022-09-18, 17:43:37 KST] {subprocess.py:92} INFO - 2022-09-13T20:00:00+00:00
    [2022-09-18, 17:43:37 KST] {subprocess.py:92} INFO - 2022-09-13T20:00:00+00:00
    [2022-09-18, 17:43:37 KST] {subprocess.py:92} INFO - 2022-09-13
    [2022-09-18, 17:43:37 KST] {subprocess.py:92} INFO - 20220913
    [2022-09-18, 17:43:37 KST] {subprocess.py:92} INFO - 2022-09-13T20:00:00+00:00
    [2022-09-18, 17:43:37 KST] {subprocess.py:92} INFO - 20220913T200000+0000
    

     

     

     

    여기서 문제점

    10시에 실행 된것을 한번 확인해 보겠습니다. 로그를 확인해 보면 아래와 같이 나온것을 알 수 있습니다. 바로 UTC 시간으로 설정이 되어있는 것입니다. UTC는 시간이 한국시간보다 9시간 작기 때문에 1시로 나오게 되었습니다. 그래서 이것을 바로 사용하면 안된다고 판단이 되더군요.

    [2022-09-18, 17:54:00 KST] {subprocess.py:92} INFO - 2022-09-14T01:00:00+00:00
    [2022-09-18, 17:54:00 KST] {subprocess.py:92} INFO - 2022-09-14T02:00:00+00:00
    [2022-09-18, 17:54:00 KST] {subprocess.py:92} INFO - 2022-09-14
    [2022-09-18, 17:54:00 KST] {subprocess.py:92} INFO - 20220914
    [2022-09-18, 17:54:00 KST] {subprocess.py:92} INFO - 2022-09-14T01:00:00+00:00
    [2022-09-18, 17:54:00 KST] {subprocess.py:92} INFO - 20220914T010000+0000
    

     

    먼저 config 파일 설정을 변경해 보았습니다.

    defult_ui_timezone이 있습니다. 이것을 Asia/Seoul로 변경해 보았습니다. 그리고 UI이기 때문에 화면에서만 바뀌는 것을 파악 되어 아래 내용도 변경 해 주었습니다.

    이정도면 되야 하는데 이게 왠걸 jinja_template은 변경되지 않았습니다. 그래서 그냥 코드로 정리해서 변경해야 겠다 생각이 들더 군요. 결론적으로 수기로 변경하여 xcom으로 전달하는 방법에 이르게 되었습니다.

     

     

    Xcom으로 동적 변수 보내기

    import pendulum
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python import PythonOperator
    
    # timezone 한국시간으로 변경
    kst = pendulum.timezone("Asia/Seoul")
    
    def parse_datetime(utc_date, **context):
        tmp_dt = datetime.strptime(utc_date.replace("T"," ")[:19], '%Y-%m-%d %H:%M:%S')
        kst_dt = tmp_dt + timedelta(hours=9)
    
        print(f"tmp_dt : {tmp_dt}, kst_dt : {kst_dt}")
    
        y = kst_dt.strftime("%Y")
        m = kst_dt.strftime("%m")
        d = kst_dt.strftime("%d")
        h = kst_dt.strftime("%H")
        mi = kst_dt.strftime("%M")
        s = kst_dt.strftime("%S")
    
        print(f"date = {y},{m},{d},{h},{mi},{s}")
    
        ti = context['task_instance']
        ti.xcom_push(key='y', value=y)
        ti.xcom_push(key='m', value=m)
        ti.xcom_push(key='d', value=d)
        ti.xcom_push(key='h', value=h)
        ti.xcom_push(key='mi', value=mi)
        ti.xcom_push(key='s', value=s)
    
    # 기본 args 생성
    default_args = {
        'owner' : 'hsjoo',
        'email' : ['airflow@airflow.com'],
        'email_on_failure': False,
    }
    
    with DAG(
        dag_id='sample_date_variables',
        default_args=default_args,
        start_date=datetime(2022, 9, 14, 5, tzinfo=kst),
        schedule_interval='@once',
        tags=['dev']
    ) as dag:
    
        t0 = PythonOperator(
            task_id='generate_date_variables',
            python_callable=parse_datetime,
            op_args=["{{ts}}"]
        )
    
        template='''
        echo {{ task_instance.xcom_pull(key='y') }}
        echo {{ task_instance.xcom_pull(key='m') }}
        echo {{ task_instance.xcom_pull(key='d') }}
        echo {{ task_instance.xcom_pull(key='h') }}
        '''
    
        t1 = BashOperator(
            task_id="exec_bash",
            bash_command=template,
        )
    
        t0 >> t1
    

    위에 코드를 실행 시켜 보겠습니다. 시간은 9월14일 5시

    시간이 정상적으로 나온것을 확인이 가능합니다. 그럼 bash에서 나온값을 확인해봐야겠죠?

    [2022-09-18, 18:08:54 KST] {logging_mixin.py:115} INFO - tmp_dt : 2022-09-13 20:00:00, kst_dt : 2022-09-14 05:00:00
    [2022-09-18, 18:08:54 KST] {logging_mixin.py:115} INFO - date = 2022,09,14,05,00,00
    

    bash operator에서 불러서 사용한 xcom변수도 잘 나왔네요 이렇게 날짜를 동적변수에 담아 사용하는 법을 알 수 있었습니다. 후 내 하루…………

    [2022-09-18, 18:08:55 KST] {subprocess.py:74} INFO - Running command: ['/usr/bin/bash', '-c', '\\n    echo 2022\\n    echo 09\\n    echo 14\\n    echo 05\\n    ']
    [2022-09-18, 18:08:55 KST] {subprocess.py:85} INFO - Output:
    [2022-09-18, 18:08:55 KST] {subprocess.py:92} INFO - 2022
    [2022-09-18, 18:08:55 KST] {subprocess.py:92} INFO - 09
    [2022-09-18, 18:08:55 KST] {subprocess.py:92} INFO - 14
    [2022-09-18, 18:08:55 KST] {subprocess.py:92} INFO - 05
    [2022-09-18, 18:08:55 KST] {subprocess.py:96} INFO - Command exited with return code 0
    

     

     

     

    모든 airflow sample 코드는 제 깃헙에 있습니다.

     

    GitHub - hyunseokjoo/airflow_sample_code

    Contribute to hyunseokjoo/airflow_sample_code development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.