-
[Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기)데이터 엔지니어링/AirFlow 2022. 6. 22. 15:53반응형
목표
- 병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경)
- 의존성을 LIST형식으로 구현하는 방법 알기
- Graph를 이용하여 병렬처리 되었는지 확인하는 방법 알아보기
병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경)
airflow에서 기본 제공하는 db는 sqlite인데 sqlite로 진행 할 시에 병렬처리가 되지 않는다. 병렬처리를 하기 위해서는 mysql이나 postgresql을 이용하여 한다. 그래서 airflow db를 변경하는 방법에 대해 알아보는 좋을 것 같다. 필자는 docker로 했지만, local에 다운받아서 진행해도 무방하다.
docker image 받기
docker pull postgres:latest
docker container 만들기
postgre image는 postgre password, timezone, 마운트 해야하는 주소와 가 container를 생성해서 진행하면 될 것 같다.
docker run -p 5432:5432\\ --name sr-postgres\\ -e POSTGRES_PASSWORD=1234\\ -e TZ=Asia/Seoul\\ -v ~/dev/pgdata:/var/lib/postgresql/data\\ postgres:latest # 포트와 이름을 정해 주고 # postgre 비밀 번호 timezone # 마운트 할 주소 {local주소}:{hostPath} # continer이름
postgres user 만들기
airflow에서는 cfg파일에 db url을 바라보고 진행 되기때문에 새로운 db를 만들어 진행해야 한다. airflow라는 db를 만들고 user를 id/pw를 airflow/airflow 로 하나 만들었다.
docker exec -it sr-postgres /bin/bash root@088262b5a378:/# psql -U postgres psql (14.4 (Debian 14.4-1.pgdg110+1)) Type "help" for help. # database 생성 postgres=# create database airflow; # user 생성 postgres=# create user airflow with password 'airflow';
airflow config파일 변경하기
먼저 db 정보를 변경하기 전에 pycopg2를 설치해야한다
pycopg2 설치하기
pip install pycopg2
db 정보 변경하기
postgre를 받았으니 db기본 정보를 변경해야 한다. cfg파일안에 database탭에 보면 sql_alchemy_conn이 있는데 이부분을 변경 하면 된다.
기본 db접속 정보
sql_alchemy_conn = sqlite:////Users/planit/airflow/airflow.db
변경 후 db 접속 정보
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
executor변경하기
executor를 변경해야하는 이유는 동작 원리에 executor를 이용하여 병렬처리하는 부분이 있기 때문이다. 이 부분은 동작원리에 대해 공부하고 오자, celery와 kubernetes executor가 있으니 따로 공부하면 좋을 것 같다.
기본 executor
executor = SequentialExecutor
변경 executor
executor = LocalExecutor
example DAG 안보이게 하기
번외로 example은 안보이는게 좋으니 작업할 때 좋아서 안보이게 설정 변경
기본 load
load_examples = True
변경 후 load
load_examples = False
여기 까지 완료 했으면 설정도 바꿨겠다. db를 초기화 해주고 새로운 airflow webserver용 user를 생성해야한다. 이작업은 처음 airflow 시작하는 것과 마찬가지이니깐 코드만 작성하겠다.
# db 초기화 기본 세팅 airflow db init # user 만들기 airflow users create \\ > --username admin \\ > --firstname admin \\ > --lastname admin \\ > --role Admin \\ > --password admin \\ > --email admin@airflow.com # webserver 구동 airflow webserver # scheduler 구동 airflow scheduler
여기까지 진행하면 sqlite to postgre, config 파일을 수정이 완료 된것이다. 이제 병렬 처리하는 DAG에 대해 알아보자
병렬처리하는 기본 DAG 알아보기
완성된 DAG
완성된 소스
from time import sleep from datetime import datetime, timedelta from airflow import DAG from pendulum.tz.timezone import Timezone from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator default_args={ 'owner':'eddie', 'retries': 1, 'retry_delay': timedelta(minutes=1) } with DAG( dag_id='ex_parallel_tasks', description='병렬처리 예제 입니다.', default_args=default_args, start_date=datetime(2022,6,21, tzinfo=Timezone("Asia/Seoul")), schedule_interval="@daily", tags=['test', 'parallel tasks'] ) as dag: def dump() -> None: print('hello world') # 방법 1 start = BashOperator( task_id='start', bash_command="date" ) # task 동적 할당 tasks = [] for i in range(4): temp_task = PythonOperator(task_id=f"task_{i}", python_callable=dump) tasks.append(temp_task) end = PythonOperator( task_id="end", python_callable=dump ) # 의존성 리스트 할당 방법 1 start >> tasks >> end ''' # 방법 2 start = PythonOperator(task_id='start', python_callable=dump) task_1 = PythonOperator(task_id='task_1', python_callable=dump) task_2 = PythonOperator(task_id='task_1', python_callable=dump) task_3 = PythonOperator(task_id='task_1', python_callable=dump) task_4 = PythonOperator(task_id='task_1', python_callable=dump) task_5 = PythonOperator(task_id='task_1', python_callable=dump) end = PythonOperator(task_id="end", python_callable=dump) # 의존성 리스트 할당 방법 2 start >> [task_1, task_2, task_3, task_4, task_5] >> end '''
DAG 작성
default_args={ 'owner':'eddie', 'retries': 1, 'retry_delay': timedelta(minutes=1) } with DAG( dag_id='ex_parallel_tasks', description='병렬처리 예제 입니다.', default_args=default_args, start_date=datetime(2022,6,21, tzinfo=Timezone("Asia/Seoul")), schedule_interval="@daily", tags=['test', 'parallel tasks'] ) as dag:
Task 작성
def dump() -> None: print('hello world') # 방법 1 start = BashOperator( task_id='start', bash_command="date" ) # task 동적 할당 tasks = [] for i in range(4): temp_task = PythonOperator(task_id=f"task_{i}", python_callable=dump) tasks.append(temp_task) end = PythonOperator( task_id="end", python_callable=dump )
의존성 작성
# 의존성 리스트 할당 방법 1 start >> tasks >> end
DAG 실행 시켜보기
DAG 실행 시키면 왼쪽 처럼 한번에 실행 되는 모습을 볼 수 있다. 병렬처리가 잘 된 모습이다. 그리고 다 끝나면 end로 잘 가는 모습이다.
web에서 Gantt를 보면 task가 언제 얼마나 어떻게 실행 되었는지 볼 수 있다. Task0~3이 동시에 실행 된 것을 볼 수 있다.
소스는 github에서 확인 가능합니다.
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow 실습] SubDAG 작성하여 DAG관리하기 (0) 2022.06.23 [Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기 (0) 2022.06.23 [Airflow 실습] 여러개의 Task 한번에 돌리기 (0) 2022.06.22 [Airflow 실습]가장 기본 DAG 만들어 실행해보기 (0) 2022.06.22 [Airflow 기본 내용] DAG 작성 심화 (0) 2022.06.22