프로그래밍 언어/파이썬

Celery와 FastAPI - 3

hyeseong-dev 2024. 3. 23. 20:38

이번에는 테스트와 프로젝트 스케일 확장을 쉽게하기 위해, factory patter으로 리팩토링 해보겠습니다.

  • 팩토리 패턴이란?
  • 디자인 패턴중의 하나인 팩토리(Factory) 패턴은 객체 생성 처리를 서브 클래스에 위임하여, 객체 생성을 위한 인터페이스를 정의하는데 목적이 있습니다. 즉, 객체를 생성하는 코드와 사용하는 코드를 분리하여, 클라이언트가 특정 클래스의 인스턴스를 직접 사용하지 않고도 필요한 객체를 얻을 수 있도록 하는 생성 패턴입니다.

목표

  1. FastAPI 앱 인스턴스 초기화를 위해 애플리케이션 팩토리 생성
  2. 애플리케이션 팩토리 패턴과 작동하도록 Celery 설정
  3. SQLAlchemy와 Alembic을 사용하여 데이터베이스 변경 사항 관리

App factory

'project'라는 새로운 디랙토리를 만들어줍니다. __init__.py파일을 만들고 아래와 같이 작성합니다.

from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

FastAPI app 사용을 위해 여러번 호출 가능하도록 factory function을 정의 하였습니다.

기존 main.py는 아래와 같이 수정하겠습니다. create_app 함수를 호출하여 인스턴스를 사용 할 수 있도록 만듭니다.

from project import create_app

app = create_app()

정상적으로 서버가 기동하는지 확인해봅니다.

(venv)$ uvicorn main:app --reload

INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [96439] using watchgod
INFO:     Started server process [96482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

localhost:8000에 접속 되는지 브라우저에서 확인해봅니다.

Database Support

Dependencies

DB 쿼리와 마이그레이션을 수월하게 도와줄 SQLAlchemy, alembic을 설치해봅니다.

requirements파일 하단에 아래 내용을 추가합니다.

SQLAlchemy==1.4.40
alembic==1.8.1
$ pip install -r requirements.txt

설정

project 디렉토리에서 config.py파일을 만들어줍니다. 아래와 같이 작성합니다.

import os 
import pathlib
from functools import lru_cache

class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

class ProductionConfig(BaseConfig):
    pass


class TestingConfig(BaseConfig):
    pass

@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig
    }

    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    config_cls = config_cls_dict[config_name]
    return config_cls()

settings = get_settings()

코드를 분석하면서 각 부분이 어떻게 작동하는지 확인해 봅니다.

BaseConfig 클래스

  • BaseConfig 클래스는 모든 환경 설정 클래스의 기본이 되며, 공통 설정을 담고 있습니다.
  • BASE_DIR은 현재 파일(config.py)의 부모 디렉토리의 부모를 가리키는 pathlib.Path 객체입니다. 이는 프로젝트의 루트 디렉토리를 의미합니다.
  • DATABASE_URL은 데이터베이스 연결 URL을 설정합니다. 기본값으로 SQLite를 사용하며, BASE_DIR을 기반으로 데이터베이스 파일의 위치를 지정합니다. 환경 변수 DATABASE_URL을 통해 다른 데이터베이스(예: PostgreSQL)에 연결 정보를 설정할 수 있습니다.
  • DATABASE_CONNECT_DICT는 데이터베이스 연결에 사용될 추가 옵션을 담을 수 있는 딕셔너리입니다(비어 있음).

DevelopmentConfig, ProductionConfig, TestingConfig 클래스

  • 이 세 클래스는 각각 개발, 운영, 테스트 환경에 특화된 설정을 할 수 있게 합니다. 현재는 BaseConfig를 직접 상속만 받고 추가 설정은 없습니다. 필요에 따라 각 환경별로 다른 설정을 추가할 수 있습니다.

