Celery와 FastAPI - 2

2024. 3. 22. 22:41·프로그래밍 언어/파이썬

이번 글의 목표

  1. FastAPI로 Celery 설정 하기
  2. Python Shell에서 Celery 작업 실행 하기
  3. Flower로 Celery 앱 모니터링 하기

Redis 설정

레디스를 OS에 직접 설치하거나 Docker를 이용하여 설치 및 실행 할 수 있습니다.

Docker

docker run -p 6379:6379 --name some-redis -d redis

Docker Hub에서 Redis docker 이미지가 다운로드 되어서 포트 6379로 백그라운드 작업으로 실행됩니다.

아래 명령어를 통해서 레디스가 실행중인지 확인 할 수 있습니다.

docker exec -it some-redis redis-cli ping # PONG

Celery 설정

1단계 : FastAPI 앱에서 task message를 message broker에 전달합니다.

2단계 : Celery Woker는 message broker로부터 전달 받은 task를 처리합니다. task가 처리된 후, task 결과를 result backend에 저장합니다. 그리고 task 상태를 업데이트 합니다.
3단계 : FastAPI 앱에서 task를 message broker에 보낸 뒤 task status와 결과를 result backend로부터 확인합니다.

FastAPI 프로젝트 생성

$ mkdir fastapi-celery-project && cd fastapi-celery-project

파이썬 가상환경을 생성하고 실행합니다.

$ python3.10 -m venv venv
$ source venv/bin/activate
(venv)$

requirements.txt파일을 생성하고 아래와 같이 작성합니다.

fastapi==0.79.0
uvicorn[standard]==0.18.2

main.py 파일을 작성해줍니다.

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}

앱을 실행합니다.

(venv)$ uvicorn main:app --reload

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [2061] using WatchFiles
INFO:     Started server process [2063]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

http://localhost:8000 를 브라우저 주소창에 입력하면 {"message":"Hello World"} 결과를 볼 수 있습니다.

프로젝트 구조는 아래와 같습니다.

├── main.py
└── requirements.txt

Celery 추가

Celery를 설치하고 설정해봅니다.
이를 위해 requirements.txt파일 아래 추가로 라이브러리 이름과 버전을 작성합니다.

celery==5.2.7
redis==4.3.4
(venv)$ pip install -r requirements.txt

main.py를 아래와 같이 수정합니다.

from celery import Celery
from fastapi import FastAPI

app = FastAPI()


celery = Celery(
    __name__,
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0"
)


@app.get("/")
async def root():
    return {"message": "Hello World"}


@celery.task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y
  1. FastAPI 인스턴스 생성 이후 Celery 인스턴스를 새롭게 만들어줍니다.
  2. Celery의 broker, backend 키워드 매개변수에 redis://127.0.0.1:6379/0문자열 값을 맵핑처리합니다.
    추후 별도의 설정 파일에 작성하고 이를 환경변수로 불러 오도록 처리 할 수 있습니다.
  3. divide함수는 celery task를 이용하여 long-runnning task를 시뮬레이션 하기 위해 정의하였습니다.

Celery에 task 전달하기

설정이 완료되면 Celery에 task를 보내서 어떻게 작동하는지 확인해 보겠습니다.

터미널 창에 프로젝트 디렉토리로 이동하여 가상 환경을 활성화 한 후에 아래 명령어를 실행합니다.

(venv)$ celery -A main.celery worker --loglevel=info

아래 로그를 확인 할 수 있습니다.

[config]
.> app:         main:0x10ad0d5f8
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . main.divide

기존 터미널 창으로 돌아와서 python shell로 접속합니다.

(venv)$ python

celery worker에 몇 가지 task를 보내봅니다.

>>> from main import app, divide
>>> task = divide.delay(1,2)
  1. message broker에 새로운 message를 전달하기 위해서 delay 메소드를 사용했습니다. 큐로부터 전달받은 워커는
    task를 pickup하여서 실행하게됩니다.
  2. 작업이 백그라운에서 실행되는 동안 실행이 완료됩니다.

