ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기)
    데이터 엔지니어링/AirFlow 2022. 6. 22. 15:53
    반응형

    목표

    • 병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경)
    • 의존성을 LIST형식으로 구현하는 방법 알기
    • Graph를 이용하여 병렬처리 되었는지 확인하는 방법 알아보기

    병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경)

    airflow에서 기본 제공하는 db는 sqlite인데 sqlite로 진행 할 시에 병렬처리가 되지 않는다. 병렬처리를 하기 위해서는 mysql이나 postgresql을 이용하여 한다. 그래서 airflow db를 변경하는 방법에 대해 알아보는 좋을 것 같다. 필자는 docker로 했지만, local에 다운받아서 진행해도 무방하다.

    docker image 받기

    docker pull postgres:latest
    

    docker container 만들기

    postgre image는 postgre password, timezone, 마운트 해야하는 주소와 가 container를 생성해서 진행하면 될 것 같다.

    docker run -p 5432:5432\\
     --name sr-postgres\\
     -e POSTGRES_PASSWORD=1234\\
     -e TZ=Asia/Seoul\\
     -v ~/dev/pgdata:/var/lib/postgresql/data\\
     postgres:latest
    
    # 포트와 이름을 정해 주고 
    # postgre 비밀 번호 timezone
    # 마운트 할 주소 {local주소}:{hostPath}
    # continer이름
    

    postgres user 만들기

    airflow에서는 cfg파일에 db url을 바라보고 진행 되기때문에 새로운 db를 만들어 진행해야 한다. airflow라는 db를 만들고 user를 id/pw를 airflow/airflow 로 하나 만들었다.

    docker exec -it sr-postgres /bin/bash
    root@088262b5a378:/# psql -U postgres
    psql (14.4 (Debian 14.4-1.pgdg110+1))
    Type "help" for help.
    
    # database 생성
    postgres=# create database airflow;
    
    # user 생성
    postgres=# create user airflow with password 'airflow';
    

    airflow config파일 변경하기

    먼저 db 정보를 변경하기 전에 pycopg2를 설치해야한다

    pycopg2 설치하기

    pip install pycopg2
    

    db 정보 변경하기

    postgre를 받았으니 db기본 정보를 변경해야 한다. cfg파일안에 database탭에 보면 sql_alchemy_conn이 있는데 이부분을 변경 하면 된다.

    기본 db접속 정보

    sql_alchemy_conn = sqlite:////Users/planit/airflow/airflow.db
    

    변경 후 db 접속 정보

    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
    

    executor변경하기

    executor를 변경해야하는 이유는 동작 원리에 executor를 이용하여 병렬처리하는 부분이 있기 때문이다. 이 부분은 동작원리에 대해 공부하고 오자, celery와 kubernetes executor가 있으니 따로 공부하면 좋을 것 같다.

    기본 executor

    executor = SequentialExecutor
    

    변경 executor

    executor = LocalExecutor
    

    example DAG 안보이게 하기

    번외로 example은 안보이는게 좋으니 작업할 때 좋아서 안보이게 설정 변경

    기본 load

    load_examples = True
    

    변경 후 load

    load_examples = False
    

    여기 까지 완료 했으면 설정도 바꿨겠다. db를 초기화 해주고 새로운 airflow webserver용 user를 생성해야한다. 이작업은 처음 airflow 시작하는 것과 마찬가지이니깐 코드만 작성하겠다.

    # db 초기화 기본 세팅
    airflow db init
    # user 만들기
    airflow users create \\ 
    > --username admin \\
    > --firstname admin \\ 
    > --lastname admin \\
    > --role Admin \\
    > --password admin \\
    > --email admin@airflow.com
    # webserver 구동
    airflow webserver
    # scheduler 구동
    airflow scheduler 
    

    여기까지 진행하면 sqlite to postgre, config 파일을 수정이 완료 된것이다. 이제 병렬 처리하는 DAG에 대해 알아보자

    병렬처리하는 기본 DAG 알아보기

    완성된 DAG

    완성된 소스

    from time import sleep
    from datetime import datetime, timedelta
    from airflow import DAG
    from pendulum.tz.timezone import Timezone
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    
    default_args={
        'owner':'eddie',
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    }
    
    with DAG(
        dag_id='ex_parallel_tasks',
        description='병렬처리 예제 입니다.',
        default_args=default_args,
        start_date=datetime(2022,6,21, tzinfo=Timezone("Asia/Seoul")),
        schedule_interval="@daily",
        tags=['test', 'parallel tasks']
    ) as dag:
    
        def dump() -> None:
            print('hello world')
    
        # 방법 1
        start = BashOperator(
            task_id='start', 
            bash_command="date"
        )
    
        # task 동적 할당
        tasks = []
        for i in range(4):
            temp_task = PythonOperator(task_id=f"task_{i}", python_callable=dump)
            tasks.append(temp_task)
    
        end = PythonOperator(
            task_id="end", 
            python_callable=dump
        )
    
        # 의존성 리스트 할당 방법 1
        start >> tasks >> end
    
        '''
        # 방법 2
        start = PythonOperator(task_id='start', python_callable=dump)
    
        task_1 = PythonOperator(task_id='task_1', python_callable=dump)
        task_2 = PythonOperator(task_id='task_1', python_callable=dump)
        task_3 = PythonOperator(task_id='task_1', python_callable=dump)
        task_4 = PythonOperator(task_id='task_1', python_callable=dump)
        task_5 = PythonOperator(task_id='task_1', python_callable=dump)
    
        end = PythonOperator(task_id="end", python_callable=dump)
    
        # 의존성 리스트 할당 방법 2
        start >> [task_1, task_2, task_3, task_4, task_5] >> end
        '''
    

    DAG 작성

    default_args={
        'owner':'eddie',
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    }
    
    with DAG(
        dag_id='ex_parallel_tasks',
        description='병렬처리 예제 입니다.',
        default_args=default_args,
        start_date=datetime(2022,6,21, tzinfo=Timezone("Asia/Seoul")),
        schedule_interval="@daily",
        tags=['test', 'parallel tasks']
    ) as dag:
    

    Task 작성

    def dump() -> None:
        print('hello world')
    
    # 방법 1
    start = BashOperator(
        task_id='start', 
        bash_command="date"
    )
    
    # task 동적 할당
    tasks = []
    for i in range(4):
        temp_task = PythonOperator(task_id=f"task_{i}", python_callable=dump)
        tasks.append(temp_task)
    
    end = PythonOperator(
        task_id="end", 
        python_callable=dump
    )
    

    의존성 작성

    # 의존성 리스트 할당 방법 1
    start >> tasks >> end
    

    DAG 실행 시켜보기

    DAG 실행 시키면 왼쪽 처럼 한번에 실행 되는 모습을 볼 수 있다. 병렬처리가 잘 된 모습이다. 그리고 다 끝나면 end로 잘 가는 모습이다.

    web에서 Gantt를 보면 task가 언제 얼마나 어떻게 실행 되었는지 볼 수 있다. Task0~3이 동시에 실행 된 것을 볼 수 있다.

     

    소스는 github에서 확인 가능합니다.

     

    GitHub - hyunseokjoo/prac_airflow

    Contribute to hyunseokjoo/prac_airflow development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.