get_settings 함수

  • @lru_cache() 데코레이터는 이 함수의 결과를 캐싱하여, 동일한 입력값에 대해 함수가 다시 호출될 때마다 계산을 반복하지 않고 캐싱된 결과를 반환합니다. 이는 설정을 여러 번 불러오더라도 성능 저하 없이 빠르게 처리할 수 있게 해줍니다.
  • config_cls_dict 딕셔너리는 문자열 키(환경 이름)와 해당 환경에 대한 설정 클래스를 매핑합니다.
  • os.environ.get("FASTAPI_CONFIG", "development")을 통해 환경 변수 FASTAPI_CONFIG의 값을 읽어오며, 설정된 값이 없다면 기본값으로 "development"를 사용합니다. 이는 현재 어떤 환경(개발, 운영, 테스트)에서 실행되고 있는지를 결정합니다.
  • 결정된 설정 클래스를 인스턴스화하여 반환합니다. 이렇게 함으로써, 환경에 맞는 설정을 쉽게 불러와 사용할 수 있습니다.

settings 변수

  • get_settings() 함수를 호출하여 얻은 설정 객체를 settings 변수에 할당합니다. 이 변수를 통해 프로젝트의 다른 부분에서 설정에 접근할 수 있습니다.

주의 사항

  • Pydantic의 BaseSettings 클래스를 사용하지 않는 이유는 Celery와 Flower를 사용할 때 발생할 수 있는 특정 에러(KeyError('__signature__'))를 피하기 위함입니다. 이는 Pydantic과 Celery 간의 호환성 문제 때문에 발생할 수 있으며, 여기서는 이 문제를 회피하기 위해 기본 Python 클래스와 기능을 사용하여 설정을 관리합니다.

  • pydantic?

  • Pydantic은 Python에서 데이터 검증과 설정 관리를 위한 라이브러리입니다. Pydantic을 사용하면 데이터를 Python 클래스로 모델링하고, 타입 힌트를 활용하여 데이터 유효성 검증을 간단하게 할 수 있습니다. Pydantic은 FastAPI와 같은 현대적인 웹 프레임워크에서 널리 사용되며, 설정 파일 관리, API 요청 및 응답 데이터의 검증, 데이터 파싱과 같은 다양한 상황에 활용됩니다.

이 설정 파일 구조는 프로젝트의 다양한 환경에 따라 유연하게 설정을 관리할 수 있게 해줍니다. 필요에 따라 환경별로 다른 설정을 쉽게 추가하거나 수정할 수 있어, 개발 및 배포 과정에서 큰 유연성을 제공합니다.

프로젝트 구조는 아래와 같습니다.

├── main.py
├── project
│   ├── __init__.py
│   └── config.py
└── requirements.txt

SQLAlchemy 임포트

project디렉토리 아래 database.py파일을 아래와 같이 작성합니다.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from project.config import settings

