Celery와 FastAPI - 8

2024. 3. 27. 16:50·프로그래밍 언어/파이썬

이번에는 Broadcaster와 WebSockets를 사용하여 어플리케이션에서의 비효율적인 폴링 절차를 최적화하는 방법을 확인해봅니다.
XHR Short Polling을 사용하여 태스크 상태를 확인했지만, 이 방식은 많은 연결과 쿼리를 생성하여 리소스를 낭비할 수 있으며, 폴링 간격에 따라 태스크 완료와 클라이언트 업데이트 사이에 지연이 발생할 수 있습니다.

목표

  1. Broadcaster를 사용하여 Redis pub/sub을 통해 다중 프로세스 알림 처리를 해봅니다.
  2. asyncio에 대해 알아 봅니다.
  3. asgiref 라이브러리를 이용하여 async를 sync로 변환해봅니다.

Websocket?

WebSocket 프로토콜은 클라이언트와 서버 간의 양방향(full-duplex) 통신 채널을 단일 TCP 연결을 통해 제공하는 컴퓨터 통신 프로토콜입니다. 이는 HTTP 프로토콜의 한계를 극복하고, 실시간 웹 애플리케이션에 필요한 지속적인 데이터 교환을 가능하게 합니다.

WebSocket의 작동 방식

1 연결 수립:
웹 클라이언트(예: 웹 브라우저)가 서버에 WebSocket 연결을 요청합니다. 이 과정은 처음에 HTTP 요청을 통해 이루어지며, 이후 서버가 이를 WebSocket 연결로 승인하면 HTTP 연결이 WebSocket 연결로 업그레이드됩니다.

2 양방향 통신:
연결이 수립되면, 클라이언트와 서버는 연결이 종료될 때까지 지속적으로 데이터를 주고받을 수 있습니다. 이는 양측이 동시에 메시지를 보낼 수 있는 양방향 통신을 가능하게 합니다.

3 지속적 연결:
WebSocket 연결은 수동으로 종료되거나 네트워크 오류 등으로 인해 끊어지기 전까지 계속 유지됩니다. 이로 인해, 클라이언트와 서버는 필요할 때마다 즉시 데이터를 교환할 수 있습니다.

WebSocket의 주요 특징

  • 실시간 통신: 실시간 채팅 애플리케이션, 게임, 금융 거래 플랫폼 등에서 실시간 데이터 교환이 필요한 경우 WebSocket을 이용할 수 있습니다.
  • 효율성: 한 번의 연결 수립으로 지속적인 데이터 교환이 가능하므로, 폴링 방식에 비해 네트워크 오버헤드가 줄어듭니다.
  • 호환성: 모든 현대적인 웹 브라우저는 WebSocket을 지원합니다. 이는 개발자가 별도의 플러그인이나 라이브러리 없이도 WebSocket 기반의 실시간 애플리케이션을 구현할 수 있음을 의미합니다.

브라우저 지원 현황

  • Chrome, Firefox, Internet Explorer, Edge, Safari, Opera 및 이들의 모바일 버전을 포함하여 현대적인 대부분의 웹 브라우저와 모바일 브라우저는 WebSocket을 지원합니다. 이는 WebSocket이 웹 개발에서 널리 사용되고 표준화된 기술임을 보여줍니다.

ASGI vs WSGI

ASGI(Asynchronous Server Gateway Interface)는 WSGI(Web Server Gateway Interface)의 후임자로, Python 웹 서버, 프레임워크, 애플리케이션 간의 호환성을 위한 오랜 표준을 발전시킨 것입니다. ASGI와 WSGI는 모두 Python 애플리케이션을 웹 서버와 연동하기 위한 인터페이스를 제공하지만, 그들이 지원하는 기능과 목적에 있어 중요한 차이점이 있습니다.

WSGI

  • WSGI는 Python 애플리케이션과 웹 서버 간의 표준 인터페이스로, 동기 방식의 통신을 지원합니다.
  • WSGI는 한 번에 하나의 요청을 처리하는 방식으로, 요청 처리 중에는 해당 프로세스 또는 스레드가 다른 작업을 수행할 수 없습니다. 이는 동시성을 제한하며, I/O 작업이 많은 애플리케이션에서 성능 병목현상을 유발할 수 있습니다.
  • WSGI는 HTTP/1.1 요청을 처리하는 데 최적화되어 있으며, WebSockets이나 HTTP/2와 같은 프로토콜을 직접 지원하지 않습니다.

