ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow]Pool 을 이용하여 메모리 사용량 제한하기
    데이터 엔지니어링/AirFlow 2022. 6. 26. 20:11
    반응형

    Airflow 배치 스케쥴을 만들 때 또 좋은 점이 있는데 바로 pool을 이용하여 메모리를 제한 할 수 있다. DAG를 구성할 때 무거운 작업은 메모리를 많이 할당하여 빨리 처리하거나 아니면 너무 무거워 뻑나는 경우를 대비하여 슬롯을 조금 할당 할 수도 있다.

    Pool설정 방법

    task를 설정할 때 pool이라는 속성을 정의 하면 pool을 지정해 줄수 있다. pool_slot을 이용하여 슬롯 숫자도 정희 줄 수 있다.

    def dump(interval):
        sleep(interval)
    
    ask_heavy = PythonOperator(
        task_id='task_heavy',
        python_callable=dump,
        op_args=[500],
        pool='single_pool',
    		pool_slot=2,
    )
    

    Pool 설정 예제

    from platform import python_branch
    from time import sleep
    from pendulum.tz.timezone import Timezone
    from datetime import datetime, timedelta, tzinfo
    
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    
    kst = Timezone('Asia/Seoul')
    
    default_args={
        'owner':'eddie',
        'retries':1,
        'retry_delay':timedelta(minutes=1),
    }
    
    with DAG(
        dag_id='ex_pools',
        default_args=default_args,
        start_date=datetime(2022,6,22, tzinfo=kst),
        schedule_interval="@once",
        tags=['test', 'pool sample']
    ) as dag:
        def dump(interval):
            sleep(interval)
    
        start = PythonOperator(
            task_id='start',
            python_callable=dump,
            op_args=[3],
        )
    
        task_heavy = PythonOperator(
            task_id='task_heavy',
            python_callable=dump,
            op_args=[500],
            pool='single_pool',
        )
    
        task_medium = PythonOperator(
            task_id='task_medium',
            python_callable=dump,
            op_args=[300],
            pool='single_pool'
        )
    
        task_light = PythonOperator(
            task_id='task_light',
            python_callable=dump,
            op_args=[100],
            pool='single_pool',
        )
    
        end = PythonOperator(
            task_id='end',
            python_callable=dump,
            op_args=[3],
        )
    
        start >> [task_heavy, task_medium, task_light] >> end
    

    먼저 무거운 작업, 중간 작업 , 가벼운 작업이 있다고 가정해 보자 그리고 아래와 같이 의존성을 설정해주면 분명 병렬처리가 되어야한다. 하지만 pool을 실험하기 위해 하나의 슬롯만 가지는 pool을 만들어 실행 시켜 보자 admin에 pools를 선택

    그럼 default_pooldl 128슬롯으로 정의 된것을 볼 수 있고, +버튼을 누르면 새로 추가도 가능하다.

    1나의 슬롯만 할당하여 pool을 하나 만들었다.

    그리고 위에 소스 처럼 할당해주면 task 3개를 병렬 처리해도 슬롯이 1개이기때문에 순차 처리가 될것이다. DAG 를 실행 시켜보면 1개의 task만 실행되고 나머지는 schedule된 것을 볼 수 있다. 그리고 500초를 지나고 나면

    8분 20초가 지나고 나서 medium이 실행 된것을 볼 수 있다.

    이렇게 해서 pool을 설정하고 실행시켜보고 슬롯을 지정하는 법을 알아봤다. 너무 무거워서 뻑나거나 pool을 조절해서 메모리 할당해야할때 유용한 기능인 것같다.

     

    모든 소스는 github에 있습니다.

     

    GitHub - hyunseokjoo/prac_airflow

    Contribute to hyunseokjoo/prac_airflow development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.