# https://fastapi.tiangolo.com/tutorial/sql-databases/#create-the-sqlalchemy-engine
engine = create_engine(
    settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

코드 설명

이 database.py 파일은 SQLAlchemy를 사용하여 데이터베이스 연결을 설정하고 세션을 관리하는 기본 구조를 구성합니다. SQLAlchemy는 Python에서 가장 인기 있는 ORM(Object-Relational Mapping) 라이브러리 중 하나로, 데이터베이스와 객체 지향 프로그래밍 사이의 브릿지 역할을 합니다. 각 코드 라인의 의미를 자세히 설명하겠습니다.

임포트 구문

  • create_engine: SQLAlchemy의 엔진을 생성하는 함수입니다. 엔진은 SQLAlchemy와 데이터베이스 간의 핵심 연결을 관리합니다.
  • declarative_base: 클래스 정의를 기반으로 테이블 스키마를 생성하는 데 사용되는 함수입니다. 이를 통해 Python 클래스를 데이터베이스 테이블로 매핑할 수 있습니다.
  • sessionmaker: 데이터베이스와의 모든 대화를 관리하는 세션 객체를 생성하는 팩토리 함수입니다. 세션을 통해 객체를 데이터베이스에 추가, 수정, 삭제할 수 있습니다.

프로젝트 설정에서 데이터베이스 설정 로딩

  • from project.config import settings: 앞서 설명한 config.py 파일에서 정의한 설정 객체를 임포트합니다. 이를 통해 데이터베이스 URL과 연결 옵션을 가져올 수 있습니다.

SQLAlchemy 엔진 생성

  • engine = create_engine(...): create_engine 함수를 사용해 SQLAlchemy 엔진을 생성합니다. 이 엔진은 settings.DATABASE_URL에서 제공하는 데이터베이스 연결 정보를 사용합니다. connect_args 매개변수는 추가 연결 옵션을 제공하는 데 사용되며, 여기서는 settings.DATABASE_CONNECT_DICT를 사용합니다.

세션 설정

  • SessionLocal = sessionmaker(...): sessionmaker 함수를 사용해 세션 팩토리 SessionLocal을 생성합니다. 이 팩토리는 개별 요청마다 새로운 세션 인스턴스를 생성하는 데 사용됩니다. autocommit=False는 자동 커밋을 비활성화하여 명시적인 커밋이 필요함을 나타내고, autoflush=False는 세션에서 객체 상태 변경을 데이터베이스에 자동으로 플러시하지 않도록 설정합니다. bind=engine은 생성된 세션을 앞서 생성한 엔진에 바인딩하여, 해당 엔진을 사용하여 데이터베이스 연결을 관리하도록 설정합니다.

베이스 클래스 생성

  • Base = declarative_base(): declarative_base 함수를 호출하여 모든 ORM 모델의 기본 클래스인 Base를 생성합니다. 이 Base 클래스를 상속받아 모델을 정의하면, SQLAlchemy가 Python 클래스를 데이터베이스 테이블로 매핑하는 구조를 자동으로 생성할 수 있습니다.

이 구성을 통해 애플리케이션에서 SQLAlchemy를 사용하여 데이터베이스 작업을 수행할 준비를 마칩니다. 데이터 모델을 정의하고, 이 파일에서 생성한 SessionLocal 인스턴스를 사용하여 데이터베이스 세션을 관리할 수 있게 됩니다.

Alembic 설정

alembic 초기화

(venv)$ alembic init alembic

프로젝트에 새로운 폴더와 파일이 보일 것입니다.

├── alembic               # new
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
├── alembic.ini           # new
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   └── database.py
└── requirements.txt

alembic/env.py 파일을 아래와 같이 수정합니다.

from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

from project import create_app                 # new
from project.config import settings            # new
from project.database import Base              # new

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))        # new

fastapi_app = create_app()    # new

target_metadata = Base.metadata       # new

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

...

Alembic은 SQLAlchemy를 사용하는 프로젝트에서 데이터베이스 마이그레이션을 관리하는 도구입니다. Alembic이 애플리케이션의 데이터베이스 설정과 모델을 인식하고, 데이터베이스 스키마 마이그레이션을 자동으로 생성하도록 설정하는 데 필요합니다.

변경된 부분 설명

추가된 임포트

  • from project import create_app: FastAPI 애플리케이션 인스턴스를 생성하는 함수를 임포트합니다. 이 함수를 호출하면 애플리케이션 초기화 중에 데이터베이스 모델 등이 로드됩니다.
  • from project.config import settings: 애플리케이션 설정을 담고 있는 settings 객체를 임포트합니다. 여기에는 데이터베이스 연결 정보(DATABASE_URL) 등이 포함됩니다.
  • from project.database import Base: SQLAlchemy의 모델 베이스를 임포트합니다. 이 베이스는 모든 데이터베이스 모델의 기본 클래스로, 모델의 메타데이터 정보를 포함합니다.

Alembic 설정 수정

  • config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)): Alembic 설정 객체의 sqlalchemy.url 옵션을 프로젝트 설정에서 가져온 데이터베이스 URL로 설정합니다. 이는 Alembic이 데이터베이스 연결을 설정할 때 사용할 문자열입니다.
  • fastapi_app = create_app(): FastAPI 애플리케이션 인스턴스를 생성합니다. 이는 애플리케이션 구성요소가 초기화되도록 하여, Alembic이 모델 정의를 포함하여 필요한 모든 정보에 접근할 수 있게 합니다.
  • target_metadata = Base.metadata: Alembic에게 마이그레이션 생성 시 참조할 메타데이터 객체를 설정합니다. 이 메타데이터는 애플리케이션의 모든 데이터베이스 모델 정보를 포함합니다. 이를 통해 Alembic은 데이터베이스 스키마 변경 사항을 자동으로 감지하고 마이그레이션 파일을 생성할 수 있습니다.

마이그레이션 관리의 이점