ASGI

  • ASGI는 비동기 IO를 지원하도록 설계된 인터페이스로, Python 3.5 이상에서 도입된 async와 await 구문을 사용하여 비동기 프로그래밍을 가능하게 합니다.
  • ASGI는 동시에 여러 요청을 비동기적으로 처리할 수 있어, I/O 바운드 작업에서 높은 성능을 발휘합니다. 이는 WebSockets, HTTP/2, long polling과 같은 실시간 통신 요구사항을 가진 애플리케이션 개발에 적합합니다.
  • ASGI는 WSGI의 기능을 확장하며, 기존의 동기 방식 요청 처리뿐만 아니라 비동기 처리도 가능하게 해줍니다. 이로 인해 개발자는 하나의 애플리케이션 내에서 동기와 비동기 코드를 모두 사용할 수 있게 되었습니다.

FastAPI와 ASGI

  • FastAPI는 ASGI를 기반으로 하는 모던 Python 웹 프레임워크입니다. Starlette ASGI 프레임워크 위에 구축되어 있으며, ASGI의 비동기 기능을 완전히 활용하여 고성능 API 서버를 구축할 수 있게 해줍니다.
  • FastAPI는 비동기 프로그래밍을 쉽게 구현할 수 있게 해주며, WebSockets과 HTTP/2 지원을 포함하여 실시간 통신과 높은 동시성 요구사항을 가진 어플리케이션 개발에 이상적입니다.

결론적으로, ASGI는 Python 웹 개발에서 비동기 프로그래밍과 실시간 웹 통신을 가능하게 하는 현대적인 인터페이스로, WSGI의 기능을 확장하여 더 다양한 웹 개발 요구사항을 충족시키도록 설계되었습니다.

WorkFlow

이 시나리오는 FastAPI, Celery, 그리고 WebSocket을 사용하여 비동기 태스크의 상태 업데이트를 실시간으로 클라이언트에 전달하는 과정을 설명합니다. 각 단계별로 자세히 살펴보겠습니다.

1. AJAX 요청으로 Celery 태스크 트리거

  • 클라이언트(웹 브라우저)는 AJAX 요청을 FastAPI가 구동하는 서버의 특정 뷰로 보냅니다. 이 요청의 목적은 비동기적으로 실행될 Celery 태스크를 트리거하는 것입니다.

2. FastAPI에서 태스크 ID 반환

  • 요청을 받은 FastAPI는 Celery 태스크를 생성하고 celery worker를 실행시키고, 생성된 태스크의 고유 ID(task_id)를 클라이언트에게 반환합니다. 이 task_id는 태스크의 상태를 추적하는 데 사용됩니다.

3. WebSocket 연결 생성

  • 클라이언트는 받은 task_id를 사용하여 WebSocket 연결을 요청합니다. 연결 주소는 ws://127.0.0.1:8010/ws/task_status/{task_id} 형식으로, 태스크 ID를 포함하고 있습니다.

4. FastAPI에서 WebSocket 요청 처리

  • FastAPI 서버는 WebSocket 연결 요청을 받고, 해당 연결을 task_id 채널에 구독시킵니다. 이는 서버가 해당 태스크에 대한 업데이트를 수신할 준비가 되었음을 의미합니다.

5. Celery 태스크 처리 완료 및 메시지 전송

  • Celery 태스크가 처리를 마치면, 처리 결과와 함께 메시지를 task_id 채널로 보냅니다. 이 채널에 구독된 모든 클라이언트(이 경우 WebSocket을 통해 연결된 클라이언트)는 이 메시지를 수신하게 됩니다.

6. 클라이언트에서 WebSocket 종료 및 결과 표시

  • 클라이언트는 Celery 태스크의 처리 결과를 포함한 메시지를 수신하면 WebSocket 연결을 종료하고, 웹 페이지에 결과를 표시합니다.

