이번 글의 목표
- FastAPI로 Celery 설정 하기
- Python Shell에서 Celery 작업 실행 하기
- Flower로 Celery 앱 모니터링 하기
Redis 설정
레디스를 OS에 직접 설치하거나 Docker를 이용하여 설치 및 실행 할 수 있습니다.
Docker
docker run -p 6379:6379 --name some-redis -d redis
Docker Hub에서 Redis docker 이미지가 다운로드 되어서 포트 6379로 백그라운드 작업으로 실행됩니다.
아래 명령어를 통해서 레디스가 실행중인지 확인 할 수 있습니다.
docker exec -it some-redis redis-cli ping # PONG
Celery 설정
1단계 : FastAPI 앱에서 task message를 message broker에 전달합니다.
2단계 : Celery Woker는 message broker로부터 전달 받은 task를 처리합니다. task가 처리된 후, task 결과를 result backend
에 저장합니다. 그리고 task 상태를 업데이트 합니다.
3단계 : FastAPI 앱에서 task를 message broker에 보낸 뒤 task status와 결과를 result backend
로부터 확인합니다.
FastAPI 프로젝트 생성
$ mkdir fastapi-celery-project && cd fastapi-celery-project
파이썬 가상환경을 생성하고 실행합니다.
$ python3.10 -m venv venv
$ source venv/bin/activate
(venv)$
requirements.txt파일을 생성하고 아래와 같이 작성합니다.
fastapi==0.79.0
uvicorn[standard]==0.18.2
main.py 파일을 작성해줍니다.
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
앱을 실행합니다.
(venv)$ uvicorn main:app --reload
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [2061] using WatchFiles
INFO: Started server process [2063]
INFO: Waiting for application startup.
INFO: Application startup complete.
http://localhost:8000 를 브라우저 주소창에 입력하면 {"message":"Hello World"}
결과를 볼 수 있습니다.
프로젝트 구조는 아래와 같습니다.
├── main.py
└── requirements.txt
Celery 추가
Celery를 설치하고 설정해봅니다.
이를 위해 requirements.txt파일 아래 추가로 라이브러리 이름과 버전을 작성합니다.
celery==5.2.7
redis==4.3.4
(venv)$ pip install -r requirements.txt
main.py를 아래와 같이 수정합니다.
from celery import Celery
from fastapi import FastAPI
app = FastAPI()
celery = Celery(
__name__,
broker="redis://127.0.0.1:6379/0",
backend="redis://127.0.0.1:6379/0"
)
@app.get("/")
async def root():
return {"message": "Hello World"}
@celery.task
def divide(x, y):
import time
time.sleep(5)
return x / y
- FastAPI 인스턴스 생성 이후 Celery 인스턴스를 새롭게 만들어줍니다.
- Celery의
broker
,backend
키워드 매개변수에redis://127.0.0.1:6379/0
문자열 값을 맵핑처리합니다.
추후 별도의 설정 파일에 작성하고 이를 환경변수로 불러 오도록 처리 할 수 있습니다. - divide함수는 celery task를 이용하여 long-runnning task를 시뮬레이션 하기 위해 정의하였습니다.
Celery에 task 전달하기
설정이 완료되면 Celery에 task를 보내서 어떻게 작동하는지 확인해 보겠습니다.
터미널 창에 프로젝트 디렉토리로 이동하여 가상 환경을 활성화 한 후에 아래 명령어를 실행합니다.
(venv)$ celery -A main.celery worker --loglevel=info
아래 로그를 확인 할 수 있습니다.
[config]
.> app: main:0x10ad0d5f8
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. main.divide
기존 터미널 창으로 돌아와서 python shell로 접속합니다.
(venv)$ python
celery worker에 몇 가지 task를 보내봅니다.
>>> from main import app, divide
>>> task = divide.delay(1,2)
- message broker에 새로운 message를 전달하기 위해서 delay 메소드를 사용했습니다. 큐로부터 전달받은 워커는
task를 pickup하여서 실행하게됩니다. - 작업이 백그라운에서 실행되는 동안 실행이 완료됩니다.
Celery Worker 터미널로 이동하면 아래와 비슷한 내용이 표시됩니다.
[2024-03-22 15:14:02,089: INFO/MainProcess] Task main.divide[0dd7c3a2-27c0-4bef-bc42-31ca4f1007bd] received
[2024-03-22 15:14:07,109: INFO/ForkPoolWorker-8] Task main.divide[0dd7c3a2-27c0-4bef-bc42-31ca4f1007bd] succeeded in 5.017738905007718s: 0.5
worker process가 15:14:02 때, task를 받았습니다. 그리고 태스크를 시작하고 마치는데 5초가 소요되었습니다.
그리고 한두개 정도의 task를 더합니다. 그리고 위에서 했던 방식대로 해보고, 어떻게 flow가 흘러가는지
위의 첨부한 그림을 보고 판단해봅니다.
하나. 클라이언트(Client): 사용자의 요청을 보내는 주체입니다.
둘. FastAPI 애플리케이션: 클라이언트로부터의 요청을 받아 처리하는 엔드포인트로, 이 경우 작업을 큐에 추가하는 프로듀서(Producer)의 역할을 합니다.
셋. 메시지 브로커(Message Broker): FastAPI 애플리케이션(프로듀서)이 생성한 태스크 메시지를 받아서 저장하고, Celery 워커(컨슈머)에게 전달하는 중개자입니다.
넷. Celery 워커(Worker) 1 & 2: 메시지 브로커로부터 태스크를 받아 실제로 처리하는 실행 단위입니다. 이 워커들은 컨슈머의 역할을 하며, 배분된 태스크를 실행합니다.
다섯. 결과 백엔드(Result Backend): 태스크 처리 결과를 저장하는 저장소로, Celery 워커가 작업을 마친 후 결과 데이터를 여기에 저장합니다.
여섯. Celery Flower: Celery 애플리케이션과 메시지 브로커에서 처리되는 메시지를 모니터링하는 도구로, 시스템의 상태와 워커의 활동을 실시간으로 볼 수 있게 해줍니다.
이 시스템에서는 다음과 같은 단계로 작업이 진행됩니다:
- 1단계: FastAPI 앱이 클라이언트로부터 요청을 받고, 이를 기반으로 태스크를 생성하여 메시지 브로커에 전송합니다.
- 2단계: 메시지 브로커는 이 태스크를 Celery 워커에게 전달하고, 워커는 이를 받아 처리합니다. 태스크가 완료되면 결과를 결과 백엔드에 저장하고 태스크의 상태를 업데이트합니다.
- 3단계: 태스크가 메시지 브로커를 통해 큐에 추가된 후, FastAPI 애플리케이션은 결과 백엔드를 확인하여 태스크의 상태와 결과를 확인할 수 있습니
이러한 프로세스는 사용자 요청에 대한 즉각적인 응답을 가능하게 하면서, 시간이 오래 걸리는 작업을 비동기적으로 처리하고, 시스템의 전반적인 성능을 향상시키는 데 도움을 줍니다.
추가 task를 만들어 봅니다.
>>> task = divide.delay(1,2)
>>>type(task)
<class 'celery.result.AsyncResult'>
delay 메소드 호출 이후 AsyncResult
인스턴스를 반환 받게됩니다.
해당 인스턴스를 통해서 task의 상태와 더불어 값 또는 exception details을 확인 할 수 있습니다.
다시 새로운 task를 생성해봅니다. 그리고 print메소드로 task.state
와 task.result
를 확인 해봅니다.
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
SUCCESS 0.5
>>> print(task.state, task.result)
SUCCESS 0.5
만약 에러가 발생한다면 어떻게 될까요?
>>> task = divide.delay(1, 0)
# 몇 초가 지난뒤 state와 result를 확인해봅니다.
>>> task.state
'FAILURE'
>>> task.result
ZeroDivisionError('division by zero')
Flower를 이용한 Celery 모니터링
Flower는 Celery용 실시간 웹 애플리케이션 모니터링 및 관리 도구입니다.
새 터미널 창을 열고 프로젝트 디렉터리로 이동합니다. 가상 환경을 활성화한 후 Flower를 설치합니다
requirements.txt 파일 아래에 추가합니다.
flower==1.2.0
(venv)$ pip install -r requirements.txt
설치가 완료되면 서버를 기동합니다.
(venv)$ celery -A main.celery flower --port=5555
flower에서 제공하는 GUI dashboard를 보기 위해서 http://localhost:5555로 접속합니다.
첫 터미널로 돌아와서 4개의 task를 생성해봅니다. 그리고 하나의 task는 fail 나도록 이중 한개의 테스크의 두번째 인자값에 0을 넣어줍니다.
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 0)
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 3)
Flower웹화면으로 돌아가면 아래와 같이 task의 상태와 값등을 확인 할 수 있습니다.
UUID컬럼중 state가 FAILURE가 된 부분이 값을 복사합니다. 그리고 터미널 윈도우를 열어서 FastAPI shell에서 아래
코드를 실행해봅니다.
>>> from celery.result import AsyncResult
>>> task = AsyncResult('007510f0-a48f-44b3-94ca-794a4c2cf8ee') # replace with your UUID
>>>
>>> task.state
'FAILURE'
>>>
>>> task.result
ZeroDivisionError('division by zero')
특정 task의 UUID값을 통해서 상태와 result를 파악 할 수 있습니다.
'프로그래밍 언어 > 파이썬' 카테고리의 다른 글
Celery와 FastAPI - 4 (0) | 2024.03.24 |
---|---|
Celery와 FastAPI - 3 (0) | 2024.03.23 |
Celery와 FastAPI - 1 (0) | 2024.03.22 |
classmethod와 staticmethod의 차이 (0) | 2024.03.21 |
파이썬에서 접근제어 지시자 (0) | 2024.03.20 |