ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow 실습]가장 기본 DAG 만들어 실행해보기
    데이터 엔지니어링/AirFlow 2022. 6. 22. 15:46
    반응형

    목표

    DAG를 하나 생성하여 실행하고 hello world가 찍힌 것을 확인하기

    완성 사진

    코딩 순서

    1. timezone 설정
    2. default_args 설정
    3. DAG 설정 작성
    4. task 작성
    5. Dependencies 설정

    완성 코드

    import pendulum
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    
    # timezone 한국시간으로 변경
    kst = pendulum.timezone("Asia/Seoul")
    
    # 기본 args 생성
    default_args = {
        'owner' : 'Hello World',
        'email' : ['airflow@airflow.com'],
        'email_on_failure': False,
    }
    
    # DAG 생성 
    # 2022/06/21 @once 한번만 실행하는 DAG생성
    with DAG(
        dag_id='ex_hello_world',
        default_args=default_args,
        start_date=datetime(2022, 6,21, tzinfo=kst),
        description='print hello world',
        schedule_interval='@once',
        tags=['test']
    ) as dag:
    
        # python Operator에서 사용할 함수 정의
        def print_hello():
            print('hello world')
    
        t1 = DummyOperator(
            task_id='dummy_task_id',
            retries=5,
        )
    
        t2 = PythonOperator(
            task_id='Hello_World',
            python_callable=print_hello
        )
    
        t1 >> t2
    

    timezone 설정

    airflow 특성상 timezone을 설정해야한다. 기본으로 UTC 가 설정이 되어있어 한국시간하고 다를 뿐더러 timezone을 설정해야 cron형태의 schdule_interval을 정확하게 설정 할 수 있기 때문이다. pendulum은 python에서 timezone을 쉽게 조작하게 도와주는 라이브러리이다.

    import pendulum
    
    # timezone 한국시간으로 변경
    kst = pendulum.timezone("Asia/Seoul")
    

    default_args 설정

    default_args는 기본 arguments를 설정하는 곳이다 airflow ui를 살펴보면 dag detail을 볼수 있는데 여기서 detail에 표시되는 내용이라고 보면 된다. dag graph에 task를 클릭하면 task의 전반적인 내용을 볼 수 있다. arg의 내용을 보고 싶다면 instance Details를 눌러 보면 자세한 내용이 나오니 확인해 보자. default_arg는 모든 범위에 detail에 사용할 때 사용하며, 각각의 task에서 관련 내용을 변경해서 사용도 가능하다

    DAG 설정 작성

    Dag 설정은 2.0기준으로는 decorator를 사용하여 작성하지만, 기존 with문을 사용하여 작성하겠다. 크게 상관은 없는 것 같다. 설명은 아래와 같이 기본적으로 하면 된다.

    from airflow import DAG
    
    # DAG 생성 
    # 2022/06/21 @once 한번만 실행하는 DAG생성
    with DAG(
        dag_id='ex_hello_world', # dag id는 web ui에 나오는 이름 꼭 작성해야함
        default_args=default_args, # 기본 설정
        start_date=datetime(2022, 6,21, tzinfo=kst), # 시작일시 시작일시는 과거로 작성해도된다 나중에 catchup과 backfill에 대한 내용에 적용된다.
        description='print hello world', # 설명 안적어도된다.
        schedule_interval='@once', # cron 표현식으로 scheduling
        tags=['test'] # tag는 작성해도되고 안해도되지만 ui에서 filtering이 가능하므로 작성하는 것이 좋다.
    ) as dag:
    

    task 작성

    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    
    # python Operator에서 사용할 함수 정의
    def print_hello():
        print('hello world')
    
    # 기본제공되는 DummyOperator
    t1 = DummyOperator(
        task_id='dummy_task_id',
        retries=5,
    )
    
    # 가장 많이 쓰는 operator python python 함수를 실행 할 수 있다.
    # bashoperator로 bash 명령어를 실행 할 수 있는데, python script 실행 명령어로도 python script를 실행 가능하다.
    t2 = PythonOperator(
        task_id='Hello_World',
        python_callable=print_hello
    )
    

    Dependencies 설정

    기본적인 의존성 설정 다운스트림으로 작성하였음

       t1 >> t2
    

    DAG 실행 해보기

    DAG를 실행 할려면 airflow scheduler를 먼저 실행해야한다. scheduler가 dag folder를 바라보고 있어 dag의 내용을 불러 오기 때문이다. 위에 설정한 dag는 우리가 작성한 것이니 꼭 scheduler를 실행하자. scheduler를 실행해보면 scheduler도 webserver로 구성 되어있는 것을 볼 수 있다.

    # airflow scheduler 실행하기 
    airflow scheduler
    

    주소로 들어가보면 서버가 올라가 있는 것을 확인 할 수 있으며, front단은 없이 back단만 구성 된 것을 볼 수 있다. 이처럼 webserver와 scheduler 그리고 executor가 각각 구성 되었다는 것을 알 수 있다.

    dag 폴더는 기본적으로 ~/airflow/dags를 바라보는데 cfg파일에 내용을 변경 하여 바라보는 폴더를 변경 해줄 수도 있다. ~/airflow 폴더에 cfg 파일을 열어보면 설정이 적용되어있는데 core에 바로 첫번째가 dags 폴더 설정이다. 이 부분을 변경하면 되는데 그냥 알아만 두자 설정 변경 ㄴㄴ 큰일 날 수도 있다.

     

    그리고 30초 정도 후에 보면 web ui에 내용이 나온것을 볼 수 있다. dag_id ex_hello_world가 보이는데 그 왼쪽에 pause/unpaused 가 있는데 우리는 schedule을 @once로 했기 때문에 실행하면 바로 이 dag가 실행 될것이다. pause를 눌러 unpause로 바꿔보자

     

     

    그리고 Dag_id를 눌러 들어가보면 DAGRuns 가 나오게 된다. 나중에 살펴 보자 그냥 실행된 내역이라고 정도만 알면 된다.

    Hello world가 찍힌 것을 볼려면 Task Instance를 확인해야한다. 확인하는 방법은 간단한데 좌측 상단에 Graph를 눌러 보면 DAG를 그래프 형식으로 볼 수 있다. 우리가 만든 Task를 볼수 있고 의존성을 작성하였기 때문에 dummy_task_id와 Hello_world가 연결 되어있는 것을 볼 수 있다.

     

     

    Hello_world task를 눌러 보면 팝업창이 뜨는데 DagRuns(실행 내용)에 포함되어 있는 task instance을 조작하여 재실행 이나 skip해서 디버깅하기 편하게 하는 팝업창이 뜬다. 여기서 log를 누르면 해당 작업이 몇시에 시작 되었는지 어떻게 출력이 되었는지 볼 수 있다.

     

     

    소스는 github에서 자세히 보실수 있습니다.

     

    GitHub - hyunseokjoo/prac_airflow

    Contribute to hyunseokjoo/prac_airflow development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.