이 설정을 통해 Alembic은 애플리케이션의 데이터베이스 모델 변경 사항을 자동으로 감지하고, 이러한 변경사항에 기반한 마이그레이션 스크립트를 생성할 수 있습니다. 마이그레이션 스크립트는 데이터베이스 스키마를 버전 관리할 수 있게 해주며, 개발자는 데이터베이스 스키마 변경을 코드로 관리하고, 다양한 개발 환경 간에 데이터베이스 스키마를 일관되게 유지할 수 있습니다. 이는 팀 작업 및 배포 과정에서 데이터베이스 관련 문제를 최소화하는 데 큰 도움이 됩니다.

db.sqlite3 파일, 여기서는 파일형 데이터베이스 생성을 하기 위해 아래 명령어를 수행합니다.

(venv)$ python

>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()

(venv)ls db.sqlite3
db.sqlite3

아직 모델을 정의하지 않았지만 데이터베이스 마이그레이션을 진행해보겠습니다.

(venv)$ alembic revision --autogenerate

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
  Generating alembic/versions/512d31e06401_.py ...  done

(venv)$ alembic upgrade head
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 512d31e06401, empty message

Applicaiton 구조

FastAPI 프로젝트에서 "Django Apps"나 "Flask Blueprints"와 유사한 방식으로 애플리케이션의 구조를 나누고, 공통 기능을 그룹화하여 재사용 가능한 컴포넌트로 만드는 방법을 소개합니다. 이 예제에서는 "users"라는 폴더를 생성하여 사용자 관련 기능을 그룹화하는 방법을 설명하겠습니다.

project 디렉토리 내에 users 디렉토리를 만듭니다. __init__.py파일을 만들고 아래와 같이 정의합니다.

from fastapi import APIRouter

users_router = APIRouter(
    prefix="/users",
)

from . import models

코드 설명

"users" 폴더와 __init__.py 파일

  • "users" 폴더:
    FastAPI 프로젝트 내에 "project" 폴더 아래 "users"라는 새 폴더를 생성합니다. 이 폴더는 사용자 관련 모델, 스키마, 라우트 등 모든 관련 파일을 포함하게 됩니다. 이렇게 기능별로 코드를 모듈화하여 구조화하는 방식은 프로젝트의 유지 보수성을 높이고, 기능별로 명확하게 구분할 수 있게 해줍니다.
  • __init__.py 파일:
    "users" 폴더 내에 __init__.py 파일을 생성합니다. 이 파일은 해당 폴더가 Python 패키지로 인식되도록 하는 역할을 합니다. 또한, 이 파일 내에서 FastAPI의 APIRouter를 사용하여 "users" 관련 라우트를 초기화하고 구성합니다.

APIRouter를 사용한 라우팅

  • from fastapi import APIRouter:
    FastAPI의 APIRouter를 임포트합니다. APIRouter는 특정 경로(prefix)에 관련된 라우트들을 그룹화하는 기능을 제공합니다. 이는 Flask의 블루프린트나 Django의 앱 시스템과 유사한 개념입니다.
  • users_router = APIRouter(prefix="/users"):
    사용자 관련 API 엔드포인트를 "/users" 경로 아래에 그룹화하기 위해 APIRouter 인스턴스를 생성합니다. prefix="/users"는 이 라우터에 속하는 모든 라우트가 "/users" 경로로 시작하도록 설정합니다.
  • from . import models # noqa:
    "users" 폴더 내의 models.py 파일을 임포트합니다. 이는 사용자 데이터를 정의하는 데이터 모델이 포함된 파일입니다. # noqa는 일반적으로 코드 검사 도구에서 특정 경고를 무시하도록 하는 주석입니다. 여기서는 모델이 실제로 이 위치에서 사용되지 않더라도 임포트해야 하는 상황을 처리합니다. 이렇게 하면 models.py 파일 내의 모델 정의가 SQLAlchemy 등의 ORM에 의해 인식될 수 있습니다.
    이 구조를 통해 FastAPI 애플리케이션 내에서 "users" 기능 관련 코드를 모듈화하고 재사용 가능한 방식으로 관리할 수 있습니다. 이는 큰 프로젝트를 더 쉽게 관리하고 확장할 수 있게 해주며, 코드의 가독성과 유지 보수성을 향상시킵니다.

project/users/model.py 파일을 생성하고 아래와 같이 정의합니다.

from sqlalchemy import Column, Integer, String

