ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow 기본 내용] Airflow란? 구성요소, 구동원리, 개념, 용어 정리
    데이터 엔지니어링/AirFlow 2022. 6. 21. 08:22
    반응형

    Airflow란?

    • airflow는 airbnb에서 만든 workflow를 만드는 오픈소스 프로젝트이다.
    • python 코드로 워크플로우(workflow)를 작성하고, 스케쥴링, 모니터링 하는 플랫폼임.
    • 스케쥴링을 하고 모니터링을 코드로 작성하기 때문에 더 세분화 할 수 있으므로, 더 정교화된 파이프라인을 구성 할 수 있음.
    • 예약된 워크플로를 트리거 하고 실행 할 실행기에 Task을 제출하는 두가지를 모두 처리하는 스케줄러이다.

    Airflow 동작 원리

    구성요소

    • User interface / Web Server : airflow는 설치 만으로 web server를 구성 할 수 있음
    • DAG Directory: 작업스케쥴을 만들어주는 스케쥴링 단위를 작성한 DAG를 보관하는 장소 python 코드로 작성한 DAG들을 여기에 보관한다.
    • Worker : 실제 Task를 실행하는 주체. Executor 종류에 따라 동작 방식이 다양하다.
    • Metadata Database : 실행할 Task의 관한 정보를 저장해 놓는다. 순서나 작업 스케쥴링 등등 , task status(queued, scheduled, running, success, failed, etc)가 저장된다.
    • Scheduler : DAG와 작업들을 모니터링하고 실행 순서와 상태 관리한다.
    • Executor : 스케쥴러와 함께 동작하는 구성요소. status가 queued인 태스크를 확인하며 실제 어떤 리소스가 투입되어 실행 될 것인지를 결정, Local Executor, Celery Executor, Kubernetes Executor 등이 있다.

    Workflow

    • 작업 흐름이라는 뜻을 가지고 있고,
    • 가장 작은 단위는 Operator들이고 이것이 모여 Task, task들이 모여 DAG, DAG들이 모여 Workflow가 된다
    • 순서 Workflow << DAGs << Tasks(Operators) 로 커진다고 보면 된다.

    DAG(Directed Acyclic Graph)

    • 비선형 구조의 그래프라는 뜻으로 한쪽으론 방향이 흐룰 수 있다는 것을 뜻함
    • DAG는 관계 의존성을 그래프로 그려주어 사용자가 flow를 잘 파악 할 수 있도록 도와주는 역할을 함.
    • 또한 연관성을 직접 지정하여 처리가 가능함

    기본 DAG 작성법

    • DAG 선언
    • task 정의
    • >>, << 등으로 stream 정의
    from datetime import timedelta
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
    	'owner' : 'airflow',
    	'retries' : 1,
    	'retry_delay' : timedelta(minutes=5),
    }
    
    with DAG(
    	'tutorial',
    	default_args=default_args,
    	description='A simple tutorial DAG',
    	schedule_interval=timedelta(days=1),
    	start_date=days_ago(2),
    	tags=['example'],
    ) as dag:
    
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
    
        t2 = BashOperator(
            task_id='sleep',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )
    
        t1 >> t2

    Task

    하나의 작업 단위를 Task라고 하며 하나 또는 여러 개의 Task를 이용해 하나의 DAG를 생성하게 된다. Task간 >>(downStream), <<(upstream)을 이용하여 어떤 작업을 할지 정할 수 있다.

    Operator란?

    DAGs가 작동하는 동안 workflow를 어떻게 작동하는지 묘사하는 것이다. 하나의 작업(TASK)에 대해 정의 할 수 있다. 이런 TASK들이 모여서 하나의 DAG를 구성하고 DAG들이 모여서 workflow가 만들어진다. Operator는 DAG를 구성하기 윈한 가장 작은 단위로 봐도 된다.

    t2 = BashOperator(
    	task_id='sleep',
    	depends_on_past=False,
    	bash_command='sleep 5',
    	retries=3,
    )
    

    Sensor

    Sensor는 Operator와 마찬가지로 Task로 사용할 수 있는데 특정 조건이 채워지기를 기다리면 조건을 만족하는 경우 이후 Task로 넘어가게 하는 역할을 한다.

    from airflow.providers.http.sensors.http import HttpSensor
    
     # Http가 호출 될때 까지 기다리는 Sensor
    is_api_avaiable = HttpSensor(
        task_id='is_api_available',
        http_conn_id='opensea_api',
        endpoint='api/~~/~~'
    )

    Pool

    Airflow는 동시성 제어를 위해 Pool을 제공하는데 동시에 몇개의 DAG까지 진행 할 수 있는지 조작 할 수 있다.

    Pool 공식문서

    Xcom

    airflow task간 데이터 공유가 필요 할 때가 있다. Xcom은 airflow간 데이터 공유를 하기 위해 push,pull 을 하여 전달 및 가져오기가 가능하다. PythonOperator을 사용하면 return 값이 자동으로 Xcom에 push 된다.

    Xcom 공식 문서

    Variable

    Variable은 위에서 언급한 것과 같이 Airflow에서 공통적으로 사용 가능한 변수들을 모아놓는 곳으로 Web UI에서 관리가 가능하다. key-value 값으로 값을 보관할 수 있으며, password, secret, passwd, authorization 등 키워드가 포함된 key를 가지는 경우 web UI에서 정보가 자동으로 보이지 않게 된다.

    variable 공식 문서

    Connection

    Connection은 외부 시스템과 연결한느 방식에 대한 정보를 저장해 놓는 곳이다. Operator, Hook등에서 Connection의 정보를 사용한다. UI에서 설정이 가능하다.

    connection 공식 문서

    Hook

    Hook은 외부 플랫폼에 대한 인터페이스를 제공하는 것으로 Hive, S3, Mysql 등에 접근 할 수 있는 다양한 Hook을 제공 독립적으로 task가 될 수 없으며, Operator과 함께 사용된다.

    hook 공식 문서

     

     

    구동원리 

    구동 순서 

    • DAG를 작성하여 Workflow를 만든다. DAG는 Task로 구성되어 있다.
    • Task는 Operator가 인스턴스화 된 것.
    • DAG를 실행 시킬 때 Scheduler는 DagRun 오브젝트를 만든다.
    • DagRun 오브젝트는 Task Instance를 만든다.
    • Worker가 Task를 수행 후 DagRun의 상태를 “완료”로 바꿔놓는다.

    webserver 구동 -> folder DAGs 파일 받아와 보여줌 -> Scheduler에서 Folder DAGs를 보고 Metastore에 정보전달 DAGRun 생성-> DagRun에 해당하는 Task Instance 생성 및 Executor에 전달 -> Task가 여러개이면 Metastor와 Task들의 처리 상태(진행, 건너뛰기, 완료 등)을 업데이트하면서 다음 내용 이전내용을 Execute 진행 -> 완료 되면 DagRun에 완료 상태 보내고 Scheduler에 완료 상태 업데이트알려주어 끝냄

     

    airflow 구동시 webserver를 구동하고 나서 꼭 scheduler도 같이 구동하여야 Dag파일을 불러와 읽어드리는 모습을 볼 수 있다

    반응형

    댓글

Designed by Tistory.