-
[Airflow]Pool 을 이용하여 메모리 사용량 제한하기데이터 엔지니어링/AirFlow 2022. 6. 26. 20:11반응형
Airflow 배치 스케쥴을 만들 때 또 좋은 점이 있는데 바로 pool을 이용하여 메모리를 제한 할 수 있다. DAG를 구성할 때 무거운 작업은 메모리를 많이 할당하여 빨리 처리하거나 아니면 너무 무거워 뻑나는 경우를 대비하여 슬롯을 조금 할당 할 수도 있다.
Pool설정 방법
task를 설정할 때 pool이라는 속성을 정의 하면 pool을 지정해 줄수 있다. pool_slot을 이용하여 슬롯 숫자도 정희 줄 수 있다.
def dump(interval): sleep(interval) ask_heavy = PythonOperator( task_id='task_heavy', python_callable=dump, op_args=[500], pool='single_pool', pool_slot=2, )
Pool 설정 예제
from platform import python_branch from time import sleep from pendulum.tz.timezone import Timezone from datetime import datetime, timedelta, tzinfo from airflow import DAG from airflow.operators.python import PythonOperator kst = Timezone('Asia/Seoul') default_args={ 'owner':'eddie', 'retries':1, 'retry_delay':timedelta(minutes=1), } with DAG( dag_id='ex_pools', default_args=default_args, start_date=datetime(2022,6,22, tzinfo=kst), schedule_interval="@once", tags=['test', 'pool sample'] ) as dag: def dump(interval): sleep(interval) start = PythonOperator( task_id='start', python_callable=dump, op_args=[3], ) task_heavy = PythonOperator( task_id='task_heavy', python_callable=dump, op_args=[500], pool='single_pool', ) task_medium = PythonOperator( task_id='task_medium', python_callable=dump, op_args=[300], pool='single_pool' ) task_light = PythonOperator( task_id='task_light', python_callable=dump, op_args=[100], pool='single_pool', ) end = PythonOperator( task_id='end', python_callable=dump, op_args=[3], ) start >> [task_heavy, task_medium, task_light] >> end
먼저 무거운 작업, 중간 작업 , 가벼운 작업이 있다고 가정해 보자 그리고 아래와 같이 의존성을 설정해주면 분명 병렬처리가 되어야한다. 하지만 pool을 실험하기 위해 하나의 슬롯만 가지는 pool을 만들어 실행 시켜 보자 admin에 pools를 선택
그럼 default_pooldl 128슬롯으로 정의 된것을 볼 수 있고, +버튼을 누르면 새로 추가도 가능하다.
1나의 슬롯만 할당하여 pool을 하나 만들었다.
그리고 위에 소스 처럼 할당해주면 task 3개를 병렬 처리해도 슬롯이 1개이기때문에 순차 처리가 될것이다. DAG 를 실행 시켜보면 1개의 task만 실행되고 나머지는 schedule된 것을 볼 수 있다. 그리고 500초를 지나고 나면
8분 20초가 지나고 나서 medium이 실행 된것을 볼 수 있다.
이렇게 해서 pool을 설정하고 실행시켜보고 슬롯을 지정하는 법을 알아봤다. 너무 무거워서 뻑나거나 pool을 조절해서 메모리 할당해야할때 유용한 기능인 것같다.
모든 소스는 github에 있습니다.
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow]variables를 이용한 전역변수 설정 (0) 2022.06.27 [Airflow] Connection을 이용하여 외부 서비스와 연동하기 (0) 2022.06.26 [Airflow]Xcom을 이용한 task 간 데이터 주고 받기 (0) 2022.06.26 [Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기 (0) 2022.06.24 [Airflow 실습] Task group으로 ui 관리하기 (0) 2022.06.23