이번에는 테스트와 프로젝트 스케일 확장을 쉽게하기 위해, factory patter으로 리팩토링 해보겠습니다.
- 팩토리 패턴이란?
- 디자인 패턴중의 하나인 팩토리(Factory) 패턴은 객체 생성 처리를 서브 클래스에 위임하여, 객체 생성을 위한 인터페이스를 정의하는데 목적이 있습니다. 즉, 객체를 생성하는 코드와 사용하는 코드를 분리하여, 클라이언트가 특정 클래스의 인스턴스를 직접 사용하지 않고도 필요한 객체를 얻을 수 있도록 하는 생성 패턴입니다.
목표
- FastAPI 앱 인스턴스 초기화를 위해 애플리케이션 팩토리 생성
- 애플리케이션 팩토리 패턴과 작동하도록 Celery 설정
- SQLAlchemy와 Alembic을 사용하여 데이터베이스 변경 사항 관리
App factory
'project'라는 새로운 디랙토리를 만들어줍니다. __init__.py
파일을 만들고 아래와 같이 작성합니다.
from fastapi import FastAPI
def create_app() -> FastAPI:
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
FastAPI app 사용을 위해 여러번 호출 가능하도록 factory function을 정의 하였습니다.
기존 main.py는 아래와 같이 수정하겠습니다. create_app 함수를 호출하여 인스턴스를 사용 할 수 있도록 만듭니다.
from project import create_app
app = create_app()
정상적으로 서버가 기동하는지 확인해봅니다.
(venv)$ uvicorn main:app --reload
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO: Started reloader process [96439] using watchgod
INFO: Started server process [96482]
INFO: Waiting for application startup.
INFO: Application startup complete.
localhost:8000에 접속 되는지 브라우저에서 확인해봅니다.
Database Support
Dependencies
DB 쿼리와 마이그레이션을 수월하게 도와줄 SQLAlchemy, alembic을 설치해봅니다.
requirements파일 하단에 아래 내용을 추가합니다.
SQLAlchemy==1.4.40
alembic==1.8.1
$ pip install -r requirements.txt
설정
project 디렉토리에서 config.py파일을 만들어줍니다. 아래와 같이 작성합니다.
import os
import pathlib
from functools import lru_cache
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
class ProductionConfig(BaseConfig):
pass
class TestingConfig(BaseConfig):
pass
@lru_cache()
def get_settings():
config_cls_dict = {
"development": DevelopmentConfig,
"production": ProductionConfig,
"testing": TestingConfig
}
config_name = os.environ.get("FASTAPI_CONFIG", "development")
config_cls = config_cls_dict[config_name]
return config_cls()
settings = get_settings()
코드를 분석하면서 각 부분이 어떻게 작동하는지 확인해 봅니다.
BaseConfig
클래스
BaseConfig
클래스는 모든 환경 설정 클래스의 기본이 되며, 공통 설정을 담고 있습니다.BASE_DIR
은 현재 파일(config.py
)의 부모 디렉토리의 부모를 가리키는pathlib.Path
객체입니다. 이는 프로젝트의 루트 디렉토리를 의미합니다.DATABASE_URL
은 데이터베이스 연결 URL을 설정합니다. 기본값으로 SQLite를 사용하며,BASE_DIR
을 기반으로 데이터베이스 파일의 위치를 지정합니다. 환경 변수DATABASE_URL
을 통해 다른 데이터베이스(예: PostgreSQL)에 연결 정보를 설정할 수 있습니다.DATABASE_CONNECT_DICT
는 데이터베이스 연결에 사용될 추가 옵션을 담을 수 있는 딕셔너리입니다(비어 있음).
DevelopmentConfig
, ProductionConfig
, TestingConfig
클래스
- 이 세 클래스는 각각 개발, 운영, 테스트 환경에 특화된 설정을 할 수 있게 합니다. 현재는
BaseConfig
를 직접 상속만 받고 추가 설정은 없습니다. 필요에 따라 각 환경별로 다른 설정을 추가할 수 있습니다.
get_settings
함수
@lru_cache()
데코레이터는 이 함수의 결과를 캐싱하여, 동일한 입력값에 대해 함수가 다시 호출될 때마다 계산을 반복하지 않고 캐싱된 결과를 반환합니다. 이는 설정을 여러 번 불러오더라도 성능 저하 없이 빠르게 처리할 수 있게 해줍니다.config_cls_dict
딕셔너리는 문자열 키(환경 이름)와 해당 환경에 대한 설정 클래스를 매핑합니다.os.environ.get("FASTAPI_CONFIG", "development")
을 통해 환경 변수FASTAPI_CONFIG
의 값을 읽어오며, 설정된 값이 없다면 기본값으로"development"
를 사용합니다. 이는 현재 어떤 환경(개발, 운영, 테스트)에서 실행되고 있는지를 결정합니다.- 결정된 설정 클래스를 인스턴스화하여 반환합니다. 이렇게 함으로써, 환경에 맞는 설정을 쉽게 불러와 사용할 수 있습니다.
settings
변수
get_settings()
함수를 호출하여 얻은 설정 객체를settings
변수에 할당합니다. 이 변수를 통해 프로젝트의 다른 부분에서 설정에 접근할 수 있습니다.
주의 사항
Pydantic의
BaseSettings
클래스를 사용하지 않는 이유는 Celery와 Flower를 사용할 때 발생할 수 있는 특정 에러(KeyError('__signature__')
)를 피하기 위함입니다. 이는 Pydantic과 Celery 간의 호환성 문제 때문에 발생할 수 있으며, 여기서는 이 문제를 회피하기 위해 기본 Python 클래스와 기능을 사용하여 설정을 관리합니다.pydantic?
Pydantic은 Python에서 데이터 검증과 설정 관리를 위한 라이브러리입니다. Pydantic을 사용하면 데이터를 Python 클래스로 모델링하고, 타입 힌트를 활용하여 데이터 유효성 검증을 간단하게 할 수 있습니다. Pydantic은 FastAPI와 같은 현대적인 웹 프레임워크에서 널리 사용되며,
설정 파일 관리
, API 요청 및 응답 데이터의 검증, 데이터 파싱과 같은 다양한 상황에 활용됩니다.
이 설정 파일 구조는 프로젝트의 다양한 환경에 따라 유연하게 설정을 관리할 수 있게 해줍니다. 필요에 따라 환경별로 다른 설정을 쉽게 추가하거나 수정할 수 있어, 개발 및 배포 과정에서 큰 유연성을 제공합니다.
프로젝트 구조는 아래와 같습니다.
├── main.py
├── project
│ ├── __init__.py
│ └── config.py
└── requirements.txt
SQLAlchemy 임포트
project디렉토리 아래 database.py파일을 아래와 같이 작성합니다.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from project.config import settings
# https://fastapi.tiangolo.com/tutorial/sql-databases/#create-the-sqlalchemy-engine
engine = create_engine(
settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
코드 설명
이 database.py 파일은 SQLAlchemy를 사용하여 데이터베이스 연결을 설정하고 세션을 관리하는 기본 구조를 구성합니다. SQLAlchemy는 Python에서 가장 인기 있는 ORM(Object-Relational Mapping) 라이브러리 중 하나로, 데이터베이스와 객체 지향 프로그래밍 사이의 브릿지 역할을 합니다. 각 코드 라인의 의미를 자세히 설명하겠습니다.
임포트 구문
create_engine
: SQLAlchemy의 엔진을 생성하는 함수입니다. 엔진은 SQLAlchemy와 데이터베이스 간의 핵심 연결을 관리합니다.declarative_base
: 클래스 정의를 기반으로 테이블스키마를 생성
하는 데 사용되는 함수입니다. 이를 통해 Python 클래스를 데이터베이스 테이블로 매핑할 수 있습니다.sessionmaker
: 데이터베이스와의 모든 대화를 관리하는 세션 객체를 생성하는 팩토리 함수입니다. 세션을 통해 객체를 데이터베이스에 추가, 수정, 삭제할 수 있습니다.
프로젝트 설정에서 데이터베이스 설정 로딩
from project.config import settings
: 앞서 설명한config.py
파일에서 정의한 설정 객체를 임포트합니다. 이를 통해 데이터베이스 URL과 연결 옵션을 가져올 수 있습니다.
SQLAlchemy 엔진 생성
engine = create_engine(...)
:create_engine
함수를 사용해 SQLAlchemy 엔진을 생성합니다. 이 엔진은settings.DATABASE_URL
에서 제공하는 데이터베이스 연결 정보를 사용합니다.connect_args
매개변수는 추가 연결 옵션을 제공하는 데 사용되며, 여기서는settings.DATABASE_CONNECT_DICT
를 사용합니다.
세션 설정
SessionLocal = sessionmaker(...)
:sessionmaker
함수를 사용해 세션 팩토리SessionLocal
을 생성합니다. 이 팩토리는 개별 요청마다 새로운 세션 인스턴스를 생성하는 데 사용됩니다.autocommit=False
는 자동 커밋을 비활성화하여 명시적인 커밋이 필요함을 나타내고,autoflush=False
는 세션에서 객체 상태 변경을 데이터베이스에 자동으로 플러시하지 않도록 설정합니다.bind=engine
은 생성된 세션을 앞서 생성한 엔진에 바인딩하여, 해당 엔진을 사용하여 데이터베이스 연결을 관리하도록 설정합니다.
베이스 클래스 생성
Base = declarative_base()
:declarative_base
함수를 호출하여 모든 ORM 모델의 기본 클래스인Base
를 생성합니다. 이Base
클래스를 상속받아 모델을 정의하면, SQLAlchemy가 Python 클래스를 데이터베이스 테이블로 매핑하는 구조를 자동으로 생성할 수 있습니다.
이 구성을 통해 애플리케이션에서 SQLAlchemy를 사용하여 데이터베이스 작업을 수행할 준비를 마칩니다. 데이터 모델을 정의하고, 이 파일에서 생성한 SessionLocal
인스턴스를 사용하여 데이터베이스 세션을 관리할 수 있게 됩니다.
Alembic 설정
alembic 초기화
(venv)$ alembic init alembic
프로젝트에 새로운 폴더와 파일이 보일 것입니다.
├── alembic # new
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
├── alembic.ini # new
├── main.py
├── project
│ ├── __init__.py
│ ├── config.py
│ └── database.py
└── requirements.txt
alembic/env.py 파일을 아래와 같이 수정합니다.
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from project import create_app # new
from project.config import settings # new
from project.database import Base # new
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)) # new
fastapi_app = create_app() # new
target_metadata = Base.metadata # new
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
...
Alembic은 SQLAlchemy를 사용하는 프로젝트에서 데이터베이스 마이그레이션을 관리하는 도구입니다. Alembic이 애플리케이션의 데이터베이스 설정과 모델을 인식하고, 데이터베이스 스키마 마이그레이션을 자동으로 생성하도록 설정하는 데 필요합니다.
변경된 부분 설명
추가된 임포트
from project import create_app
: FastAPI 애플리케이션 인스턴스를 생성하는 함수를 임포트합니다. 이 함수를 호출하면 애플리케이션 초기화 중에 데이터베이스 모델 등이 로드됩니다.from project.config import settings
: 애플리케이션 설정을 담고 있는settings
객체를 임포트합니다. 여기에는 데이터베이스 연결 정보(DATABASE_URL
) 등이 포함됩니다.from project.database import Base
: SQLAlchemy의 모델 베이스를 임포트합니다. 이 베이스는 모든 데이터베이스 모델의 기본 클래스로, 모델의 메타데이터 정보를 포함합니다.
Alembic 설정 수정
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))
: Alembic 설정 객체의sqlalchemy.url
옵션을 프로젝트 설정에서 가져온 데이터베이스 URL로 설정합니다. 이는 Alembic이 데이터베이스 연결을 설정할 때 사용할 문자열입니다.fastapi_app = create_app()
: FastAPI 애플리케이션 인스턴스를 생성합니다. 이는 애플리케이션 구성요소가 초기화되도록 하여, Alembic이 모델 정의를 포함하여 필요한 모든 정보에 접근할 수 있게 합니다.target_metadata = Base.metadata
: Alembic에게 마이그레이션 생성 시 참조할 메타데이터 객체를 설정합니다. 이 메타데이터는 애플리케이션의 모든 데이터베이스 모델 정보를 포함합니다. 이를 통해 Alembic은 데이터베이스 스키마 변경 사항을 자동으로 감지하고 마이그레이션 파일을 생성할 수 있습니다.
마이그레이션 관리의 이점
이 설정을 통해 Alembic은 애플리케이션의 데이터베이스 모델 변경 사항을 자동으로 감지하고, 이러한 변경사항에 기반한 마이그레이션 스크립트를 생성할 수 있습니다. 마이그레이션 스크립트는 데이터베이스 스키마를 버전 관리할 수 있게 해주며, 개발자는 데이터베이스 스키마 변경을 코드로 관리하고, 다양한 개발 환경 간에 데이터베이스 스키마를 일관되게 유지할 수 있습니다. 이는 팀 작업 및 배포 과정에서 데이터베이스 관련 문제를 최소화하는 데 큰 도움이 됩니다.
db.sqlite3
파일, 여기서는 파일형 데이터베이스 생성을 하기 위해 아래 명령어를 수행합니다.
(venv)$ python
>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()
(venv)ls db.sqlite3
db.sqlite3
아직 모델을 정의하지 않았지만 데이터베이스 마이그레이션을 진행해보겠습니다.
(venv)$ alembic revision --autogenerate
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Generating alembic/versions/512d31e06401_.py ... done
(venv)$ alembic upgrade head
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> 512d31e06401, empty message
Applicaiton 구조
FastAPI 프로젝트에서 "Django Apps"나 "Flask Blueprints"와 유사한 방식으로 애플리케이션의 구조를 나누고, 공통 기능을 그룹화하여 재사용 가능한 컴포넌트로 만드는 방법을 소개합니다. 이 예제에서는 "users"라는 폴더를 생성하여 사용자 관련 기능을 그룹화하는 방법을 설명하겠습니다.
project 디렉토리 내에 users 디렉토리를 만듭니다. __init__.py
파일을 만들고 아래와 같이 정의합니다.
from fastapi import APIRouter
users_router = APIRouter(
prefix="/users",
)
from . import models
코드 설명
"users" 폴더와 __init__.py
파일
- "users" 폴더:
FastAPI 프로젝트 내에 "project" 폴더 아래 "users"라는 새 폴더를 생성합니다. 이 폴더는 사용자 관련 모델, 스키마, 라우트 등 모든 관련 파일을 포함하게 됩니다. 이렇게 기능별로 코드를 모듈화하여 구조화하는 방식은 프로젝트의 유지 보수성을 높이고, 기능별로 명확하게 구분할 수 있게 해줍니다. __init__.py
파일:
"users" 폴더 내에__init__.py
파일을 생성합니다. 이 파일은 해당 폴더가 Python 패키지로 인식되도록 하는 역할을 합니다. 또한, 이 파일 내에서 FastAPI의APIRouter
를 사용하여 "users" 관련 라우트를 초기화하고 구성합니다.
APIRouter
를 사용한 라우팅
from fastapi import APIRouter
:
FastAPI의APIRouter
를 임포트합니다.APIRouter
는 특정 경로(prefix)에 관련된 라우트들을 그룹화하는 기능을 제공합니다. 이는 Flask의 블루프린트나 Django의 앱 시스템과 유사한 개념입니다.users_router = APIRouter(prefix="/users")
:
사용자 관련 API 엔드포인트를 "/users" 경로 아래에 그룹화하기 위해APIRouter
인스턴스를 생성합니다.prefix="/users"
는 이 라우터에 속하는 모든 라우트가 "/users" 경로로 시작하도록 설정합니다.from . import models # noqa
:
"users" 폴더 내의models.py
파일을 임포트합니다. 이는 사용자 데이터를 정의하는 데이터 모델이 포함된 파일입니다.# noqa
는 일반적으로 코드 검사 도구에서 특정 경고를 무시하도록 하는 주석입니다. 여기서는 모델이 실제로 이 위치에서 사용되지 않더라도 임포트해야 하는 상황을 처리합니다. 이렇게 하면models.py
파일 내의 모델 정의가 SQLAlchemy 등의 ORM에 의해 인식될 수 있습니다.
이 구조를 통해 FastAPI 애플리케이션 내에서 "users" 기능 관련 코드를 모듈화하고 재사용 가능한 방식으로 관리할 수 있습니다. 이는 큰 프로젝트를 더 쉽게 관리하고 확장할 수 있게 해주며, 코드의 가독성과 유지 보수성을 향상시킵니다.
project/users/model.py 파일을 생성하고 아래와 같이 정의합니다.
from sqlalchemy import Column, Integer, String
from project.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(128), unique=True, nullable=False)
email = Column(String(128), unique=True, nullable=False)
def __init__(self, username, email, *args, **kwargs):
self.username = username
self.email = email
project/__init__.py
를 수정합니다.
from fastapi import FastAPI
def create_app() -> FastAPI:
app = FastAPI()
from project.users import users_router # new
app.include_router(users_router) # new
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
from project.users import users_router
코드가 호출되면 project/users/__init__.py
파일이 실행됩니다.
그러면 models.py
파일도 import 되게 됩니다.
프로젝트 구조는 아래와 같습니다.
├── alembic
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ └── 512d31e06401_.py
├── alembic.ini
├── main.py
├── project
│ ├── __init__.py
│ ├── config.py
│ ├── database.py
│ └── users
│ ├── __init__.py
│ └── models.py
└── requirements.txt
main.py
- 새로운 FastAPI app 인스턴스를 생성하는데create_app
을 사용합니다.project/__init__.py
- Factory 함수project/config.py
- FastAPI 설정project/users
- 관련 users 도메인과 관련된 모델과 라우터등
Database 작업
다음으로, 새 데이터베이스 마이그레이션을 생성하고 User 모델에 대한 테이블을 생성해 보겠습니다.
(venv)$ alembic revision --autogenerate
# INFO [alembic.autogenerate.compare] Detected added table 'users'
(venv)$ alembic upgrade head
# Create users table
python shell을 이용하여 DB에 접근하여 CRUD를 할 수 있습니다.
>>> from main import app
>>> from project.database import SessionLocal
>>> from project.users.models import User
>>> user = User(username='test1', email='test1@example.com')
>>> session = SessionLocal()
>>> session.add(user)
>>> session.commit()
>>>
>>> new_session = SessionLocal()
>>> new_session.query(User).first().username
'test1'
>>> exit()
Celery 추가
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0") # NEW
CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0") # NEW
project/celery_utils.py
파일을 새로 만들고 아래와 같이 정의합니다.
from celery import current_app as current_celery_app
from project.config import settings
def create_celery():
celery_app = current_celery_app
celery_app.config_from_object(settings, namespace="CELERY")
return celery_app
BaseConfig
클래스 업데이트
project/config.py
파일의 BaseConfig
클래스에 CELERY_BROKER_URL
과 CELERY_RESULT_BACKEND
설정을 추가함으로써, Celery의 기본 설정을 정의합니다.
CELERY_BROKER_URL
: Celery가 사용할 메시지 브로커의 URL을 설정합니다. 이 예제에서는 기본값으로 Redis를 사용하며,redis://127.0.0.1:6379/0
주소를 가리킵니다. 메시지 브로커는 Celery 작업자(worker)와 통신하기 위해 사용되는 중간 서비스입니다.CELERY_RESULT_BACKEND
: Celery 작업의 결과를 저장할 백엔드의 URL을 설정합니다. 작업의 상태와 결과를 조회할 수 있게 해줍니다. 이 예제 역시 Redis를 결과 백엔드로 사용합니다.
환경 변수(os.environ.get
)를 사용하여 이러한 설정값을 동적으로 로드할 수 있게 합니다. 이는 개발, 테스트, 운영 환경에서 다른 설정을 쉽게 적용할 수 있도록 해줍니다.
project/celery_utils.py
파일
이 파일은 Celery 애플리케이션 인스턴스를 생성하고 설정하는 팩토리 함수 create_celery
를 정의합니다.
from celery import current_app as current_celery_app
: 현재 활성화된 Celery 애플리케이션 인스턴스를 가져옵니다. 이를 통해 새 인스턴스를 생성하는 대신, 이미 존재하는 인스턴스를 사용하여 여러 곳에서 Celery 작업을 공유할 수 있습니다.celery_app.config_from_object(settings, namespace="CELERY")
:settings
객체에서 Celery 설정을 로드합니다.namespace="CELERY"
는 모든 Celery 관련 설정 키가CELERY_
로 시작해야 함을 의미합니다. 예를 들어, 브로커 URL을 설정하기 위해CELERY_BROKER_URL
키를 사용합니다.
create_celery
함수는 Celery 애플리케이션 인스턴스를 반환합니다. 이 인스턴스는 프로젝트의 다른 부분에서 백그라운드 작업을 정의하고 실행하는 데 사용될 수 있습니다.
이렇게 Celery를 설정하고 초기화하는 과정은 FastAPI 애플리케이션에 비동기 작업 처리 기능을 통합하는 데 필요한 기본 단계입니다. Celery를 사용하면 웹 요청 처리와 동시에 이메일 발송, 데이터 처리 등의 백그라운드 작업을 효율적으로 관리할 수 있습니다.
project/__init__.py
를 수정합니다.
Update project/__init__.py:
from fastapi import FastAPI
from project.celery_utils import create_celery # new
def create_app() -> FastAPI:
app = FastAPI()
# do this before loading routes # new
app.celery_app = create_celery() # new
from project.users import users_router
app.include_router(users_router)
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
project/users/tasks.py
파일을 생성하고 정의합니다.
from celery import shared_task
@shared_task
def divide(x, y):
import time
time.sleep(5)
return x / y
project/users/__init__.py
파일을 수정합니다. 위의 tasks.py 모듈을 임포트 하기 위해서입니다.
from fastapi import APIRouter
users_router = APIRouter(
prefix="/users",
)
from . import models, tasks # noqa
main.py
모듈을 수정합니다.
from project import create_app
app = create_app()
celery = app.celery_app
project/__init__.py
의 변경 사항
- 이 파일에서
create_celery
함수를 임포트하여 FastAPI 애플리케이션 인스턴스에 Celery 애플리케이션 인스턴스를 속성으로 추가합니다. 이렇게 함으로써, FastAPI 애플리케이션에서 Celery 작업을 손쉽게 정의하고 실행할 수 있게 됩니다. create_app
함수 내에서, FastAPI 라우터를 초기화하기 전에app.celery_app = create_celery()
를 호출하여 Celery 애플리케이션을 생성하고 FastAPI 애플리케이션 인스턴스에 할당합니다. 이는 애플리케이션이 실행되면서 Celery 작업을 사용할 수 있게 합니다.
project/users/tasks.py
파일 생성
- Celery의
@shared_task
데코레이터를 사용하여 비동기 작업을 정의합니다. 여기서는 예제로divide
라는 간단한 작업을 정의했으며, 이 작업은 두 숫자를 나누는 함수입니다.@shared_task
는 Celery 인스턴스에 직접 의존하지 않으므로, 애플리케이션 내 어디서든 재사용할 수 있는 작업을 만듭니다. shared_task
를 사용하는 주된 이유는 순환 임포트 문제를 방지하고 코드의 재사용성을 높이기 위함입니다.
project/users/__init__.py
업데이트
- 이 파일에서
tasks
모듈을 임포트하여 Celery 작업이 프로젝트에서 사용될 수 있도록 합니다. 이로써tasks.py
에 정의된 비동기 작업들이 애플리케이션 시작 시 로드되고, Celery 워커가 이를 인식할 수 있게 됩니다.
main.py
파일 업데이트
- FastAPI 애플리케이션을 생성하고, 이를 통해 Celery 애플리케이션 인스턴스에 접근합니다. 이 파일은 FastAPI 애플리케이션의 진입점 역할을 하며, Celery 워커를 시작할 때 사용할 수 있는 Celery 인스턴스를 제공합니다.
프로젝트 구조
위의 변경 사항을 적용한 후 프로젝트의 구조는 다음과 같습니다:
├── alembic
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│ ├── __init__.py
│ ├── celery_utils.py
│ ├── config.py
│ ├── database.py
│ └── users
│ ├── __init__.py
│ ├── models.py
│ └── tasks.py
└── requirements.txt
이 구조는 FastAPI와 Celery를 통합하여 비동기 백그라운드 작업을 관리할 수 있는 효과적인 방법을 제시합니다. Celery를 사용함으로써 애플리케이션의 성능을 향상시키고, 사용자 요청 처리와 동시에 시간이 많이 소요되는 작업을 백그라운드에서 실행할 수 있게 됩니다.
Manual Test
새로운 터미널에서 워커를 기동해봅니다.
(venv)$ celery -A main.celery worker --loglevel=info
[config]
.> app: default:0x10f681940 (.default.Loader)
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.users.tasks.divide
위 [task]
section에서 project.users.tasks.divide
부분이 확인됩낟.
Celery worker가 task를 성공적으로 발견하였습니다.
새로운 터미널을 하나 추가로 열고 이번에는 파이썬 쉘로 접속합니다.
task를 생성하여 Celery worker에 보냅니다.
>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
첫 번째 터미널에서 로그들이 아래와 같이 볼수 있습니다.
[2024-03-20 16:21:09,668: INFO/MainProcess] Task project.users.tasks.divide[f2875744-69b6-4c97-9e0b-17095c1a14ea] received
[2024-03-20 16:21:14,683: INFO/ForkPoolWorker-8] Task project.users.tasks.divide[f2875744-69b6-4c97-9e0b-17095c1a14ea] succeeded in 5.013083600002574s: 0.5
'프로그래밍 언어 > 파이썬' 카테고리의 다른 글
Celery와 FastAPI - 5 (1) | 2024.03.24 |
---|---|
Celery와 FastAPI - 4 (0) | 2024.03.24 |
Celery와 FastAPI - 2 (1) | 2024.03.22 |
Celery와 FastAPI - 1 (0) | 2024.03.22 |
classmethod와 staticmethod의 차이 (0) | 2024.03.21 |