이 과정을 통해, 실시간 웹 어플리케이션에서 비동기 태스크의 진행 상황과 결과를 효과적으로 사용자에게 전달할 수 있습니다. WebSocket을 사용함으로써 서버와 클라이언트 간에 지속적이고 실시간으로 통신할 수 있으며, 이는 사용자 경험을 크게 향상시키는 동시에 서버 리소스의 효율적 사용을 가능하게 합니다.

용어 정리

XHR(XMLHttpRequest)?

XHR(XMLHttpRequest)은 JavaScript를 사용하여 서버와 비동기적으로 데이터를 교환할 수 있는 Web API입니다. 이를 통해 웹 페이지를 새로 고치지 않고도 페이지의 일부분만을 업데이트할 수 있어, 웹 애플리케이션의 사용자 경험을 크게 개선할 수 있습니다. XHR은 AJAX(Asynchronous JavaScript and XML) 프로그래밍의 핵심 요소로, JSON, XML, HTML, 텍스트 등 다양한 형태의 데이터를 처리할 수 있습니다.

XHR Short Polling?

XHR Short Polling은 클라이언트가 서버에 주기적으로 요청을 보내어 새로운 데이터가 있는지 확인하는 방식입니다.
XHR(XMLHttpRequest) 객체를 사용하여 이러한 HTTP 요청을 비동기적으로 수행하며, 서버로부터의 응답을 기다린 후 즉시 다시 요청을 보내는 방식입니다. Short Polling은 구현이 간단하고 어떤 환경에서든 쉽게 사용할 수 있다는 장점이 있지만, 실시간 데이터 처리에는 적합하지 않고 서버 리소스를 비효율적으로 사용할 수 있다는 단점이 있습니다.

Long Polling?

Long Polling은 Short Polling의 단점을 일부 개선한 방식으로, 클라이언트가 서버에 요청을 보내면 서버는 새로운 데이터가 생길 때까지 요청을 보류하고, 데이터가 준비되면 그 즉시 응답합니다.
서버의 응답을 받은 클라이언트는 다시 새로운 요청을 보내어 이 과정을 반복합니다. Long Polling은 Short Polling에 비해 서버 리소스를 더 효율적으로 사용할 수 있고, 데이터의 실시간 처리에 더 적합하지만, 여전히 연결 설정과 종료에 따른 오버헤드가 있으며, 많은 동시 연결을 관리해야 하는 서버에 부담을 줄 수 있습니다.

Long Polling보다 더 성능이 좋은 방법?

Long Polling보다 성능이 더 좋은 대안으로는 WebSocket과 Server-Sent Events(SSE)가 있습니다.

WebSocket: 양방향 통신을 지원하는 프로토콜로, 클라이언트와 서버 간에 지속적인 연결을 유지합니다. 이를 통해 데이터가 준비되는 즉시 실시간으로 데이터를 교환할 수 있으며, Long Polling에 비해 훨씬 효율적이고 빠릅니다. 실시간 채팅 애플리케이션, 게임, 실시간 피드 등의 구현에 적합합니다.

Server-Sent Events(SSE): 서버에서 클라이언트로 데이터 스트림을 실시간으로 전송할 수 있는 방법으로, 주로 단방향 통신에 사용됩니다. WebSocket만큼 유연하지는 않지만, 단순한 실시간 데이터 전송이 필요한 경우에 적합하며, HTTP 프로토콜을 기반으로 하기 때문에 구현이 비교적 간단합니다.

WebSocket과 SSE는 실시간 통신을 구현할 때 Long Polling보다 성능과 효율성 측면에서 우수한 대안을 제공합니다.

구현

Config

WS_MESSAGE_QUEUE 환경 변수를 .env/.dev-sample 파일에 정의합니다.

WS_MESSAGE_QUEUE=redis://redis:6379/0

BaseConfig 클래스에 WS_MESSAGE_QUEUE 변수를 project/config.py 모듈에 추가합니다.

class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")

    WS_MESSAGE_QUEUE: str = os.environ.get("WS_MESSAGE_QUEUE", "redis://127.0.0.1:6379/0")

FastAPI와 Celery Worker는 양방향 통신을 사용을 위해 WS_MESSAGE_QUEUE 환경 변수를 사용 할 것입니다.

