이번에는 Broadcaster와 WebSockets를 사용하여 어플리케이션에서의 비효율적인 폴링 절차를 최적화하는 방법을 확인해봅니다.
XHR Short Polling을 사용하여 태스크 상태를 확인했지만, 이 방식은 많은 연결과 쿼리를 생성하여 리소스를 낭비할 수 있으며, 폴링 간격에 따라 태스크 완료와 클라이언트 업데이트 사이에 지연이 발생할 수 있습니다.
목표
- Broadcaster를 사용하여 Redis pub/sub을 통해 다중 프로세스 알림 처리를 해봅니다.
- asyncio에 대해 알아 봅니다.
- asgiref 라이브러리를 이용하여 async를 sync로 변환해봅니다.
Websocket?
WebSocket 프로토콜은 클라이언트와 서버 간의 양방향(full-duplex) 통신 채널을 단일 TCP 연결을 통해 제공하는 컴퓨터 통신 프로토콜입니다. 이는 HTTP 프로토콜의 한계를 극복하고, 실시간 웹 애플리케이션에 필요한 지속적인 데이터 교환을 가능하게 합니다.
WebSocket의 작동 방식
1 연결 수립:
웹 클라이언트(예: 웹 브라우저)가 서버에 WebSocket 연결을 요청합니다. 이 과정은 처음에 HTTP 요청을 통해 이루어지며, 이후 서버가 이를 WebSocket 연결로 승인하면 HTTP 연결이 WebSocket 연결로 업그레이드됩니다.
2 양방향 통신:
연결이 수립되면, 클라이언트와 서버는 연결이 종료될 때까지 지속적으로 데이터를 주고받을 수 있습니다. 이는 양측이 동시에 메시지를 보낼 수 있는 양방향 통신을 가능하게 합니다.
3 지속적 연결:
WebSocket 연결은 수동으로 종료되거나 네트워크 오류 등으로 인해 끊어지기 전까지 계속 유지됩니다. 이로 인해, 클라이언트와 서버는 필요할 때마다 즉시 데이터를 교환할 수 있습니다.
WebSocket의 주요 특징
- 실시간 통신: 실시간 채팅 애플리케이션, 게임, 금융 거래 플랫폼 등에서 실시간 데이터 교환이 필요한 경우 WebSocket을 이용할 수 있습니다.
- 효율성: 한 번의 연결 수립으로 지속적인 데이터 교환이 가능하므로, 폴링 방식에 비해 네트워크 오버헤드가 줄어듭니다.
- 호환성: 모든 현대적인 웹 브라우저는 WebSocket을 지원합니다. 이는 개발자가 별도의 플러그인이나 라이브러리 없이도 WebSocket 기반의 실시간 애플리케이션을 구현할 수 있음을 의미합니다.
브라우저 지원 현황
- Chrome, Firefox, Internet Explorer, Edge, Safari, Opera 및 이들의 모바일 버전을 포함하여 현대적인 대부분의 웹 브라우저와 모바일 브라우저는 WebSocket을 지원합니다. 이는 WebSocket이 웹 개발에서 널리 사용되고 표준화된 기술임을 보여줍니다.
ASGI vs WSGI
ASGI(Asynchronous Server Gateway Interface)는 WSGI(Web Server Gateway Interface)의 후임자로, Python 웹 서버, 프레임워크, 애플리케이션 간의 호환성을 위한 오랜 표준을 발전시킨 것입니다. ASGI와 WSGI는 모두 Python 애플리케이션을 웹 서버와 연동하기 위한 인터페이스를 제공하지만, 그들이 지원하는 기능과 목적에 있어 중요한 차이점이 있습니다.
WSGI
- WSGI는 Python 애플리케이션과 웹 서버 간의 표준 인터페이스로, 동기 방식의 통신을 지원합니다.
- WSGI는 한 번에 하나의 요청을 처리하는 방식으로, 요청 처리 중에는 해당 프로세스 또는 스레드가 다른 작업을 수행할 수 없습니다. 이는 동시성을 제한하며, I/O 작업이 많은 애플리케이션에서 성능 병목현상을 유발할 수 있습니다.
- WSGI는 HTTP/1.1 요청을 처리하는 데 최적화되어 있으며, WebSockets이나 HTTP/2와 같은 프로토콜을 직접 지원하지 않습니다.
ASGI
- ASGI는 비동기 IO를 지원하도록 설계된 인터페이스로, Python 3.5 이상에서 도입된
async와await구문을 사용하여 비동기 프로그래밍을 가능하게 합니다. - ASGI는 동시에 여러 요청을 비동기적으로 처리할 수 있어, I/O 바운드 작업에서 높은 성능을 발휘합니다. 이는 WebSockets, HTTP/2, long polling과 같은 실시간 통신 요구사항을 가진 애플리케이션 개발에 적합합니다.
- ASGI는 WSGI의 기능을 확장하며, 기존의 동기 방식 요청 처리뿐만 아니라 비동기 처리도 가능하게 해줍니다. 이로 인해 개발자는 하나의 애플리케이션 내에서 동기와 비동기 코드를 모두 사용할 수 있게 되었습니다.
FastAPI와 ASGI
- FastAPI는 ASGI를 기반으로 하는 모던 Python 웹 프레임워크입니다. Starlette ASGI 프레임워크 위에 구축되어 있으며, ASGI의 비동기 기능을 완전히 활용하여 고성능 API 서버를 구축할 수 있게 해줍니다.
- FastAPI는 비동기 프로그래밍을 쉽게 구현할 수 있게 해주며, WebSockets과 HTTP/2 지원을 포함하여 실시간 통신과 높은 동시성 요구사항을 가진 어플리케이션 개발에 이상적입니다.
결론적으로, ASGI는 Python 웹 개발에서 비동기 프로그래밍과 실시간 웹 통신을 가능하게 하는 현대적인 인터페이스로, WSGI의 기능을 확장하여 더 다양한 웹 개발 요구사항을 충족시키도록 설계되었습니다.
WorkFlow

