【Airflow 입문】Airflow의 Job을 다른 강한 노드로 처리시키고 싶다

【Airflow 입문】Airflow의 Job을 다른 강한 노드로 처리시키고 싶다

2022-10-06 last update

5 minutes reading Redis Celery Airflow

소개



Airflow는 워크플로우 엔진이라고 하며 DAG라는 유향 그래프입니다.
Job 일정을 관리합니다.
아래에 기본 구성을 올립니다.

기본 구성





주제



특정 Job을 강한 GPU를 쌓은 workerNode에 일하고 싶습니다.



그렇게 생각하는 것이 있죠. 특히 기계 학습 Job을 돌릴 때 등이 필요합니다.
어떻게 실현하는지 Celery Executor라는 것을 사용합니다.

Celery



Celery는 원래 Job을 분산 처리하기위한 미들웨어입니다.
이것을 Airflow는 좋은 느낌으로 사용해줍니다. 그 좋은 느낌에 사용하는 것을 응용해, 특정 노드에 일을 시킵니다.

Celery Executor 개념편



Airflow에는 Celery를 잘 사용하기 위한 Celery Executor라는 것이 있습니다.
기본 개념은 아래에 설명되어 있습니다.



기본적으로 WorkerNode는 여러 개가 있다고 가정합니다.
이번에는 GPU 인스턴스 1개만 연결하는 것과 MasterNode 내에도 Worker가 있는 설정으로 이야기합니다.

Scheduler와 CeleryExecutor는 다음과 같이 움직입니다.
  • Scheduler는 Job 사령서를 Queue에 발행합니다
  • Celery는 Queue에 연결된 Worker에 무차별적으로 Job을 던집니다.
  • Worker는 Dequeue하고 손에 있는 DagFile을 바탕으로 Job을 실행하는 것과 동시에, 결과를 PostgreSQL에 기재해 갑니다.

  • 이 때, 특별히 설정을 하지 않은 경우, WorkerNode의 Worker에 Job이 던질 수 있는지,
    MasterNode의 Worker에 던질 수 있는지 결정할 수 없습니다.

    그러나 특정 Queue가 Worker를 연결하도록 허용하면,
    그 특정 큐에 흐르는 Job은 연결된 Worker에만 가기 때문에,
    특정 노드에 특정 작업을 수행할 수 있습니다.

    Celery Executor 설정편



    첫째, MasterNode와 별도의 Node에서 Job을 이동하려면 CeleryExecutor 기능이 필수입니다.
    이를 위해 다음 패키지를 설치하십시오.

    celery 설치
    pip install apache-airflow[celery]==1.10.7
    

    version은 좋아하게 변경해도 괜찮습니다. (글쓰기 현재는 1.10.7이 최신 안정판)

    Celery Executor는 airflow.cfg 파일에서 설정해야 합니다.

    airflow.cfg
    # The executor class that airflow should use. Choices include
    # SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
    #executor = SequentialExecutor
    executor = CeleryExecutor
    

    또한 마찬가지로 다음 항목도 편집합니다.

    airflow.cfg
    # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
    # a sqlalchemy database. Refer to the Celery documentation for more
    # information.
    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
    #broker_url = sqla+mysql://airflow:[email protected]:3306/airflow
    broker_url = redis://user:password@hostname:port/0 #queueを複数指定しているように見えますが、brokerは一つで大丈夫です。
    
    # The Celery result_backend. When a job finishes, it needs to update the
    # metadata of the job. Therefore it will post a message on a message bus,
    # or insert it into a database (depending of the backend)
    # This status is used by the scheduler to update the state of the task
    # The use of a database is highly recommended
    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
    #result_backend = db+mysql://airflow:[email protected]:3306/airflow
    result_backend = db+postgresql://airflow:password@hostname:port/airflow #ここにはSchedulerやWebServerが見てるDBを参照させる
    
    

    이것을 WorkerNode와 MasterNode 모두에 설치합니다.
    github 등에서 공유하는 것이 좋습니다.

    Celery Executor 실행편



    드디어 실행해 봅시다.
    MasterNode 측에서 airflow webserverairflow scheduler를 Tmux 나름을 사용하여 기동시킵니다.
    그런 다음 WorkerNode에서 airflow worker -q ML로 QueueName을 ML로 지정합니다.

    현재 그림





    현재의 상황으로서는 이러한 그림이 됩니다.
    여기서, DAG측에서 Queue에 ML를 지정한 DAG를 흘려 주면, 무사히 움직입니다.
    당연히, Queue에 default 를 했을 경우, MasterNode로 Job을 실행해 줍니다.

    결론



    GCP 관련에 대해서 이번은 써보고 싶네요, 우선 공부한 것을 그대로 쓰고 있는 느낌이므로,
    체계적으로 되어 있지 않을지도 모릅니다만, 거기엔은 이번 개인 블로그로 정리할까라고 생각하므로 잘 부탁드립니다!