Broadcaster

requirements.txt 파일에 라이브러리를 추가하도록 합니다.

asgiref==3.7.2
asyncio-redis==0.16.0
broadcaster==0.2.0

1 asyncio_redis는 Broadcaster에 필요한 asyncio를 지원하는 Redis client입니다.

project/__init__.py모듈을 수정합니다.

from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI

from project.config import settings

broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)


@asynccontextmanager
async def lifespan(app: FastAPI):
    await broadcast.connect()
    yield
    await broadcast.disconnect()


def create_app() -> FastAPI:
    app = FastAPI(lifespan=lifespan)

    # do this before loading routes
    from project.celery_utils import create_celery
    app.celery_app = create_celery()

    from project.users import users_router
    app.include_router(users_router)

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app
  • FastAPI에서 사용되는 Broadcast인스턴스를 만들었습니다. 참고로 celery worker에서 사용하지 않습니다.
  • lifespan함수에는 애플리케이션 시작 전 broadcast 커넥션을 맺고 앱 종료 후 커넥션을 끊어버립니다.

Celery

project/celery_utils.py

from celery import current_app as current_celery_app
from celery.result import AsyncResult

from project.config import settings


def create_celery():
    celery_app = current_celery_app
    celery_app.config_from_object(settings, namespace="CELERY")

    return celery_app


def get_task_info(task_id):
    """
    return task info according to the task_id
    """
    task = AsyncResult(task_id)
    state = task.state

    if state == "FAILURE":
        error = str(task.result)
        response = {
            "state": task.state,
            "error": error,
        }
    else:
        response = {
            "state": task.state,
        }
    return response

task의 상태를 반환하는 get_task_info함수를 추가했습니다.

Router

새로운 라우터, ws_router를 추가하겠습니다.

$ mkdir project/ws
$ touch project/ws/__init__.py
$ touch project/ws/views.py

project/ws/__init__.py:

from fastapi import APIRouter

ws_router = APIRouter()

from . import views # noqa

project/ws/views.py:

import json

from fastapi import WebSocket

from . import ws_router
from project import broadcast
from project.celery_utils import get_task_info


@ws_router.websocket("/ws/task_status/{task_id}")
async def ws_task_status(websocket: WebSocket):
    await websocket.accept()

    task_id = websocket.scope["path_params"]["task_id"]

    async with broadcast.subscribe(channel=task_id) as subscriber:
        # just in case the task already finish
        data = get_task_info(task_id)
        await websocket.send_json(data)

        async for event in subscriber:
            await websocket.send_json(json.loads(event.message))


async def update_celery_task_status(task_id: str):
    """
    This function is called by Celery worker in task_postrun signal handler
    """
    await broadcast.connect()
    await broadcast.publish(
        channel=task_id,
        message=json.dumps(get_task_info(task_id))  # RedisProtocol.publish expect str
    )
    await broadcast.disconnect()
from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI

from project.config import settings

broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)


@asynccontextmanager
async def lifespan(app: FastAPI):
    await broadcast.connect()
    yield
    await broadcast.disconnect()


def create_app() -> FastAPI:
    app = FastAPI(lifespan=lifespan)

    # do this before loading routes
    from project.celery_utils import create_celery
    app.celery_app = create_celery()

    from project.users import users_router
    app.include_router(users_router)

    from project.ws import ws_router                   # new
    app.include_router(ws_router)                      # new

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

코드 설명

이 코드는 FastAPI와 WebSocket, 그리고 Celery를 함께 사용하여 비동기 태스크의 상태를 실시간으로 웹 클라이언트에게 전달하는 방법을 구현한 예입니다. 여기서는 broadcast를 통해 메시지 통신을 관리하고 있으며, 이는 아마도 Redis와 같은 메시지 브로커를 사용하는 방식으로 추정됩니다. 코드의 주요 부분을 단계별로 살펴보겠습니다.

WebSocket 라우터 설정

  • @ws_router.websocket("/ws/task_status/{task_id}") 데코레이터를 사용하여, 클라이언트가 WebSocket을 통해 특정 태스크 ID에 대한 상태 정보를 구독할 수 있는 엔드포인트를 정의합니다.