이 시나리오는 FastAPI, Celery, 그리고 WebSocket을 사용하여 비동기 태스크의 상태 업데이트를 실시간으로 클라이언트에 전달하는 과정을 설명합니다. 각 단계별로 자세히 살펴보겠습니다.
1. AJAX 요청으로 Celery 태스크 트리거
- 클라이언트(웹 브라우저)는 AJAX 요청을 FastAPI가 구동하는 서버의 특정 뷰로 보냅니다. 이 요청의 목적은 비동기적으로 실행될 Celery 태스크를 트리거하는 것입니다.
2. FastAPI에서 태스크 ID 반환
- 요청을 받은 FastAPI는 Celery 태스크를 생성하고 celery worker를 실행시키고, 생성된 태스크의 고유 ID(
task_id)를 클라이언트에게 반환합니다. 이task_id는 태스크의 상태를 추적하는 데 사용됩니다.
3. WebSocket 연결 생성
- 클라이언트는 받은
task_id를 사용하여 WebSocket 연결을 요청합니다. 연결 주소는ws://127.0.0.1:8010/ws/task_status/{task_id}형식으로, 태스크 ID를 포함하고 있습니다.
4. FastAPI에서 WebSocket 요청 처리
- FastAPI 서버는 WebSocket 연결 요청을 받고, 해당 연결을
task_id채널에 구독시킵니다. 이는 서버가 해당 태스크에 대한 업데이트를 수신할 준비가 되었음을 의미합니다.
5. Celery 태스크 처리 완료 및 메시지 전송
- Celery 태스크가 처리를 마치면, 처리 결과와 함께 메시지를
task_id채널로 보냅니다. 이 채널에 구독된 모든 클라이언트(이 경우 WebSocket을 통해 연결된 클라이언트)는 이 메시지를 수신하게 됩니다.
6. 클라이언트에서 WebSocket 종료 및 결과 표시
- 클라이언트는 Celery 태스크의 처리 결과를 포함한 메시지를 수신하면 WebSocket 연결을 종료하고, 웹 페이지에 결과를 표시합니다.
이 과정을 통해, 실시간 웹 어플리케이션에서 비동기 태스크의 진행 상황과 결과를 효과적으로 사용자에게 전달할 수 있습니다. WebSocket을 사용함으로써 서버와 클라이언트 간에 지속적이고 실시간으로 통신할 수 있으며, 이는 사용자 경험을 크게 향상시키는 동시에 서버 리소스의 효율적 사용을 가능하게 합니다.
용어 정리
XHR(XMLHttpRequest)?
XHR(XMLHttpRequest)은 JavaScript를 사용하여 서버와 비동기적으로 데이터를 교환할 수 있는 Web API입니다. 이를 통해 웹 페이지를 새로 고치지 않고도 페이지의 일부분만을 업데이트할 수 있어, 웹 애플리케이션의 사용자 경험을 크게 개선할 수 있습니다. XHR은 AJAX(Asynchronous JavaScript and XML) 프로그래밍의 핵심 요소로, JSON, XML, HTML, 텍스트 등 다양한 형태의 데이터를 처리할 수 있습니다.
XHR Short Polling?
XHR Short Polling은 클라이언트가 서버에 주기적으로 요청을 보내어 새로운 데이터가 있는지 확인하는 방식입니다.
XHR(XMLHttpRequest) 객체를 사용하여 이러한 HTTP 요청을 비동기적으로 수행하며, 서버로부터의 응답을 기다린 후 즉시 다시 요청을 보내는 방식입니다. Short Polling은 구현이 간단하고 어떤 환경에서든 쉽게 사용할 수 있다는 장점이 있지만, 실시간 데이터 처리에는 적합하지 않고 서버 리소스를 비효율적으로 사용할 수 있다는 단점이 있습니다.
Long Polling?
Long Polling은 Short Polling의 단점을 일부 개선한 방식으로, 클라이언트가 서버에 요청을 보내면 서버는 새로운 데이터가 생길 때까지 요청을 보류하고, 데이터가 준비되면 그 즉시 응답합니다.
서버의 응답을 받은 클라이언트는 다시 새로운 요청을 보내어 이 과정을 반복합니다. Long Polling은 Short Polling에 비해 서버 리소스를 더 효율적으로 사용할 수 있고, 데이터의 실시간 처리에 더 적합하지만, 여전히 연결 설정과 종료에 따른 오버헤드가 있으며, 많은 동시 연결을 관리해야 하는 서버에 부담을 줄 수 있습니다.
Long Polling보다 더 성능이 좋은 방법?
Long Polling보다 성능이 더 좋은 대안으로는 WebSocket과 Server-Sent Events(SSE)가 있습니다.
WebSocket: 양방향 통신을 지원하는 프로토콜로, 클라이언트와 서버 간에 지속적인 연결을 유지합니다. 이를 통해 데이터가 준비되는 즉시 실시간으로 데이터를 교환할 수 있으며, Long Polling에 비해 훨씬 효율적이고 빠릅니다. 실시간 채팅 애플리케이션, 게임, 실시간 피드 등의 구현에 적합합니다.
Server-Sent Events(SSE): 서버에서 클라이언트로 데이터 스트림을 실시간으로 전송할 수 있는 방법으로, 주로 단방향 통신에 사용됩니다. WebSocket만큼 유연하지는 않지만, 단순한 실시간 데이터 전송이 필요한 경우에 적합하며, HTTP 프로토콜을 기반으로 하기 때문에 구현이 비교적 간단합니다.
WebSocket과 SSE는 실시간 통신을 구현할 때 Long Polling보다 성능과 효율성 측면에서 우수한 대안을 제공합니다.
구현
Config
WS_MESSAGE_QUEUE 환경 변수를 .env/.dev-sample 파일에 정의합니다.
WS_MESSAGE_QUEUE=redis://redis:6379/0
BaseConfig 클래스에 WS_MESSAGE_QUEUE 변수를 project/config.py 모듈에 추가합니다.
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")
WS_MESSAGE_QUEUE: str = os.environ.get("WS_MESSAGE_QUEUE", "redis://127.0.0.1:6379/0")
FastAPI와 Celery Worker는 양방향 통신을 사용을 위해 WS_MESSAGE_QUEUE 환경 변수를 사용 할 것입니다.
Broadcaster
requirements.txt 파일에 라이브러리를 추가하도록 합니다.
asgiref==3.7.2
asyncio-redis==0.16.0
broadcaster==0.2.0
1 asyncio_redis는 Broadcaster에 필요한 asyncio를 지원하는 Redis client입니다.
project/__init__.py모듈을 수정합니다.
from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI
from project.config import settings
broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)
@asynccontextmanager
async def lifespan(app: FastAPI):
await broadcast.connect()
yield
await broadcast.disconnect()
def create_app() -> FastAPI:
app = FastAPI(lifespan=lifespan)
# do this before loading routes
from project.celery_utils import create_celery
app.celery_app = create_celery()
from project.users import users_router
app.include_router(users_router)
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
- FastAPI에서 사용되는 Broadcast인스턴스를 만들었습니다. 참고로 celery worker에서 사용하지 않습니다.
- lifespan함수에는 애플리케이션 시작 전 broadcast 커넥션을 맺고 앱 종료 후 커넥션을 끊어버립니다.
Celery
project/celery_utils.py
from celery import current_app as current_celery_app
from celery.result import AsyncResult
from project.config import settings
def create_celery():
celery_app = current_celery_app
celery_app.config_from_object(settings, namespace="CELERY")
return celery_app
def get_task_info(task_id):
"""
return task info according to the task_id
"""
task = AsyncResult(task_id)
state = task.state
if state == "FAILURE":
error = str(task.result)
response = {
"state": task.state,
"error": error,
}
else:
response = {
"state": task.state,
}
return response
task의 상태를 반환하는 get_task_info함수를 추가했습니다.
Router
새로운 라우터, ws_router를 추가하겠습니다.
$ mkdir project/ws
$ touch project/ws/__init__.py
$ touch project/ws/views.py
project/ws/__init__.py:
from fastapi import APIRouter
ws_router = APIRouter()
from . import views # noqa
project/ws/views.py:
import json
from fastapi import WebSocket
from . import ws_router
from project import broadcast
from project.celery_utils import get_task_info
@ws_router.websocket("/ws/task_status/{task_id}")
async def ws_task_status(websocket: WebSocket):
await websocket.accept()
task_id = websocket.scope["path_params"]["task_id"]
async with broadcast.subscribe(channel=task_id) as subscriber:
# just in case the task already finish
data = get_task_info(task_id)
await websocket.send_json(data)
async for event in subscriber:
await websocket.send_json(json.loads(event.message))
async def update_celery_task_status(task_id: str):
"""
This function is called by Celery worker in task_postrun signal handler
"""
await broadcast.connect()
await broadcast.publish(
channel=task_id,
message=json.dumps(get_task_info(task_id)) # RedisProtocol.publish expect str
)
await broadcast.disconnect()
from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI
from project.config import settings
broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)
@asynccontextmanager
async def lifespan(app: FastAPI):
await broadcast.connect()
yield
await broadcast.disconnect()
def create_app() -> FastAPI:
app = FastAPI(lifespan=lifespan)
# do this before loading routes
from project.celery_utils import create_celery
app.celery_app = create_celery()
from project.users import users_router
app.include_router(users_router)
from project.ws import ws_router # new
app.include_router(ws_router) # new
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
코드 설명
이 코드는 FastAPI와 WebSocket, 그리고 Celery를 함께 사용하여 비동기 태스크의 상태를 실시간으로 웹 클라이언트에게 전달하는 방법을 구현한 예입니다. 여기서는 broadcast를 통해 메시지 통신을 관리하고 있으며, 이는 아마도 Redis와 같은 메시지 브로커를 사용하는 방식으로 추정됩니다. 코드의 주요 부분을 단계별로 살펴보겠습니다.
WebSocket 라우터 설정
@ws_router.websocket("/ws/task_status/{task_id}")데코레이터를 사용하여, 클라이언트가 WebSocket을 통해 특정 태스크 ID에 대한 상태 정보를 구독할 수 있는 엔드포인트를 정의합니다.
WebSocket 연결 수락 및 데이터 전송
- 클라이언트로부터 WebSocket 연결이 요청되면, 먼저 연결을 수락합니다(
await websocket.accept()). - 태스크 ID는 URL 경로 파라미터에서 추출되며, 이 ID를 사용하여 특정 태스크의 현재 상태 정보를 가져옵니다(
get_task_info(task_id)). - 연결 직후, 이미 완료된 태스크의 정보를 클라이언트에게 전송합니다.
- 그 후, 클라이언트는
broadcast시스템을 통해 해당 태스크 ID 채널을 구독하고, 태스크 상태 업데이트를 실시간으로 수신합니다(async for event in subscriber).
Celery 태스크 상태 업데이트 통보
update_celery_task_status함수는 Celery 태스크의 상태가 변경될 때마다 호출됩니다. 이 함수는 Celery 워커에서 태스크의 포스트런(post-run) 신호 핸들러를 통해 트리거될 수 있습니다.- 함수는 먼저
broadcast시스템에 연결하고, 변경된 태스크 상태 정보를 해당 태스크 ID 채널에 게시합니다. 이 메시지는 JSON 문자열로 인코딩되어 게시됩니다. - 상태 업데이트 메시지를 게시한 후,
broadcast시스템의 연결을 종료합니다.
전반적인 흐름
- 클라이언트는 WebSocket을 통해 특정 태스크의 상태 업데이트를 실시간으로 구독합니다.
- Celery 태스크가 실행되고, 상태가 변경될 때마다
update_celery_task_status함수가 호출되어 해당 태스크 ID 채널에 상태 업데이트를 게시합니다. - WebSocket으로 연결된 클라이언트는 이 업데이트를 실시간으로 수신하여 사용자에게 보여줍니다.
Celery Signal Handler
project/users/tasks.py모듈에 새 signal handler를 추가합니다.
@task_postrun.connect
def task_postrun_handler(task_id, **kwargs):
from project.ws.views import update_celery_task_status
async_to_sync(update_celery_task_status)(task_id)
코드 설명
task_postrun_handler 함수는 Celery 태스크의 실행이 완료된 후 호출됩니다. 이 핸들러는 아래 작업을 수행합니다
하나. update_celery_task_status 함수를 임포트합니다. 이 함수는 비동기로 구현되었으며, 태스크의 상태 정보를 특정 WebSocket 채널을 통해 클라이언트에게 전송하는 역할을 합니다.
둘. async_to_sync를 사용하여 비동기 함수인 update_celery_task_status를 동기적인 환경에서 호출합니다. Celery는 기본적으로 비동기 IO(예: asyncio)와 호환되지 않기 때문에, async_to_sync를 통해 이 문제를 해결합니다.
셋. task_id를 update_celery_task_status 함수에 전달하여, 해당 태스크의 상태 정보를 처리하고, 관련 WebSocket 채널에 메시지를 publish합니다.
async_to_sync메소드, task_postrun데코레이터 사용을 위해 임포트도 해줍니다.
import random
import requests
from asgiref.sync import async_to_sync
from celery import shared_task
from celery.signals import task_postrun
from celery.utils.log import get_task_logger
라우터 추가
project/users/views.py 모듈에서 users_router에 /form_ws/ 엔드포인트를 추가해줍니다.
@users_router.get("/form_ws/")
def form_ws_example(request: Request):
return templates.TemplateResponse("form_ws.html", {"request": request})
Template
project/users/templates/form_ws.html 파일을 아래와 같이 정의해줍니다
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Celery example</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css"
rel="stylesheet"
integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65"
crossorigin="anonymous"
>
</head>
<body>
<div class="container">
<div class="row">
<div class="col-12 col-md-4">
<form id="your-form">
<div class="mb-3">
<label for="email" class="form-label">Email address</label>
<input type="email" class="form-control" id="email" name="email">
</div>
<div class="mb-3">
<label for="username" class="form-label">Username</label>
<input type="text" class="form-control" id="username" name="username">
</div>
<div class="mb-3" id="messages"></div>
<button type="submit" class="btn btn-primary">Submit</button>
</form>
</div>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js"
integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4"
crossorigin="anonymous">
</script>
<script>
function updateProgress(yourForm, task_id, btnHtml) {
const ws_url = `/ws/task_status/${task_id}`;
const WS = new WebSocket((location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + ws_url);
WS.onmessage = function (event) {
const res = JSON.parse(event.data);
const taskStatus = res.state;
if (['SUCCESS', 'FAILURE'].includes(taskStatus)) {
const msg = yourForm.querySelector('#messages');
const submitBtn = yourForm.querySelector('button[type="submit"]');
if (taskStatus === 'SUCCESS') {
msg.innerHTML = 'job succeeded';
} else if (taskStatus === 'FAILURE') {
msg.innerHTML = res.error;
}
submitBtn.disabled = false;
submitBtn.innerHTML = btnHtml;
// close the websocket because we do not need it now
WS.close();
}
}
}
function serialize (data) {
let obj = {};
for (let [key, value] of data) {
if (obj[key] !== undefined) {
if (!Array.isArray(obj[key])) {
obj[key] = [obj[key]];
}
obj[key].push(value);
} else {
obj[key] = value;
}
}
return obj;
}
document.addEventListener("DOMContentLoaded", function () {
const yourForm = document.getElementById("your-form");
yourForm.addEventListener("submit", function (event) {
event.preventDefault();
const submitBtn = yourForm.querySelector('button[type="submit"]');
const btnHtml = submitBtn.innerHTML;
const spinnerHtml = 'Processing...';
submitBtn.disabled = true;
submitBtn.innerHTML = spinnerHtml;
const msg = yourForm.querySelector('#messages');
msg.innerHTML = '';
// Get all field data from the form
let data = new FormData(yourForm);
// Convert to an object
let formData = serialize(data);
fetch('/users/form/', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(formData),
})
.then(response => response.json())
.then((res) => {
// after we get Celery task id, we start polling
const task_id = res.task_id;
updateProgress(yourForm, task_id, btnHtml);
console.log(res);
}).catch((error) => {
console.error('Error:', error);
});
});
});
</script>
</body>
</html>
코드 설명
사용자는 폼을 통해 데이터를 입력하고 제출할 수 있으며, 서버 측에서는 Celery를 사용하여 비동기 태스크를 실행합니다. 태스크 상태는 WebSocket을 통해 실시간으로 클라이언트에 전달됩니다. 주요 기능은 다음과 같이 설명할 수 있습니다:
HTML 구조
- Bootstrap CSS를 사용하여 스타일링된 간단한 폼이 있습니다. 이 폼에는 이메일 주소와 사용자 이름을 입력할 수 있는 필드가 있습니다.
id="your-form"을 가진<form>태그 내에, 사용자 입력을 위한 이메일과 사용자 이름 입력 필드가 있습니다.- 폼 제출 버튼을 클릭하면 태스크가 시작되고, 태스크의 상태는
id="messages"인 div 태그에 실시간으로 표시됩니다.
JavaScript 로직
serialize함수는 폼 데이터를 객체로 변환합니다. 이 객체는 서버로 전송되기 전에 JSON 문자열로 변환됩니다.updateProgress함수는 WebSocket 연결을 생성하고, 서버로부터 태스크 상태 업데이트를 실시간으로 수신합니다. 태스크 상태(SUCCESS또는FAILURE)에 따라 메시지를 화면에 표시하고, 필요한 경우 WebSocket 연결을 종료합니다.DOMContentLoaded이벤트 리스너 내에서 폼 제출 이벤트를 처리합니다. 이는 사용자가 폼을 제출할 때 비동기적으로 서버에 데이터를 전송하고, 반환된 태스크 ID를 사용하여updateProgress함수를 호출하여 태스크 상태 업데이트를 수신하기 시작합니다.
폼 제출 및 태스크 상태 처리
- 사용자가 폼을 제출하면,
event.preventDefault()를 호출하여 폼의 기본 제출 동작을 막습니다. - 폼 데이터는
serialize함수를 통해 객체로 변환되고, 이 객체는 서버로 POST 요청을 보내기 위해 JSON 문자열로 변환됩니다. - 서버로부터 태스크 ID를 받으면, 이 ID를 사용하여
updateProgress함수 내에서 WebSocket 연결을 시작합니다. - WebSocket을 통해 서버로부터 태스크의 상태 업데이트를 받으면, 이를 사용자에게 표시합니다. 태스크가 성공하거나 실패하면, 관련 메시지를
#messagesdiv에 표시하고, WebSocket 연결을 종료합니다.
테스트
작성한 내용을 테스트 하기 위해서 도커 이미지를 다시 빌드하고 컨테이너를 재기동해 봅니다.
$ docker compose up -d --build
http://localhost:8010/users/form_ws/ 를 브라우저 주소창에 입력하고 랜덤한 이름과 이메일을 입력한 이후 submit버튼을 클릭합니다.
API결과에 따라서 성공 혹은 오류를 확인 할 수 있습니다.
여기서의 핵심은 https://httpbin.org/delay/5, 외부 API 호출에 따라 5초가 소요되는지 아닌지를 확인하기 위함입니다.

이미지를 통해 확인해 보면 PENDING이라는 결과를 개발자 도구를 통해 확인할 수 있습니다.
task가 완료되면 signal handler가 수행되어서 message가 Redis channel로 전달 됩니다. 그리고 나서 FastAPI가
메시지를 전달 받아서 클라이언트에게 반환해주게 됩니다.
'프로그래밍 언어 > 파이썬' 카테고리의 다른 글
| input & itertools.accumulate 함수 알아보기 (6) | 2025.08.18 |
|---|---|
| Celery와 FastAPI - 7 (1) | 2024.03.26 |
| Celery와 FastAPI - 6 (1) | 2024.03.24 |
| Celery와 FastAPI - 5 (1) | 2024.03.24 |
| Celery와 FastAPI - 4 (0) | 2024.03.24 |