이번에는 Broadcaster와 WebSockets를 사용하여 어플리케이션에서의 비효율적인 폴링 절차를 최적화하는 방법을 확인해봅니다.
XHR Short Polling을 사용하여 태스크 상태를 확인했지만, 이 방식은 많은 연결과 쿼리를 생성하여 리소스를 낭비할 수 있으며, 폴링 간격에 따라 태스크 완료와 클라이언트 업데이트 사이에 지연이 발생할 수 있습니다.
목표
- Broadcaster를 사용하여 Redis pub/sub을 통해 다중 프로세스 알림 처리를 해봅니다.
- asyncio에 대해 알아 봅니다.
- asgiref 라이브러리를 이용하여 async를 sync로 변환해봅니다.
Websocket?
WebSocket 프로토콜은 클라이언트와 서버 간의 양방향(full-duplex) 통신 채널을 단일 TCP 연결을 통해 제공하는 컴퓨터 통신 프로토콜입니다. 이는 HTTP 프로토콜의 한계를 극복하고, 실시간 웹 애플리케이션에 필요한 지속적인 데이터 교환을 가능하게 합니다.
WebSocket의 작동 방식
1 연결 수립:
웹 클라이언트(예: 웹 브라우저)가 서버에 WebSocket 연결을 요청합니다. 이 과정은 처음에 HTTP 요청을 통해 이루어지며, 이후 서버가 이를 WebSocket 연결로 승인하면 HTTP 연결이 WebSocket 연결로 업그레이드됩니다.
2 양방향 통신:
연결이 수립되면, 클라이언트와 서버는 연결이 종료될 때까지 지속적으로 데이터를 주고받을 수 있습니다. 이는 양측이 동시에 메시지를 보낼 수 있는 양방향 통신을 가능하게 합니다.
3 지속적 연결:
WebSocket 연결은 수동으로 종료되거나 네트워크 오류 등으로 인해 끊어지기 전까지 계속 유지됩니다. 이로 인해, 클라이언트와 서버는 필요할 때마다 즉시 데이터를 교환할 수 있습니다.
WebSocket의 주요 특징
- 실시간 통신: 실시간 채팅 애플리케이션, 게임, 금융 거래 플랫폼 등에서 실시간 데이터 교환이 필요한 경우 WebSocket을 이용할 수 있습니다.
- 효율성: 한 번의 연결 수립으로 지속적인 데이터 교환이 가능하므로, 폴링 방식에 비해 네트워크 오버헤드가 줄어듭니다.
- 호환성: 모든 현대적인 웹 브라우저는 WebSocket을 지원합니다. 이는 개발자가 별도의 플러그인이나 라이브러리 없이도 WebSocket 기반의 실시간 애플리케이션을 구현할 수 있음을 의미합니다.
브라우저 지원 현황
- Chrome, Firefox, Internet Explorer, Edge, Safari, Opera 및 이들의 모바일 버전을 포함하여 현대적인 대부분의 웹 브라우저와 모바일 브라우저는 WebSocket을 지원합니다. 이는 WebSocket이 웹 개발에서 널리 사용되고 표준화된 기술임을 보여줍니다.
ASGI vs WSGI
ASGI(Asynchronous Server Gateway Interface)는 WSGI(Web Server Gateway Interface)의 후임자로, Python 웹 서버, 프레임워크, 애플리케이션 간의 호환성을 위한 오랜 표준을 발전시킨 것입니다. ASGI와 WSGI는 모두 Python 애플리케이션을 웹 서버와 연동하기 위한 인터페이스를 제공하지만, 그들이 지원하는 기능과 목적에 있어 중요한 차이점이 있습니다.
WSGI
- WSGI는 Python 애플리케이션과 웹 서버 간의 표준 인터페이스로, 동기 방식의 통신을 지원합니다.
- WSGI는 한 번에 하나의 요청을 처리하는 방식으로, 요청 처리 중에는 해당 프로세스 또는 스레드가 다른 작업을 수행할 수 없습니다. 이는 동시성을 제한하며, I/O 작업이 많은 애플리케이션에서 성능 병목현상을 유발할 수 있습니다.
- WSGI는 HTTP/1.1 요청을 처리하는 데 최적화되어 있으며, WebSockets이나 HTTP/2와 같은 프로토콜을 직접 지원하지 않습니다.
ASGI
- ASGI는 비동기 IO를 지원하도록 설계된 인터페이스로, Python 3.5 이상에서 도입된
async
와await
구문을 사용하여 비동기 프로그래밍을 가능하게 합니다. - ASGI는 동시에 여러 요청을 비동기적으로 처리할 수 있어, I/O 바운드 작업에서 높은 성능을 발휘합니다. 이는 WebSockets, HTTP/2, long polling과 같은 실시간 통신 요구사항을 가진 애플리케이션 개발에 적합합니다.
- ASGI는 WSGI의 기능을 확장하며, 기존의 동기 방식 요청 처리뿐만 아니라 비동기 처리도 가능하게 해줍니다. 이로 인해 개발자는 하나의 애플리케이션 내에서 동기와 비동기 코드를 모두 사용할 수 있게 되었습니다.
FastAPI와 ASGI
- FastAPI는 ASGI를 기반으로 하는 모던 Python 웹 프레임워크입니다. Starlette ASGI 프레임워크 위에 구축되어 있으며, ASGI의 비동기 기능을 완전히 활용하여 고성능 API 서버를 구축할 수 있게 해줍니다.
- FastAPI는 비동기 프로그래밍을 쉽게 구현할 수 있게 해주며, WebSockets과 HTTP/2 지원을 포함하여 실시간 통신과 높은 동시성 요구사항을 가진 어플리케이션 개발에 이상적입니다.
결론적으로, ASGI는 Python 웹 개발에서 비동기 프로그래밍과 실시간 웹 통신을 가능하게 하는 현대적인 인터페이스로, WSGI의 기능을 확장하여 더 다양한 웹 개발 요구사항을 충족시키도록 설계되었습니다.
WorkFlow
이 시나리오는 FastAPI, Celery, 그리고 WebSocket을 사용하여 비동기 태스크의 상태 업데이트를 실시간으로 클라이언트에 전달하는 과정을 설명합니다. 각 단계별로 자세히 살펴보겠습니다.
1. AJAX 요청으로 Celery 태스크 트리거
- 클라이언트(웹 브라우저)는 AJAX 요청을 FastAPI가 구동하는 서버의 특정 뷰로 보냅니다. 이 요청의 목적은 비동기적으로 실행될 Celery 태스크를 트리거하는 것입니다.
2. FastAPI에서 태스크 ID 반환
- 요청을 받은 FastAPI는 Celery 태스크를 생성하고 celery worker를 실행시키고, 생성된 태스크의 고유 ID(
task_id
)를 클라이언트에게 반환합니다. 이task_id
는 태스크의 상태를 추적하는 데 사용됩니다.
3. WebSocket 연결 생성
- 클라이언트는 받은
task_id
를 사용하여 WebSocket 연결을 요청합니다. 연결 주소는ws://127.0.0.1:8010/ws/task_status/{task_id}
형식으로, 태스크 ID를 포함하고 있습니다.
4. FastAPI에서 WebSocket 요청 처리
- FastAPI 서버는 WebSocket 연결 요청을 받고, 해당 연결을
task_id
채널에 구독시킵니다. 이는 서버가 해당 태스크에 대한 업데이트를 수신할 준비가 되었음을 의미합니다.
5. Celery 태스크 처리 완료 및 메시지 전송
- Celery 태스크가 처리를 마치면, 처리 결과와 함께 메시지를
task_id
채널로 보냅니다. 이 채널에 구독된 모든 클라이언트(이 경우 WebSocket을 통해 연결된 클라이언트)는 이 메시지를 수신하게 됩니다.
6. 클라이언트에서 WebSocket 종료 및 결과 표시
- 클라이언트는 Celery 태스크의 처리 결과를 포함한 메시지를 수신하면 WebSocket 연결을 종료하고, 웹 페이지에 결과를 표시합니다.
이 과정을 통해, 실시간 웹 어플리케이션에서 비동기 태스크의 진행 상황과 결과를 효과적으로 사용자에게 전달할 수 있습니다. WebSocket을 사용함으로써 서버와 클라이언트 간에 지속적이고 실시간으로 통신할 수 있으며, 이는 사용자 경험을 크게 향상시키는 동시에 서버 리소스의 효율적 사용을 가능하게 합니다.
용어 정리
XHR(XMLHttpRequest)?
XHR(XMLHttpRequest)은 JavaScript를 사용하여 서버와 비동기적으로 데이터를 교환할 수 있는 Web API입니다. 이를 통해 웹 페이지를 새로 고치지 않고도 페이지의 일부분만을 업데이트할 수 있어, 웹 애플리케이션의 사용자 경험을 크게 개선할 수 있습니다. XHR은 AJAX(Asynchronous JavaScript and XML) 프로그래밍의 핵심 요소로, JSON, XML, HTML, 텍스트 등 다양한 형태의 데이터를 처리할 수 있습니다.
XHR Short Polling?
XHR Short Polling은 클라이언트가 서버에 주기적으로 요청을 보내어 새로운 데이터가 있는지 확인하는 방식입니다.
XHR(XMLHttpRequest) 객체를 사용하여 이러한 HTTP 요청을 비동기적으로 수행하며, 서버로부터의 응답을 기다린 후 즉시 다시 요청을 보내는 방식입니다. Short Polling은 구현이 간단하고 어떤 환경에서든 쉽게 사용할 수 있다는 장점이 있지만, 실시간 데이터 처리에는 적합하지 않고 서버 리소스를 비효율적으로 사용할 수 있다는 단점이 있습니다.
Long Polling?
Long Polling은 Short Polling의 단점을 일부 개선한 방식으로, 클라이언트가 서버에 요청을 보내면 서버는 새로운 데이터가 생길 때까지 요청을 보류하고, 데이터가 준비되면 그 즉시 응답합니다.
서버의 응답을 받은 클라이언트는 다시 새로운 요청을 보내어 이 과정을 반복합니다. Long Polling은 Short Polling에 비해 서버 리소스를 더 효율적으로 사용할 수 있고, 데이터의 실시간 처리에 더 적합하지만, 여전히 연결 설정과 종료에 따른 오버헤드가 있으며, 많은 동시 연결을 관리해야 하는 서버에 부담을 줄 수 있습니다.
Long Polling보다 더 성능이 좋은 방법?
Long Polling보다 성능이 더 좋은 대안으로는 WebSocket과 Server-Sent Events(SSE)가 있습니다.
WebSocket: 양방향 통신을 지원하는 프로토콜로, 클라이언트와 서버 간에 지속적인 연결을 유지합니다. 이를 통해 데이터가 준비되는 즉시 실시간으로 데이터를 교환할 수 있으며, Long Polling에 비해 훨씬 효율적이고 빠릅니다. 실시간 채팅 애플리케이션, 게임, 실시간 피드 등의 구현에 적합합니다.
Server-Sent Events(SSE): 서버에서 클라이언트로 데이터 스트림을 실시간으로 전송할 수 있는 방법으로, 주로 단방향 통신에 사용됩니다. WebSocket만큼 유연하지는 않지만, 단순한 실시간 데이터 전송이 필요한 경우에 적합하며, HTTP 프로토콜을 기반으로 하기 때문에 구현이 비교적 간단합니다.
WebSocket과 SSE는 실시간 통신을 구현할 때 Long Polling보다 성능과 효율성 측면에서 우수한 대안을 제공합니다.
구현
Config
WS_MESSAGE_QUEUE
환경 변수를 .env/.dev-sample
파일에 정의합니다.
WS_MESSAGE_QUEUE=redis://redis:6379/0
BaseConfig 클래스에 WS_MESSAGE_QUEUE
변수를 project/config.py
모듈에 추가합니다.
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")
WS_MESSAGE_QUEUE: str = os.environ.get("WS_MESSAGE_QUEUE", "redis://127.0.0.1:6379/0")
FastAPI와 Celery Worker는 양방향 통신을 사용을 위해 WS_MESSAGE_QUEUE 환경 변수를 사용 할 것입니다.
Broadcaster
requirements.txt 파일에 라이브러리를 추가하도록 합니다.
asgiref==3.7.2
asyncio-redis==0.16.0
broadcaster==0.2.0
1 asyncio_redis는 Broadcaster에 필요한 asyncio를 지원하는 Redis client입니다.
project/__init__.py
모듈을 수정합니다.
from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI
from project.config import settings
broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)
@asynccontextmanager
async def lifespan(app: FastAPI):
await broadcast.connect()
yield
await broadcast.disconnect()
def create_app() -> FastAPI:
app = FastAPI(lifespan=lifespan)
# do this before loading routes
from project.celery_utils import create_celery
app.celery_app = create_celery()
from project.users import users_router
app.include_router(users_router)
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
- FastAPI에서 사용되는 Broadcast인스턴스를 만들었습니다. 참고로 celery worker에서 사용하지 않습니다.
- lifespan함수에는 애플리케이션 시작 전 broadcast 커넥션을 맺고 앱 종료 후 커넥션을 끊어버립니다.
Celery
project/celery_utils.py
from celery import current_app as current_celery_app
from celery.result import AsyncResult
from project.config import settings
def create_celery():
celery_app = current_celery_app
celery_app.config_from_object(settings, namespace="CELERY")
return celery_app
def get_task_info(task_id):
"""
return task info according to the task_id
"""
task = AsyncResult(task_id)
state = task.state
if state == "FAILURE":
error = str(task.result)
response = {
"state": task.state,
"error": error,
}
else:
response = {
"state": task.state,
}
return response
task의 상태를 반환하는 get_task_info
함수를 추가했습니다.
Router
새로운 라우터, ws_router
를 추가하겠습니다.
$ mkdir project/ws
$ touch project/ws/__init__.py
$ touch project/ws/views.py
project/ws/__init__.py:
from fastapi import APIRouter
ws_router = APIRouter()
from . import views # noqa
project/ws/views.py:
import json
from fastapi import WebSocket
from . import ws_router
from project import broadcast
from project.celery_utils import get_task_info
@ws_router.websocket("/ws/task_status/{task_id}")
async def ws_task_status(websocket: WebSocket):
await websocket.accept()
task_id = websocket.scope["path_params"]["task_id"]
async with broadcast.subscribe(channel=task_id) as subscriber:
# just in case the task already finish
data = get_task_info(task_id)
await websocket.send_json(data)
async for event in subscriber:
await websocket.send_json(json.loads(event.message))
async def update_celery_task_status(task_id: str):
"""
This function is called by Celery worker in task_postrun signal handler
"""
await broadcast.connect()
await broadcast.publish(
channel=task_id,
message=json.dumps(get_task_info(task_id)) # RedisProtocol.publish expect str
)
await broadcast.disconnect()
from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI
from project.config import settings
broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)
@asynccontextmanager
async def lifespan(app: FastAPI):
await broadcast.connect()
yield
await broadcast.disconnect()
def create_app() -> FastAPI:
app = FastAPI(lifespan=lifespan)
# do this before loading routes
from project.celery_utils import create_celery
app.celery_app = create_celery()
from project.users import users_router
app.include_router(users_router)
from project.ws import ws_router # new
app.include_router(ws_router) # new
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
코드 설명
이 코드는 FastAPI와 WebSocket, 그리고 Celery를 함께 사용하여 비동기 태스크의 상태를 실시간으로 웹 클라이언트에게 전달하는 방법을 구현한 예입니다. 여기서는 broadcast
를 통해 메시지 통신을 관리하고 있으며, 이는 아마도 Redis와 같은 메시지 브로커를 사용하는 방식으로 추정됩니다. 코드의 주요 부분을 단계별로 살펴보겠습니다.
WebSocket 라우터 설정
@ws_router.websocket("/ws/task_status/{task_id}")
데코레이터를 사용하여, 클라이언트가 WebSocket을 통해 특정 태스크 ID에 대한 상태 정보를 구독할 수 있는 엔드포인트를 정의합니다.
WebSocket 연결 수락 및 데이터 전송
- 클라이언트로부터 WebSocket 연결이 요청되면, 먼저 연결을 수락합니다(
await websocket.accept()
). - 태스크 ID는 URL 경로 파라미터에서 추출되며, 이 ID를 사용하여 특정 태스크의 현재 상태 정보를 가져옵니다(
get_task_info(task_id)
). - 연결 직후, 이미 완료된 태스크의 정보를 클라이언트에게 전송합니다.
- 그 후, 클라이언트는
broadcast
시스템을 통해 해당 태스크 ID 채널을 구독하고, 태스크 상태 업데이트를 실시간으로 수신합니다(async for event in subscriber
).
Celery 태스크 상태 업데이트 통보
update_celery_task_status
함수는 Celery 태스크의 상태가 변경될 때마다 호출됩니다. 이 함수는 Celery 워커에서 태스크의 포스트런(post-run) 신호 핸들러를 통해 트리거될 수 있습니다.- 함수는 먼저
broadcast
시스템에 연결하고, 변경된 태스크 상태 정보를 해당 태스크 ID 채널에 게시합니다. 이 메시지는 JSON 문자열로 인코딩되어 게시됩니다. - 상태 업데이트 메시지를 게시한 후,
broadcast
시스템의 연결을 종료합니다.
전반적인 흐름
- 클라이언트는 WebSocket을 통해 특정 태스크의 상태 업데이트를 실시간으로 구독합니다.
- Celery 태스크가 실행되고, 상태가 변경될 때마다
update_celery_task_status
함수가 호출되어 해당 태스크 ID 채널에 상태 업데이트를 게시합니다. - WebSocket으로 연결된 클라이언트는 이 업데이트를 실시간으로 수신하여 사용자에게 보여줍니다.
Celery Signal Handler
project/users/tasks.py
모듈에 새 signal handler를 추가합니다.
@task_postrun.connect
def task_postrun_handler(task_id, **kwargs):
from project.ws.views import update_celery_task_status
async_to_sync(update_celery_task_status)(task_id)
코드 설명
task_postrun_handler 함수는 Celery 태스크의 실행이 완료된 후 호출됩니다. 이 핸들러는 아래 작업을 수행합니다
하나. update_celery_task_status 함수를 임포트합니다. 이 함수는 비동기로 구현되었으며, 태스크의 상태 정보를 특정 WebSocket 채널을 통해 클라이언트에게 전송하는 역할을 합니다.
둘. async_to_sync를 사용하여 비동기 함수인 update_celery_task_status를 동기적인 환경에서 호출합니다. Celery는 기본적으로 비동기 IO(예: asyncio)와 호환되지 않기 때문에, async_to_sync를 통해 이 문제를 해결합니다.
셋. task_id를 update_celery_task_status 함수에 전달하여, 해당 태스크의 상태 정보를 처리하고, 관련 WebSocket 채널에 메시지를 publish합니다.
async_to_sync
메소드, task_postrun
데코레이터 사용을 위해 임포트도 해줍니다.
import random
import requests
from asgiref.sync import async_to_sync
from celery import shared_task
from celery.signals import task_postrun
from celery.utils.log import get_task_logger
라우터 추가
project/users/views.py
모듈에서 users_router
에 /form_ws/
엔드포인트를 추가해줍니다.
@users_router.get("/form_ws/")
def form_ws_example(request: Request):
return templates.TemplateResponse("form_ws.html", {"request": request})
Template
project/users/templates/form_ws.html
파일을 아래와 같이 정의해줍니다
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Celery example</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css"
rel="stylesheet"
integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65"
crossorigin="anonymous"
>
</head>
<body>
<div class="container">
<div class="row">
<div class="col-12 col-md-4">
<form id="your-form">
<div class="mb-3">
<label for="email" class="form-label">Email address</label>
<input type="email" class="form-control" id="email" name="email">
</div>
<div class="mb-3">
<label for="username" class="form-label">Username</label>
<input type="text" class="form-control" id="username" name="username">
</div>
<div class="mb-3" id="messages"></div>
<button type="submit" class="btn btn-primary">Submit</button>
</form>
</div>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js"
integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4"
crossorigin="anonymous">
</script>
<script>
function updateProgress(yourForm, task_id, btnHtml) {
const ws_url = `/ws/task_status/${task_id}`;
const WS = new WebSocket((location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + ws_url);
WS.onmessage = function (event) {
const res = JSON.parse(event.data);
const taskStatus = res.state;
if (['SUCCESS', 'FAILURE'].includes(taskStatus)) {
const msg = yourForm.querySelector('#messages');
const submitBtn = yourForm.querySelector('button[type="submit"]');
if (taskStatus === 'SUCCESS') {
msg.innerHTML = 'job succeeded';
} else if (taskStatus === 'FAILURE') {
msg.innerHTML = res.error;
}
submitBtn.disabled = false;
submitBtn.innerHTML = btnHtml;
// close the websocket because we do not need it now
WS.close();
}
}
}
function serialize (data) {
let obj = {};
for (let [key, value] of data) {
if (obj[key] !== undefined) {
if (!Array.isArray(obj[key])) {
obj[key] = [obj[key]];
}
obj[key].push(value);
} else {
obj[key] = value;
}
}
return obj;
}
document.addEventListener("DOMContentLoaded", function () {
const yourForm = document.getElementById("your-form");
yourForm.addEventListener("submit", function (event) {
event.preventDefault();
const submitBtn = yourForm.querySelector('button[type="submit"]');
const btnHtml = submitBtn.innerHTML;
const spinnerHtml = 'Processing...';
submitBtn.disabled = true;
submitBtn.innerHTML = spinnerHtml;
const msg = yourForm.querySelector('#messages');
msg.innerHTML = '';
// Get all field data from the form
let data = new FormData(yourForm);
// Convert to an object
let formData = serialize(data);
fetch('/users/form/', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(formData),
})
.then(response => response.json())
.then((res) => {
// after we get Celery task id, we start polling
const task_id = res.task_id;
updateProgress(yourForm, task_id, btnHtml);
console.log(res);
}).catch((error) => {
console.error('Error:', error);
});
});
});
</script>
</body>
</html>
코드 설명
사용자는 폼을 통해 데이터를 입력하고 제출할 수 있으며, 서버 측에서는 Celery를 사용하여 비동기 태스크를 실행합니다. 태스크 상태는 WebSocket을 통해 실시간으로 클라이언트에 전달됩니다. 주요 기능은 다음과 같이 설명할 수 있습니다:
HTML 구조
- Bootstrap CSS를 사용하여 스타일링된 간단한 폼이 있습니다. 이 폼에는 이메일 주소와 사용자 이름을 입력할 수 있는 필드가 있습니다.
id="your-form"
을 가진<form>
태그 내에, 사용자 입력을 위한 이메일과 사용자 이름 입력 필드가 있습니다.- 폼 제출 버튼을 클릭하면 태스크가 시작되고, 태스크의 상태는
id="messages"
인 div 태그에 실시간으로 표시됩니다.
JavaScript 로직
serialize
함수는 폼 데이터를 객체로 변환합니다. 이 객체는 서버로 전송되기 전에 JSON 문자열로 변환됩니다.updateProgress
함수는 WebSocket 연결을 생성하고, 서버로부터 태스크 상태 업데이트를 실시간으로 수신합니다. 태스크 상태(SUCCESS
또는FAILURE
)에 따라 메시지를 화면에 표시하고, 필요한 경우 WebSocket 연결을 종료합니다.DOMContentLoaded
이벤트 리스너 내에서 폼 제출 이벤트를 처리합니다. 이는 사용자가 폼을 제출할 때 비동기적으로 서버에 데이터를 전송하고, 반환된 태스크 ID를 사용하여updateProgress
함수를 호출하여 태스크 상태 업데이트를 수신하기 시작합니다.
폼 제출 및 태스크 상태 처리
- 사용자가 폼을 제출하면,
event.preventDefault()
를 호출하여 폼의 기본 제출 동작을 막습니다. - 폼 데이터는
serialize
함수를 통해 객체로 변환되고, 이 객체는 서버로 POST 요청을 보내기 위해 JSON 문자열로 변환됩니다. - 서버로부터 태스크 ID를 받으면, 이 ID를 사용하여
updateProgress
함수 내에서 WebSocket 연결을 시작합니다. - WebSocket을 통해 서버로부터 태스크의 상태 업데이트를 받으면, 이를 사용자에게 표시합니다. 태스크가 성공하거나 실패하면, 관련 메시지를
#messages
div에 표시하고, WebSocket 연결을 종료합니다.
테스트
작성한 내용을 테스트 하기 위해서 도커 이미지를 다시 빌드하고 컨테이너를 재기동해 봅니다.
$ docker compose up -d --build
http://localhost:8010/users/form_ws/
를 브라우저 주소창에 입력하고 랜덤한 이름과 이메일을 입력한 이후 submit버튼을 클릭합니다.
API결과에 따라서 성공 혹은 오류를 확인 할 수 있습니다.
여기서의 핵심은 https://httpbin.org/delay/5
, 외부 API 호출에 따라 5초가 소요되는지 아닌지를 확인하기 위함입니다.
이미지를 통해 확인해 보면 PENDING이라는 결과를 개발자 도구를 통해 확인할 수 있습니다.
task가 완료되면 signal handler가 수행되어서 message가 Redis channel로 전달 됩니다. 그리고 나서 FastAPI가
메시지를 전달 받아서 클라이언트에게 반환해주게 됩니다.
'프로그래밍 언어 > 파이썬' 카테고리의 다른 글
Celery와 FastAPI - 7 (1) | 2024.03.26 |
---|---|
Celery와 FastAPI - 6 (1) | 2024.03.24 |
Celery와 FastAPI - 5 (1) | 2024.03.24 |
Celery와 FastAPI - 4 (0) | 2024.03.24 |
Celery와 FastAPI - 3 (0) | 2024.03.23 |