WebSocket 연결 수락 및 데이터 전송

  • 클라이언트로부터 WebSocket 연결이 요청되면, 먼저 연결을 수락합니다(await websocket.accept()).
  • 태스크 ID는 URL 경로 파라미터에서 추출되며, 이 ID를 사용하여 특정 태스크의 현재 상태 정보를 가져옵니다(get_task_info(task_id)).
  • 연결 직후, 이미 완료된 태스크의 정보를 클라이언트에게 전송합니다.
  • 그 후, 클라이언트는 broadcast 시스템을 통해 해당 태스크 ID 채널을 구독하고, 태스크 상태 업데이트를 실시간으로 수신합니다(async for event in subscriber).

Celery 태스크 상태 업데이트 통보

  • update_celery_task_status 함수는 Celery 태스크의 상태가 변경될 때마다 호출됩니다. 이 함수는 Celery 워커에서 태스크의 포스트런(post-run) 신호 핸들러를 통해 트리거될 수 있습니다.
  • 함수는 먼저 broadcast 시스템에 연결하고, 변경된 태스크 상태 정보를 해당 태스크 ID 채널에 게시합니다. 이 메시지는 JSON 문자열로 인코딩되어 게시됩니다.
  • 상태 업데이트 메시지를 게시한 후, broadcast 시스템의 연결을 종료합니다.

전반적인 흐름

  • 클라이언트는 WebSocket을 통해 특정 태스크의 상태 업데이트를 실시간으로 구독합니다.
  • Celery 태스크가 실행되고, 상태가 변경될 때마다 update_celery_task_status 함수가 호출되어 해당 태스크 ID 채널에 상태 업데이트를 게시합니다.
  • WebSocket으로 연결된 클라이언트는 이 업데이트를 실시간으로 수신하여 사용자에게 보여줍니다.

Celery Signal Handler

project/users/tasks.py모듈에 새 signal handler를 추가합니다.

@task_postrun.connect
def task_postrun_handler(task_id, **kwargs):
    from project.ws.views import update_celery_task_status
    async_to_sync(update_celery_task_status)(task_id)

코드 설명


task_postrun_handler 함수는 Celery 태스크의 실행이 완료된 후 호출됩니다. 이 핸들러는 아래 작업을 수행합니다

하나. update_celery_task_status 함수를 임포트합니다. 이 함수는 비동기로 구현되었으며, 태스크의 상태 정보를 특정 WebSocket 채널을 통해 클라이언트에게 전송하는 역할을 합니다.

둘. async_to_sync를 사용하여 비동기 함수인 update_celery_task_status를 동기적인 환경에서 호출합니다. Celery는 기본적으로 비동기 IO(예: asyncio)와 호환되지 않기 때문에, async_to_sync를 통해 이 문제를 해결합니다.

셋. task_id를 update_celery_task_status 함수에 전달하여, 해당 태스크의 상태 정보를 처리하고, 관련 WebSocket 채널에 메시지를 publish합니다.


async_to_sync메소드, task_postrun데코레이터 사용을 위해 임포트도 해줍니다.

import random

import requests
from asgiref.sync import async_to_sync
from celery import shared_task
from celery.signals import task_postrun
from celery.utils.log import get_task_logger

라우터 추가

project/users/views.py 모듈에서 users_router에 /form_ws/ 엔드포인트를 추가해줍니다.

@users_router.get("/form_ws/")
def form_ws_example(request: Request):
    return templates.TemplateResponse("form_ws.html", {"request": request})

Template

project/users/templates/form_ws.html 파일을 아래와 같이 정의해줍니다

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Celery example</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css"
          rel="stylesheet"
          integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65"
          crossorigin="anonymous"
    >
