
【Airflow 입문】Airflow의 Job을 다른 강한 노드로 처리시키고 싶다
소개
Airflow는 워크플로우 엔진이라고 하며 DAG라는 유향 그래프입니다.
Job 일정을 관리합니다.
아래에 기본 구성을 올립니다.
기본 구성

주제
특정 Job을 강한 GPU를 쌓은 workerNode에 일하고 싶습니다.
그렇게 생각하는 것이 있죠. 특히 기계 학습 Job을 돌릴 때 등이 필요합니다.
어떻게 실현하는지 Celery Executor라는 것을 사용합니다.
Celery
Celery는 원래 Job을 분산 처리하기위한 미들웨어입니다.
이것을 Airflow는 좋은 느낌으로 사용해줍니다. 그 좋은 느낌에 사용하는 것을 응용해, 특정 노드에 일을 시킵니다.
Celery Executor 개념편
Airflow에는 Celery를 잘 사용하기 위한 Celery Executor라는 것이 있습니다.
기본 개념은 아래에 설명되어 있습니다.

기본적으로 WorkerNode는 여러 개가 있다고 가정합니다.
이번에는 GPU 인스턴스 1개만 연결하는 것과 MasterNode 내에도 Worker가 있는 설정으로 이야기합니다.
Scheduler와 CeleryExecutor는 다음과 같이 움직입니다.
특정 Job을 강한 GPU를 쌓은 workerNode에 일하고 싶습니다.
그렇게 생각하는 것이 있죠. 특히 기계 학습 Job을 돌릴 때 등이 필요합니다.
어떻게 실현하는지 Celery Executor라는 것을 사용합니다.
Celery
Celery는 원래 Job을 분산 처리하기위한 미들웨어입니다.
이것을 Airflow는 좋은 느낌으로 사용해줍니다. 그 좋은 느낌에 사용하는 것을 응용해, 특정 노드에 일을 시킵니다.
Celery Executor 개념편
Airflow에는 Celery를 잘 사용하기 위한 Celery Executor라는 것이 있습니다.
기본 개념은 아래에 설명되어 있습니다.

기본적으로 WorkerNode는 여러 개가 있다고 가정합니다.
이번에는 GPU 인스턴스 1개만 연결하는 것과 MasterNode 내에도 Worker가 있는 설정으로 이야기합니다.
Scheduler와 CeleryExecutor는 다음과 같이 움직입니다.
이 때, 특별히 설정을 하지 않은 경우, WorkerNode의 Worker에 Job이 던질 수 있는지,
MasterNode의 Worker에 던질 수 있는지 결정할 수 없습니다.
그러나 특정 Queue가 Worker를 연결하도록 허용하면,
그 특정 큐에 흐르는 Job은 연결된 Worker에만 가기 때문에,
특정 노드에 특정 작업을 수행할 수 있습니다.
Celery Executor 설정편
첫째, MasterNode와 별도의 Node에서 Job을 이동하려면 CeleryExecutor 기능이 필수입니다.
이를 위해 다음 패키지를 설치하십시오.
celery 설치
pip install apache-airflow[celery]==1.10.7
version은 좋아하게 변경해도 괜찮습니다. (글쓰기 현재는 1.10.7이 최신 안정판)
Celery Executor는
airflow.cfg
파일에서 설정해야 합니다.airflow.cfg
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
#executor = SequentialExecutor
executor = CeleryExecutor
또한 마찬가지로 다음 항목도 편집합니다.
airflow.cfg
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
#broker_url = sqla+mysql://airflow:[email protected]:3306/airflow
broker_url = redis://user:password@hostname:port/0 #queueを複数指定しているように見えますが、brokerは一つで大丈夫です。
# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
#result_backend = db+mysql://airflow:[email protected]:3306/airflow
result_backend = db+postgresql://airflow:password@hostname:port/airflow #ここにはSchedulerやWebServerが見てるDBを参照させる
이것을 WorkerNode와 MasterNode 모두에 설치합니다.
github 등에서 공유하는 것이 좋습니다.
Celery Executor 실행편
드디어 실행해 봅시다.
MasterNode 측에서
airflow webserver
와 airflow scheduler
를 Tmux 나름을 사용하여 기동시킵니다.그런 다음 WorkerNode에서
airflow worker -q ML
로 QueueName을 ML
로 지정합니다.현재 그림

현재의 상황으로서는 이러한 그림이 됩니다.
여기서, DAG측에서 Queue에
ML
를 지정한 DAG를 흘려 주면, 무사히 움직입니다.당연히, Queue에
default
를 했을 경우, MasterNode로 Job을 실행해 줍니다.