-
[Airflow 기본 내용] Operator, task란?데이터 엔지니어링/AirFlow 2022. 6. 22. 08:09반응형
Operator의 특성
- single task를 정의 해야한다.
- 멱등성을 유지해야한다. 다른 작업간에 겹침 현상을 제거하여 오류제거를 위함이다.
- 자동 재시도를 작성하여 자동으로 재시도 해줄수 있다.
- 하나의 task는 하나의 Operator 클래스로 구성되어 생성이 되어진다.
Operator 종류
Operator는 어떤 작업을 할지 정의 해주는 것이기 때문에 많은 종류의 Operator들이 있다. BashOperators, pythonOperator, EmailOperator, MySqlOperator,SqliteOperator, PostgreOperator 등등
아래 링크에서 확인 가능하다 이것을 보고 DAG를 작성하면 된다.
airflow.operators - Airflow Documentation
Operator 타입
모든 Operator는 BaseOperator를 상속받아 구성된다. 그리고 밑에 3가지 경우로 상속받아 재구성되어 사용되어진다.
- Action operators - 실제 연산을 수행
- Transfer operators - 데이터를 옮김
- sensor operators - 태스크를 언제 실행시킬 트리거를 기다림
Task란?
Task는 airflow의 기본 실행 단위. 작업은 DAG로 정렬된 다음 실행해야 하는 순서를 표현하기 위해 작업간 스트림 및 다운스트림 종속성을 설정
Task 타입 세가지
- Operators : DAG의 대부분 구축되어있는 모듈로써 작업 템플릿 거의 이것으로 다 가능
- Sensors : 전적으로 외부 이벤트가 발생하기를 기다리는 연산자의 특수 하위 클래스
- TaskFlow : 장식 @task 된, 태스크로 패키지된 사용자 정의 python함수
기본적으로 BaseOperator를 상속받아 진행 하기 때문에 상요 교환 및 운용이 가능
관계(Relationship)
# 비트 시프트로 표시 first_task >> second_task >> [third_task, fourth_task] # 함수로 표시 first_task.set_downstream(second_task) third_task.set_upstream(second_task)
Task instance
DAG실행 될때 마다 Task Instance를 생성하여 Executor로 넘긴다음 해당 작업을 실행한다. 그리고 그 Task instance를 다시 Metadata로 보내서 상태를 업데이트 하며, Task Instance가 작업이 아직 남아 있으면 사디 Executor로 보내진다. 작업이 완료가 되면 Scheduler에게 보내지는데 가운데의 상태를 잘 알아야 scheduler의 다음 동작을 잘 알수 있다.
상태
- none : 태스크가 아직 실행을 위한 큐가 없는 상태
- scheduled : 스케쥴러가 작업의 종속성이 충족되고 실행 되어야 한다고 결정
- queued : 작업이 Executor에 할당되었으며 작업자를 기다리고 있음
- running : 작업이 작업자에서 실행 중
- success : 작업이 오류 없이 성공
- shutdown : 태스트가 실행 중일 때 종료 하도록 외부적으로 요청이 됨
- restarting : 작업이 실행 중 일 때 다시 시작하도록 외부에서 요청한 작업
- failed : 작업을 실행하는 동안 오류가 발생하여 실행하지 못했음
- skipped : 분기, LatestOnly 등으로 인해 작업을 건너뛰었음
- upstream_failed : 업스트림 작업이 실패했고 트리거 규칙이 필요하다고 말함
- up_for_retry : 작업이 실패했지만 재시도 횟수가 남았고 일정이 다시 잡힘
- up_fro_reschedule : reschdule안에 있는 sensor 역할을 함
- sensing : 과제에 Smart Sensor
- deferred : 작업이 트리거로 인해 연기 되었음
- removed : 실행이 시작된 후 작업이 DAG에서 사라졌음
Task- workflow
시간 초과(Time Out)
Task가 최대 런타임을 가지는 속성을 정희하는 것이다. execution_timeout 속성을 datetime.timedelta 최대 허용 런타임 값으로 설정하면 된다.
SFTPSensor를 예로 들면
- 센서가 SFTP서버에 요청 할 때마다 최대 60초가 걸릴수 있음.
- 센서가 SFTP서버에 요청이 60초 이상 걸리면 AirflowTaskTimeout이 올라감. 그리고 센서는 retries를 하고 최대 재시도 2번까지 함
- 첫 번째 실행 시작부터 결국 성공할 때까지 센서는 에 정의된 대로 최대 3600초 동안 허용됨. 즉, 3600초 이내에 파일이 SFTP 서버에 나타나지 않으면 센서가 AirflowSensorTimeout. 이 오류가 발생하면 다시 시도하지 않음
sensor = SFTPSensor( task_id="sensor", path="/root/test", execution_timeout=timedelta(seconds=60), timeout=3600, retries=2, mode="reschedule", )
참고문헌
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow 실습]가장 기본 DAG 만들어 실행해보기 (0) 2022.06.22 [Airflow 기본 내용] DAG 작성 심화 (0) 2022.06.22 [Airflow 기본 내용] 기본적인 DAG 작성법 (0) 2022.06.22 [Airflow 기본 내용] Airflow란? 구성요소, 구동원리, 개념, 용어 정리 (0) 2022.06.21 [Airflow 설치방법] Airflow Conda 환경에서의 설치 방법 (0) 2022.06.21