</head>
<body>
  <div class="container">
    <div class="row">
      <div class="col-12 col-md-4">
        <form id="your-form">
          <div class="mb-3">
            <label for="email" class="form-label">Email address</label>
            <input type="email" class="form-control" id="email" name="email">
          </div>
          <div class="mb-3">
            <label for="username" class="form-label">Username</label>
            <input type="text" class="form-control" id="username" name="username">
          </div>
          <div class="mb-3" id="messages"></div>
          <button type="submit" class="btn btn-primary">Submit</button>
        </form>
      </div>
    </div>
  </div>

  <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js"
          integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4"
          crossorigin="anonymous">
  </script>

  <script>
    function updateProgress(yourForm, task_id, btnHtml) {
      const ws_url = `/ws/task_status/${task_id}`;
      const WS = new WebSocket((location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + ws_url);

      WS.onmessage = function (event) {
        const res = JSON.parse(event.data);
        const taskStatus = res.state;

        if (['SUCCESS', 'FAILURE'].includes(taskStatus)) {
          const msg = yourForm.querySelector('#messages');
          const submitBtn = yourForm.querySelector('button[type="submit"]');

          if (taskStatus === 'SUCCESS') {
            msg.innerHTML = 'job succeeded';
          } else if (taskStatus === 'FAILURE') {
            msg.innerHTML = res.error;
          }

          submitBtn.disabled = false;
          submitBtn.innerHTML = btnHtml;

          // close the websocket because we do not need it now
          WS.close();
        }
      }
    }

    function serialize (data) {
      let obj = {};
      for (let [key, value] of data) {
        if (obj[key] !== undefined) {
          if (!Array.isArray(obj[key])) {
            obj[key] = [obj[key]];
          }
          obj[key].push(value);
        } else {
          obj[key] = value;
        }
      }
      return obj;
    }

    document.addEventListener("DOMContentLoaded", function () {
      const yourForm = document.getElementById("your-form");
      yourForm.addEventListener("submit", function (event) {
        event.preventDefault();
        const submitBtn = yourForm.querySelector('button[type="submit"]');
        const btnHtml = submitBtn.innerHTML;
        const spinnerHtml = 'Processing...';
        submitBtn.disabled = true;
        submitBtn.innerHTML = spinnerHtml;

        const msg = yourForm.querySelector('#messages');
        msg.innerHTML = '';

        // Get all field data from the form
        let data = new FormData(yourForm);
        // Convert to an object
        let formData = serialize(data);

        fetch('/users/form/', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json'
          },
          body: JSON.stringify(formData),
        })
        .then(response => response.json())
        .then((res) => {
          // after we get Celery task id, we start polling
          const task_id = res.task_id;
          updateProgress(yourForm, task_id, btnHtml);
          console.log(res);
        }).catch((error) => {
          console.error('Error:', error);
        });
      });
    });
  </script>
</body>
</html>

코드 설명

사용자는 폼을 통해 데이터를 입력하고 제출할 수 있으며, 서버 측에서는 Celery를 사용하여 비동기 태스크를 실행합니다. 태스크 상태는 WebSocket을 통해 실시간으로 클라이언트에 전달됩니다. 주요 기능은 다음과 같이 설명할 수 있습니다:

HTML 구조

  • Bootstrap CSS를 사용하여 스타일링된 간단한 폼이 있습니다. 이 폼에는 이메일 주소와 사용자 이름을 입력할 수 있는 필드가 있습니다.
  • id="your-form"을 가진 <form> 태그 내에, 사용자 입력을 위한 이메일과 사용자 이름 입력 필드가 있습니다.
  • 폼 제출 버튼을 클릭하면 태스크가 시작되고, 태스크의 상태는 id="messages"인 div 태그에 실시간으로 표시됩니다.

JavaScript 로직

  • serialize 함수는 폼 데이터를 객체로 변환합니다. 이 객체는 서버로 전송되기 전에 JSON 문자열로 변환됩니다.
  • updateProgress 함수는 WebSocket 연결을 생성하고, 서버로부터 태스크 상태 업데이트를 실시간으로 수신합니다. 태스크 상태(SUCCESS 또는 FAILURE)에 따라 메시지를 화면에 표시하고, 필요한 경우 WebSocket 연결을 종료합니다.
  • DOMContentLoaded 이벤트 리스너 내에서 폼 제출 이벤트를 처리합니다. 이는 사용자가 폼을 제출할 때 비동기적으로 서버에 데이터를 전송하고, 반환된 태스크 ID를 사용하여 updateProgress 함수를 호출하여 태스크 상태 업데이트를 수신하기 시작합니다.