Celery Worker 터미널로 이동하면 아래와 비슷한 내용이 표시됩니다.

[2024-03-22 15:14:02,089: INFO/MainProcess] Task main.divide[0dd7c3a2-27c0-4bef-bc42-31ca4f1007bd] received
[2024-03-22 15:14:07,109: INFO/ForkPoolWorker-8] Task main.divide[0dd7c3a2-27c0-4bef-bc42-31ca4f1007bd] succeeded in 5.017738905007718s: 0.5

worker process가 15:14:02 때, task를 받았습니다. 그리고 태스크를 시작하고 마치는데 5초가 소요되었습니다.


그리고 한두개 정도의 task를 더합니다. 그리고 위에서 했던 방식대로 해보고, 어떻게 flow가 흘러가는지
위의 첨부한 그림을 보고 판단해봅니다.

하나. 클라이언트(Client): 사용자의 요청을 보내는 주체입니다.

둘. FastAPI 애플리케이션: 클라이언트로부터의 요청을 받아 처리하는 엔드포인트로, 이 경우 작업을 큐에 추가하는 프로듀서(Producer)의 역할을 합니다.

셋. 메시지 브로커(Message Broker): FastAPI 애플리케이션(프로듀서)이 생성한 태스크 메시지를 받아서 저장하고, Celery 워커(컨슈머)에게 전달하는 중개자입니다.


넷. Celery 워커(Worker) 1 & 2: 메시지 브로커로부터 태스크를 받아 실제로 처리하는 실행 단위입니다. 이 워커들은 컨슈머의 역할을 하며, 배분된 태스크를 실행합니다.

다섯. 결과 백엔드(Result Backend): 태스크 처리 결과를 저장하는 저장소로, Celery 워커가 작업을 마친 후 결과 데이터를 여기에 저장합니다.

여섯. Celery Flower: Celery 애플리케이션과 메시지 브로커에서 처리되는 메시지를 모니터링하는 도구로, 시스템의 상태와 워커의 활동을 실시간으로 볼 수 있게 해줍니다.

이 시스템에서는 다음과 같은 단계로 작업이 진행됩니다:

  • 1단계: FastAPI 앱이 클라이언트로부터 요청을 받고, 이를 기반으로 태스크를 생성하여 메시지 브로커에 전송합니다.
  • 2단계: 메시지 브로커는 이 태스크를 Celery 워커에게 전달하고, 워커는 이를 받아 처리합니다. 태스크가 완료되면 결과를 결과 백엔드에 저장하고 태스크의 상태를 업데이트합니다.
  • 3단계: 태스크가 메시지 브로커를 통해 큐에 추가된 후, FastAPI 애플리케이션은 결과 백엔드를 확인하여 태스크의 상태와 결과를 확인할 수 있습니

    이러한 프로세스는 사용자 요청에 대한 즉각적인 응답을 가능하게 하면서, 시간이 오래 걸리는 작업을 비동기적으로 처리하고, 시스템의 전반적인 성능을 향상시키는 데 도움을 줍니다.

추가 task를 만들어 봅니다.

>>> task = divide.delay(1,2)

>>>type(task)
<class 'celery.result.AsyncResult'>

delay 메소드 호출 이후 AsyncResult 인스턴스를 반환 받게됩니다.
해당 인스턴스를 통해서 task의 상태와 더불어 값 또는 exception details을 확인 할 수 있습니다.

다시 새로운 task를 생성해봅니다. 그리고 print메소드로 task.state와 task.result를 확인 해봅니다.

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
SUCCESS 0.5

>>> print(task.state, task.result)
SUCCESS 0.5

만약 에러가 발생한다면 어떻게 될까요?

>>> task = divide.delay(1, 0)

# 몇 초가 지난뒤 state와 result를 확인해봅니다. 

>>> task.state
'FAILURE'

>>> task.result
ZeroDivisionError('division by zero')

Flower를 이용한 Celery 모니터링