from project.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(128), unique=True, nullable=False)
    email = Column(String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email

project/__init__.py를 수정합니다.

from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    from project.users import users_router                # new
    app.include_router(users_router)                      # new

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

from project.users import users_router코드가 호출되면 project/users/__init__.py파일이 실행됩니다.
그러면 models.py 파일도 import 되게 됩니다.

프로젝트 구조는 아래와 같습니다.

├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 512d31e06401_.py
├── alembic.ini
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       └── models.py
└── requirements.txt
  1. main.py - 새로운 FastAPI app 인스턴스를 생성하는데 create_app을 사용합니다.
  2. project/__init__.py - Factory 함수
  3. project/config.py - FastAPI 설정
  4. project/users - 관련 users 도메인과 관련된 모델과 라우터등

Database 작업

다음으로, 새 데이터베이스 마이그레이션을 생성하고 User 모델에 대한 테이블을 생성해 보겠습니다.

(venv)$ alembic revision --autogenerate
# INFO  [alembic.autogenerate.compare] Detected added table 'users'

(venv)$ alembic upgrade head
# Create users table

python shell을 이용하여 DB에 접근하여 CRUD를 할 수 있습니다.

>>> from main import app
>>> from project.database import SessionLocal
>>> from project.users.models import User

>>> user = User(username='test1', email='test1@example.com')
>>> session = SessionLocal()
>>> session.add(user)
>>> session.commit()
>>>
>>> new_session = SessionLocal()
>>> new_session.query(User).first().username
'test1'

>>> exit()

Celery 추가

class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")            # NEW
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")    # NEW

project/celery_utils.py파일을 새로 만들고 아래와 같이 정의합니다.

from celery import current_app as current_celery_app

from project.config import settings


def create_celery():
    celery_app = current_celery_app
    celery_app.config_from_object(settings, namespace="CELERY")

    return celery_app

BaseConfig 클래스 업데이트

project/config.py 파일의 BaseConfig 클래스에 CELERY_BROKER_URLCELERY_RESULT_BACKEND 설정을 추가함으로써, Celery의 기본 설정을 정의합니다.

  • CELERY_BROKER_URL: Celery가 사용할 메시지 브로커의 URL을 설정합니다. 이 예제에서는 기본값으로 Redis를 사용하며, redis://127.0.0.1:6379/0 주소를 가리킵니다. 메시지 브로커는 Celery 작업자(worker)와 통신하기 위해 사용되는 중간 서비스입니다.
  • CELERY_RESULT_BACKEND: Celery 작업의 결과를 저장할 백엔드의 URL을 설정합니다. 작업의 상태와 결과를 조회할 수 있게 해줍니다. 이 예제 역시 Redis를 결과 백엔드로 사용합니다.

환경 변수(os.environ.get)를 사용하여 이러한 설정값을 동적으로 로드할 수 있게 합니다. 이는 개발, 테스트, 운영 환경에서 다른 설정을 쉽게 적용할 수 있도록 해줍니다.

project/celery_utils.py 파일

이 파일은 Celery 애플리케이션 인스턴스를 생성하고 설정하는 팩토리 함수 create_celery를 정의합니다.

  • from celery import current_app as current_celery_app: 현재 활성화된 Celery 애플리케이션 인스턴스를 가져옵니다. 이를 통해 새 인스턴스를 생성하는 대신, 이미 존재하는 인스턴스를 사용하여 여러 곳에서 Celery 작업을 공유할 수 있습니다.
  • celery_app.config_from_object(settings, namespace="CELERY"): settings 객체에서 Celery 설정을 로드합니다. namespace="CELERY"는 모든 Celery 관련 설정 키가 CELERY_로 시작해야 함을 의미합니다. 예를 들어, 브로커 URL을 설정하기 위해 CELERY_BROKER_URL 키를 사용합니다.

create_celery 함수는 Celery 애플리케이션 인스턴스를 반환합니다. 이 인스턴스는 프로젝트의 다른 부분에서 백그라운드 작업을 정의하고 실행하는 데 사용될 수 있습니다.

이렇게 Celery를 설정하고 초기화하는 과정은 FastAPI 애플리케이션에 비동기 작업 처리 기능을 통합하는 데 필요한 기본 단계입니다. Celery를 사용하면 웹 요청 처리와 동시에 이메일 발송, 데이터 처리 등의 백그라운드 작업을 효율적으로 관리할 수 있습니다.

project/__init__.py를 수정합니다.

Update project/__init__.py:

from fastapi import FastAPI

from project.celery_utils import create_celery   # new


def create_app() -> FastAPI:
    app = FastAPI()

    # do this before loading routes              # new
    app.celery_app = create_celery()             # new

    from project.users import users_router
    app.include_router(users_router)

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

project/users/tasks.py 파일을 생성하고 정의합니다.

from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

project/users/__init__.py파일을 수정합니다. 위의 tasks.py 모듈을 임포트 하기 위해서입니다.

from fastapi import APIRouter

users_router = APIRouter(
    prefix="/users",
)

from . import models, tasks # noqa

main.py 모듈을 수정합니다.

from project import create_app

app = create_app()
celery = app.celery_app

project/__init__.py의 변경 사항

  • 이 파일에서 create_celery 함수를 임포트하여 FastAPI 애플리케이션 인스턴스에 Celery 애플리케이션 인스턴스를 속성으로 추가합니다. 이렇게 함으로써, FastAPI 애플리케이션에서 Celery 작업을 손쉽게 정의하고 실행할 수 있게 됩니다.
  • create_app 함수 내에서, FastAPI 라우터를 초기화하기 전에 app.celery_app = create_celery()를 호출하여 Celery 애플리케이션을 생성하고 FastAPI 애플리케이션 인스턴스에 할당합니다. 이는 애플리케이션이 실행되면서 Celery 작업을 사용할 수 있게 합니다.

project/users/tasks.py 파일 생성

  • Celery의 @shared_task 데코레이터를 사용하여 비동기 작업을 정의합니다. 여기서는 예제로 divide라는 간단한 작업을 정의했으며, 이 작업은 두 숫자를 나누는 함수입니다. @shared_task는 Celery 인스턴스에 직접 의존하지 않으므로, 애플리케이션 내 어디서든 재사용할 수 있는 작업을 만듭니다.
  • shared_task를 사용하는 주된 이유는 순환 임포트 문제를 방지하고 코드의 재사용성을 높이기 위함입니다.

project/users/__init__.py 업데이트

  • 이 파일에서 tasks 모듈을 임포트하여 Celery 작업이 프로젝트에서 사용될 수 있도록 합니다. 이로써 tasks.py에 정의된 비동기 작업들이 애플리케이션 시작 시 로드되고, Celery 워커가 이를 인식할 수 있게 됩니다.

main.py 파일 업데이트

  • FastAPI 애플리케이션을 생성하고, 이를 통해 Celery 애플리케이션 인스턴스에 접근합니다. 이 파일은 FastAPI 애플리케이션의 진입점 역할을 하며, Celery 워커를 시작할 때 사용할 수 있는 Celery 인스턴스를 제공합니다.

프로젝트 구조

위의 변경 사항을 적용한 후 프로젝트의 구조는 다음과 같습니다:

├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── celery_utils.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       ├── models.py
│       └── tasks.py
└── requirements.txt

이 구조는 FastAPI와 Celery를 통합하여 비동기 백그라운드 작업을 관리할 수 있는 효과적인 방법을 제시합니다. Celery를 사용함으로써 애플리케이션의 성능을 향상시키고, 사용자 요청 처리와 동시에 시간이 많이 소요되는 작업을 백그라운드에서 실행할 수 있게 됩니다.

Manual Test

새로운 터미널에서 워커를 기동해봅니다.

(venv)$ celery -A main.celery worker --loglevel=info

[config]
.> app:         default:0x10f681940 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . project.users.tasks.divide

[task] section에서 project.users.tasks.divide 부분이 확인됩낟.
Celery worker가 task를 성공적으로 발견하였습니다.

새로운 터미널을 하나 추가로 열고 이번에는 파이썬 쉘로 접속합니다.
task를 생성하여 Celery worker에 보냅니다.

>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)

첫 번째 터미널에서 로그들이 아래와 같이 볼수 있습니다.

[2024-03-20 16:21:09,668: INFO/MainProcess] Task project.users.tasks.divide[f2875744-69b6-4c97-9e0b-17095c1a14ea] received
[2024-03-20 16:21:14,683: INFO/ForkPoolWorker-8] Task project.users.tasks.divide[f2875744-69b6-4c97-9e0b-17095c1a14ea] succeeded in 5.013083600002574s: 0.5