Python 백그라운드 작업

Python 백그라운드 작업

2022-10-18 last update

14 minutes reading celery flask redis python

소개



몇 달 전에 ive는 새로운 사용자가 참여할 때마다 Welcome Gif 이미지를 보내는 Discord Bot을 배포했지만 거기에 문제가 있습니다...

나중에 다른 용도로 사용할 경우를 대비하여 이미지를 서버에 저장하고 있습니다. 하지만 지금은 봇을 더 많은 서버에 추가하려면 결국 모든 파일을 직접 삭제하기 어려울 것이므로 수정하겠습니다.

조사를 하다가 Celery를 찾았습니다. Celery는 주로 백그라운드 작업을 수행하고 작업을 예약할 수 있게 해주는 훌륭한 도구입니다.
Celery는 메시지 브로커를 통해 통신하여 해당 작업을 저장(직렬화)하고 Celery(The Worker)와 언제든지 호출합니다. 이번에는 Redis를 브로커로 사용할 것입니다.

Redis는 기본적으로 브라우저의 LocalStorage와 비슷하지만 컴퓨터(서버)에 있습니다.

전제 조건


  • 이 기사에서는 Redis를 사용하지만 Docker 이미지를 사용하므로 Docker가 설치되어 있는지 확인하십시오.

  • 시작하자!



    이제 도커 설치가 완료되었으므로 다음 명령을 사용하여 Redis 이미지를 다운로드하여 컨테이너에서 실행할 수 있습니다.
    sudo docker run -p 6379:6379 --name redis-cont -d redis
    sudo docker ps -a를 사용하여 컴퓨터에서 실행 중인 모든 컨테이너를 확인할 수 있습니다. redis-cont라는 컨테이너가 있는지 확인하십시오.

    파이썬 종속성 설치



    가상 환경을 사용하여 다음 종속성을 설치할 수 있습니다.

    celery==5.2.7
    Flask==2.1.3
    flower==1.0.0
    redis==4.3.3
    


    셀러리와 플라스크



    플라스크로 백그라운드에서 함수를 호출하는 방법의 예입니다.
    백그라운드 기능에서 수행할 작업은 일부 이미지를 포함하는 "./data"디렉토리의 모든 파일을 삭제하는 것입니다.



    # main.py
    from flask import Flask, request
    from celery import Celery
    import os
    import time
    
    # Adding the required settings in order to connect our flask app with celery and redis
    app = Flask(__name__)
    app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
    # Using the route "/0" to send the message to redis
    app.config["result_backend"] = "redis://localhost:6379/1"
    # Using the route "/1" to save the result-object we return in the background task 
    
    
    celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
    celery.conf.update(
        result_backend=app.config["result_backend"]
    )
    
    @celery.task()
    def task_example():
        # Sleeping the function for a while
        time.sleep(25)
        print("--------> Deleting all files in ./data directory <---------")
        for _file in os.listdir('./data'):
            os.remove(f"./data/{_file}")
        # This object will be stored in redis as the **result_backend** we passed above
        return {
            "state": "Done",
            "developer": "Bearz",
            "status": 200
        }
    
    @app.route('/', methods = ['POST', 'GET'])
    def main():
        # The 'delay()' method will send the function to a celery worker to be executed
        task = task_example.delay()
        return "GG BRO"
    
    
    if __name__=='__main__':
        app.run(debug=True, port=5000)
    


    이제 다음을 호출하여 플라스크 애플리케이션을 시작할 수 있습니다.python main.py
    셀러리가 작업을 시작하려면 별도의 터미널에서 다음 명령을 실행해야 합니다.
    celery -A main.celery worker --loglevel=info
    그러면 작업자가 모든 백그라운드 작업을 처리하고 적절한 시간에 호출하기 시작합니다.

    main.celery의 위 명령에서 주의할 점이 있습니다. 파일 이름이 app.py인 경우 app.celery를 전달해야 합니다.

    좋습니다! 이제 2개의 터미널이 있습니다. 하나는 Flask 앱을 ​​실행하고 다른 하나는 Celery 작업자를 실행합니다. 모든 것이 제대로 작동하는지 테스트하려면 다음 URL을 방문하세요. **http://localhost:5000/**
    우리의 delay() 메서드가 트리거되고 작업 함수를 작업자에게 보냅니다.

    모니터링 작업



    좋습니다. 첫 번째 백그라운드 작업을 만들었습니다. 하지만 어떻게 모니터링합니까?

    우리는 Celery에 연결하여 모든 단일 작업을 추적하는 웹 앱인 Flower를 사용할 수 있습니다.
    pip install flower==1.0.0
    그리고 시작해
    celery -A main.celery flower --loglevel=info --port=9999
    이제 localhost:9999로 이동하면 작업 탭으로 이동하면 모든 작업과 현재 상태를 볼 수 있는 예쁜 GUI를 볼 수 있습니다.

    작업 예약



    아직 해야 할 일이 하나 있습니다. 매시간/분/초마다 실행되도록 작업을 예약하는 방법은 무엇입니까?

    Celery Beat를 사용하면 플라스크를 사용하지 않고도 작업을 생성할 수 있습니다.

    작업을 예약하려면 main.py 파일에 몇 가지를 더 추가해야 합니다.

    # main.py
    # ...
    CELERY_BEAT_SCHEDULE = {
          'do-this-every-5-minutes': {
            'task': 'main.task_example',
            'schedule': 300.0,
        },
        'do-this-every-1-minute': {
            'task': 'main.task_per_minute',
            'schedule': 60.0,
        },
    }
    
    celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
    celery.conf.update(
        result_backend=app.config["result_backend"],
        beat_schedule=CELERY_BEAT_SCHEDULE
    )
    
    @celery.task()
    def task_per_minute():
        print("--> This message should be printed every minute <--")
        return {
            "task_returned": True,
            "developer": "Bearz"
        }
    
    @celery.task()
    def task_example():
        time.sleep(25)
        print("--------> Deleting all files in ./data directory <---------")
        for _file in os.listdir('./data'):
            os.remove(f"./data/{_file}")
        return {
            "state": "Done",
            "owner": "Elmer&Hachy"
        }
    
    


    좋아, 우리는 beath_schedule인 celery.conf.update에 새 매개변수를 전달합니다. 여기서 우리는 예약하려는 모든 작업이 포함된 사전을 전달합니다.
    해당 사전의 Eveyr 단일 키는 새 사전에 최소한 2개의 가져오기 키가 있어야 합니다.
  • task: 실행하고자 하는 함수 참조
  • 일정: 작업을 호출해야 하는 빈도, 시간은 초 단위입니다.

  • Celery Beat는 celery의 또 다른 서비스이므로 서비스를 시작하려면 별도의 터미널을 열어야 합니다.
    celery -A main.celery beat
    그리고 주어진 시간에 Tasks가 호출됩니다.

    Celery Beat는 일정한 간격으로 작업을 시작하고 Celery Worker는 작업을 처리합니다.

    앞서 시작한 플라워 서비스를 살펴보세요.


    놀랍습니다. 이제 우리는 특정 목적을 위해 백그라운드 작업을 생성할 사람을 알고 있습니다.

    오늘은 여기까지 입니다 읽어주셔서 감사하고 다음 포스팅에서 뵙겠습니다 좋은 하루 되세요 ;)

    Git 리포지토리는 다음과 같습니다.
    https://github.com/AlonsoCrag/python-celery

    메모
    Redis Commander는 Flower의 대안으로 npm을 통해 설치할 수 있습니다.