Flower는 Celery용 실시간 웹 애플리케이션 모니터링 및 관리 도구입니다.
새 터미널 창을 열고 프로젝트 디렉터리로 이동합니다. 가상 환경을 활성화한 후 Flower를 설치합니다
requirements.txt 파일 아래에 추가합니다.

flower==1.2.0
(venv)$ pip install -r requirements.txt

설치가 완료되면 서버를 기동합니다.

(venv)$ celery -A main.celery flower --port=5555

flower에서 제공하는 GUI dashboard를 보기 위해서 http://localhost:5555로 접속합니다.

첫 터미널로 돌아와서 4개의 task를 생성해봅니다. 그리고 하나의 task는 fail 나도록 이중 한개의 테스크의 두번째 인자값에 0을 넣어줍니다.

>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 0)
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 3)

Flower웹화면으로 돌아가면 아래와 같이 task의 상태와 값등을 확인 할 수 있습니다.

 

UUID컬럼중 state가 FAILURE가 된 부분이 값을 복사합니다. 그리고 터미널 윈도우를 열어서 FastAPI shell에서 아래 

코드를 실행해봅니다. 

>>> from celery.result import AsyncResult
>>> task = AsyncResult('007510f0-a48f-44b3-94ca-794a4c2cf8ee')  # replace with your UUID
>>>
>>> task.state
'FAILURE'
>>>
>>> task.result
ZeroDivisionError('division by zero')

 

특정 task의 UUID값을 통해서 상태와 result를 파악 할 수 있습니다. 

 

저작자표시 (새창열림)

'프로그래밍 언어 > 파이썬' 카테고리의 다른 글

Celery와 FastAPI - 4  (0) 2024.03.24
Celery와 FastAPI - 3  (1) 2024.03.23
Celery와 FastAPI - 1  (0) 2024.03.22
classmethod와 staticmethod의 차이  (0) 2024.03.21
파이썬에서 접근제어 지시자  (0) 2024.03.20
'프로그래밍 언어/파이썬' 카테고리의 다른 글
  • Celery와 FastAPI - 4
  • Celery와 FastAPI - 3
  • Celery와 FastAPI - 1
  • classmethod와 staticmethod의 차이
hyeseong-dev
hyeseong-dev
안녕하세요. 백엔드 개발자 이혜성입니다.
  • hyeseong-dev
    어제 오늘 그리고 내일
    hyeseong-dev
  • 전체
    오늘
    어제
    • 분류 전체보기 (284)
      • 여러가지 (108)
        • 알고리즘 & 자료구조 (72)
        • 오류 (4)
        • 이것저것 (29)
        • 일기 (2)
      • 프레임워크 (39)
        • 자바 스프링 (39)
        • React Native (0)
      • 프로그래밍 언어 (38)
        • 파이썬 (30)
        • 자바 (3)
        • 스프링부트 (5)
      • 운영체제 (0)
      • DB (17)
        • SQL (0)
        • Redis (17)
      • 클라우드 컴퓨팅 (2)
        • 도커 (2)
        • AWS (0)
      • 스케쥴 (65)
        • 세미나 (0)
        • 수료 (0)
        • 스터디 (24)
        • 시험 (41)
      • 트러블슈팅 (1)
      • 자격증 (0)
        • 정보처리기사 (0)
      • 재태크 (5)
        • 암호화폐 (5)
        • 기타 (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    그리디
    프로그래머스
    시험
    Spring Boot
    Python
    DP
    완전탐색
    java
    AWS
    항해99
    WebFlux
    Redis
    EC2
    FastAPI
    백준
    OOP
    RDS
    celery
    #개발자포트폴리오 #개발자이력서 #개발자취업 #개발자취준 #코딩테스트 #항해99 #취리코 #취업리부트코스
    Docker-compose
    파이썬
    자바
    spring
    취업리부트
    mybatis
    ecs
    Spring WebFlux
    docker
    reactor
    SAA
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
hyeseong-dev
Celery와 FastAPI - 2
상단으로

티스토리툴바