폼 제출 및 태스크 상태 처리

  • 사용자가 폼을 제출하면, event.preventDefault()를 호출하여 폼의 기본 제출 동작을 막습니다.
  • 폼 데이터는 serialize 함수를 통해 객체로 변환되고, 이 객체는 서버로 POST 요청을 보내기 위해 JSON 문자열로 변환됩니다.
  • 서버로부터 태스크 ID를 받으면, 이 ID를 사용하여 updateProgress 함수 내에서 WebSocket 연결을 시작합니다.
  • WebSocket을 통해 서버로부터 태스크의 상태 업데이트를 받으면, 이를 사용자에게 표시합니다. 태스크가 성공하거나 실패하면, 관련 메시지를 #messages div에 표시하고, WebSocket 연결을 종료합니다.

테스트

작성한 내용을 테스트 하기 위해서 도커 이미지를 다시 빌드하고 컨테이너를 재기동해 봅니다.

$ docker compose up -d --build

http://localhost:8010/users/form_ws/ 를 브라우저 주소창에 입력하고 랜덤한 이름과 이메일을 입력한 이후 submit버튼을 클릭합니다.

API결과에 따라서 성공 혹은 오류를 확인 할 수 있습니다.


여기서의 핵심은 https://httpbin.org/delay/5, 외부 API 호출에 따라 5초가 소요되는지 아닌지를 확인하기 위함입니다.

 

이미지를 통해 확인해 보면 PENDING이라는 결과를 개발자 도구를 통해 확인할 수 있습니다. 

task가 완료되면 signal handler가 수행되어서 message가 Redis channel로 전달 됩니다. 그리고 나서 FastAPI가 

메시지를 전달 받아서 클라이언트에게 반환해주게 됩니다. 

저작자표시

'프로그래밍 언어 > 파이썬' 카테고리의 다른 글

Celery와 FastAPI - 7  (1) 2024.03.26
Celery와 FastAPI - 6  (1) 2024.03.24
Celery와 FastAPI - 5  (1) 2024.03.24
Celery와 FastAPI - 4  (0) 2024.03.24
Celery와 FastAPI - 3  (1) 2024.03.23
'프로그래밍 언어/파이썬' 카테고리의 다른 글
  • Celery와 FastAPI - 7
  • Celery와 FastAPI - 6
  • Celery와 FastAPI - 5
  • Celery와 FastAPI - 4
hyeseong-dev
hyeseong-dev
안녕하세요. 백엔드 개발자 이혜성입니다.
  • hyeseong-dev
    어제 오늘 그리고 내일
    hyeseong-dev
  • 전체
    오늘
    어제
    • 분류 전체보기 (282)
      • 여러가지 (107)
        • 알고리즘 & 자료구조 (72)
        • 오류 (4)
        • 이것저것 (29)
        • 일기 (1)
      • 프레임워크 (39)
        • 자바 스프링 (39)
        • React Native (0)
      • 프로그래밍 언어 (38)
        • 파이썬 (30)
        • 자바 (3)
        • 스프링부트 (5)
      • 운영체제 (0)
      • DB (17)
        • SQL (0)
        • Redis (17)
      • 클라우드 컴퓨팅 (2)
        • 도커 (2)
        • AWS (0)
      • 스케쥴 (65)
        • 세미나 (0)
        • 수료 (0)
        • 스터디 (24)
        • 시험 (41)
      • 트러블슈팅 (1)
      • 자격증 (0)
        • 정보처리기사 (0)
      • 재태크 (4)
        • 암호화폐 (4)
        • 기타 (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    celery
    백준
    ecs
    OOP
    Docker-compose
    WebFlux
    mybatis
    프로그래머스
    FastAPI
    #개발자포트폴리오 #개발자이력서 #개발자취업 #개발자취준 #코딩테스트 #항해99 #취리코 #취업리부트코스
    spring
    Python
    AWS
    DP
    완전탐색
    Spring Boot
    SAA
    reactor
    그리디
    파이썬
    항해99
    docker
    취업리부트
    java
    시험
    자바
    Redis
    EC2
    RDS
    Spring WebFlux
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
hyeseong-dev
Celery와 FastAPI - 8
상단으로

티스토리툴바