FastAPI 애플리케이션에서 Zapier, SendGrid, Stripe와 같은 외부 서비스를 사용할 때, RESTful API나 웹훅을 활용하게 됩니다. 올바르게 구현하면 개발 과정을 가속화하고 시간 및 비용을 절약할 수 있습니다. 그러나 중요한 점은 서비스를 통합하면서도 웹 애플리케이션의 전반적인 성능을 저하시키지 않도록 주의해야 한다는 것입니다.
특히 메인 웹 프로세스를 차단하지 않으면서 작업을 처리하는 것이 중요합니다.
Celery를 활용하여 외부 서비스를 애플리케이션에 통합하는 방법을 확인 해보겠습니다. Celery는 백그라운드에서 시간이 많이 소요되는 작업을 처리할 수 있도록 해주어, request/response 흐름을 외부에서 작업을 처리 하게 만듭니다.
목표
- Celery Worker를 가지고 form 제풀 프로세스를 처리 해봅니다.
- Celery worker를 이용하여 webhook notification으로 trgger하여 복잡한 로직 처리를 해봅니다.
retry
메소드를 이용하여 failed된 task를 다시 시도해봅니다.
이슈 1 : Blocking Web Process
이슈를 시뮬레이션 해봅니다.
웹 프로세스 차단에 관한 것으로, 사용자가 뉴스레터를 구독할 수 있는 API를 예로 들고 있습니다.
사용자가 제공한 이메일 주소를 받아 MailChimp나 ConvertKit과 같은 제3자 이메일 마케팅 API로 전송하는 과정을 구현하고 있습니다.
예시.
import random
import requests
def subscribe_user(email: str):
# used for testing a failed api call
if random.choice([0, 1]):
raise Exception('random processing error')
# used for simulating a call to a third-party email marketing api
requests.post('https://httpbin.org/delay/5')
@users_router.post("/form/")
def subscribe(payload: Body(...)):
subscribe_user(payload["email"])
return {"message": "thanks"}
문제 상황
이 예제에서는 requests.post('https://httpbin.org/delay/5')
를 사용해 외부 이메일 마케팅 API 호출을 모방합니다. 이 호출은 5초 동안 지연되며, 이는 특히 트래픽이 많을 때 웹 애플리케이션의 성능을 저하시킬 수 있습니다. 만약 Gunicorn과 함께 세 개의 Uvicorn 워커를 실행 중이라면, 해당 엔트리포인트로 들어오는 각 HTTP 요청을 처리할 수 있는 세 개의 워커 프로세스를 가지게 됩니다. 동시에 세 명의 사용자가 제출하면, 모든 프로세스가 최소 5초 동안 차단될 수 있습니다.
문제의 핵심
requests
라이브러리는 동기적으로 작동하기 때문에, 이러한 방식으로 외부 API 호출을 처리하면 요청 처리 중에 웹 서버의 스레드나 프로세스가 차단됩니다. 이는 서버의 동시 처리 능력을 크게 제한하고, 사용자 경험을 저하시킬 수 있습니다.
해결책
- 비동기 HTTP 클라이언트 사용:
httpx
나aiohttp
와 같은 비동기 라이브러리를 사용하여 외부 API 호출을 비동기적으로 처리할 수 있습니다. 이 방식을 사용하면, 요청을 보내는 동안 다른 작업을 계속 처리할 수 있으므로 웹 프로세스가 차단되지 않습니다. - Celery 사용:
제3자 서비스 호출과 같은 시간이 많이 소요되는 작업을 백그라운드 작업으로 처리하기 위해 Celery를 사용할 수 있습니다. Celery를 사용하면 작업을 비동기적으로 실행할 수 있으며, 웹 프로세스가 차단되는 것을 방지할 수 있습니다. Celery 워커는 독립적인 프로세스로 실행되어 백그라운드에서 작업을 처리하므로, 메인 웹 애플리케이션의 성능에 영향을 주지 않습니다.
### 결론 외부 API 호출과 같이 시간이 많이 소요되는 작업을 동기적으로 처리하는 것은 웹 애플리케이션의 성능을 저하시킬 수 있습니다. 이를 해결하기 위해 비동기 HTTP 클라이언트 라이브러리를 사용하거나, Celery와 같은 비동기 작업 큐 시스템을 사용하여 이러한 작업을 백그라운드에서 처리하는 것이 좋습니다. 이러한 접근 방식을 통해 애플리케이션의 성능을 유지하면서도 필요한 기능을 효과적으로 구현할 수 있습니다.
Celery를 사용하면 뷰 내에서 제3자 API를 호출할 때 웹 프로세스가 차단되는 것을 방지할 수 있습니다. 이를 위해 실제 API 호출을 비동기 Celery 태스크로 이동시키는 방식을 사용합니다.
전체 워크플로우:
- 폼 제출 하이재킹:
자바스크립트를 사용하여 폼 제출을 하이재킹한 다음, AJAX 요청을 통해 데이터를 서버로 보냅니다. - FastAPI 뷰에서 태스크 등록:
FastAPI 뷰 내에서, 제출된 이메일을 받아 외부 API를 호출하는 새로운 태스크를 등록하고, 클라이언트로 다시 응답으로 태스크 ID를 반환합니다. - 태스크 상태 체크:
반환된 태스크 ID를 사용하여 다른 AJAX 요청을 통해 태스크의 상태를 계속해서 확인합니다. - 태스크 완료 시 메시지 표시:
태스크가 완료되면, 태스크의 성공 여부에 따라 적절한 메시지를 표시합니다.
Celery 워크플로우 다이어그램
태스크의 상태를 폴링(루프를 돌며 API를 호출하여 태스크 상태 확인)하는 방법 대신, WebSockets 또는 HTTP/2를 사용하여 태스크가 실행 완료된 후 서버에서 클라이언트로 응답을 "푸시"할 수 있습니다. 추후 이에 대한 예제를 볼 수 있습니다.
구현 방법
이러한 전체 워크플로우는 단순한 외부 이메일 마케팅 API 호출에는 다소 과할 수 있습니다. 사용자는 즉시 무언가를 기다리고 있지 않으며, 다음 뉴스레터를 받기를 기대할 뿐입니다. 따라서 폼 제출이 성공했다는 것을 표시하고, 백엔드에서 API 호출과 관련된 문제를 사용자가 정확히 무엇이 일어나고 있는지 모르게 처리해야 합니다.
하지만 결제 처리
나 외부 서비스를 통한 사용자 제출 파일 수정
과 같은 API를 사용하는 경우에는 이러한 전체 워크플로우를 확실히 사용해야 합니다. 이를 통해 사용자 경험을 저해하지 않으면서 필요한 작업을 효과적으로 처리할 수 있습니다.
구현
Dependencies
아래 라이브러리를 requirements.txt 파일에 정의해줍니다.
Jinja2==3.1.2
requests==2.31.0
Schema
project/users/schemas.py
모듈에 UserBody 클래스를 정의해줍니다.
from pydantic import BaseModel
class UserBody(BaseModel):
username: str
email: str
View
project/users/views.py
모듈을 아래와 같이 정의합니다.
import logging
import random
import requests
from celery.result import AsyncResult
from fastapi import FastAPI, Request, Body
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates
from . import users_router
from .schemas import UserBody
from .tasks import sample_task
logger = logging.getLogger(__name__)
templates = Jinja2Templates(directory="project/users/templates")
def api_call(email: str):
# used for testing a failed api call
if random.choice([0, 1]):
raise Exception("random processing error")
# used for simulating a call to a third-party api
requests.post("https://httpbin.org/delay/5")
@users_router.get("/form/")
def form_example_get(request: Request):
return templates.TemplateResponse("form.html", {"request": request})
@users_router.post("/form/")
def form_example_post(user_body: UserBody):
task = sample_task.delay(user_body.email)
return JSONResponse({"task_id": task.task_id})
@users_router.get("/task_status/")
def task_status(task_id: str):
task = AsyncResult(task_id)
state = task.state
if state == 'FAILURE':
error = str(task.result)
response = {
'state': state,
'error': error,
}
else:
response = {
'state': state,
}
return JSONResponse(response)
FastAPI의 라우터를 사용하여 여러 엔드포인트를 정의하고 처리합니다. 여기에는 다음 기능이 포함됩니다:
코드 설명
- api_call 함수는 외부 API에 요청을 보내는 가상의 함수로, 성공 또는 실패를 무작위로 시뮬레이션합니다.
- form_example_get 함수는 GET 요청을 처리하여 HTML 폼을 렌더링합니다. Jinja2Templates를 사용하여 템플릿을 렌더링하고 요청을 컨텍스트로 전달합니다.
- form_example_post 함수는 폼 제출을 처리합니다. 사용자의 이메일 주소를 포함하는 작업을 비동기적으로 실행(Celery를 통해)하고 작업 ID를 반환합니다.
- task_status 함수는 주어진 작업 ID에 대한 상태를 확인하고 반환합니다. 이는 비동기 작업의 진행 상태를 추적합니다.
이 코드는 FastAPI와 Celery를 사용하여 비동기 작업을 큐에 추가하고, 작업의 상태를 조회하는 전체적인 흐름을 보여줍니다. 또한, 사용자 입력을 받아 처리하는 웹 폼의 예제와, 외부 API 호출을 시뮬레이션하는 방법을 제공합니다.
Celery Task
project/users/tasks.py
모듈을 수정합니다.
from celery import shared_task
@shared_task
def divide(x, y):
# from celery.contrib import rdb
# rdb.set_trace()
import time
time.sleep(5)
return x / y
@shared_task()
def sample_task(email):
from project.users.views import api_call
api_call(email)
코드 설명
divide 함수
첫 번째 함수 divide(x, y)
는 간단한 나눗셈 연산을 수행하는 예제입니다. shared_task
데코레이터는 이 함수를 Celery 작업으로 등록합니다. 함수 내부에서 time.sleep(5)
를 호출하여 5초 동안 실행을 일시 중지합니다. 이는 일반적으로 Celery 작업이 어떻게 비동기 작업을 처리할 수 있는지 시연하기 위한 예제로 사용됩니다. 실제 환경에서는 네트워크 호출, 대용량 데이터 처리 등 시간이 많이 소요되는 작업을 처리할 때 이러한 방식이 유용합니다.
sample_task 함수
두 번째 함수 sample_task(email)
는 이메일 주소를 받아 project.users.views.api_call
함수에 전달합니다. shared_task()
데코레이터를 사용하여 이 함수 역시 Celery 작업으로 등록됩니다. api_call
함수는 실제로 어떤 작업(예: 이메일 발송, 외부 API 호출 등)을 비동기적으로 수행합니다. 이렇게 Celery 작업으로 만들어진 함수는 메인 웹 프로세스와 독립적으로 실행되므로, api_call
함수 내부에서 시간이 많이 걸리는 작업을 수행하더라도 메인 웹 프로세스(예: 사용자에게 웹 페이지를 제공하는 프로세스)는 차단되지 않습니다.
비동기 작업의 이점
이 예제에서 볼 수 있듯이, Celery를 사용하여 비동기 작업을 구현함으로써, 메인 어플리케이션의 성능을 저하시키지 않고 시간이 많이 소요되는 작업을 처리할 수 있습니다. 이는 사용자 경험을 개선하고, 리소스 사용을 최적화하는 데 도움이 됩니다.
Template
웹 브라우저에서 유저가 form을 통해서 유저정보(예. 아이디, 비번, 이메일등)를 제출하고 이를 celery로 처리하도록
clent단 코드를 만들어 봅니다.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Celery example</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css"
rel="stylesheet"
integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65"
crossorigin="anonymous"
>
</head>
<body>
<div class="container">
<div class="row">
<div class="col-12 col-md-4">
<form id="your-form">
<div class="mb-3">
<label for="email" class="form-label">Email address</label>
<input type="email" class="form-control" id="email" name="email">
</div>
<div class="mb-3">
<label for="username" class="form-label">Username</label>
<input type="text" class="form-control" id="username" name="username">
</div>
<div class="mb-3" id="messages"></div>
<button type="submit" class="btn btn-primary">Submit</button>
</form>
</div>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js"
integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4"
crossorigin="anonymous">
</script>
</body>
</html>
taskID를 사용하여 task status를 확인하도록 하는 initial XHR request 코드를 javascript로 만들어 봅니다.
// 페이지의 모든 내용이 로드된 후에 함수가 실행됩니다.
document.addEventListener("DOMContentLoaded", function() {
// "your-form" ID를 가진 폼을 찾아 yourForm 변수에 할당합니다.
const yourForm = document.getElementById("your-form");
// 폼에 submit 이벤트 리스너를 추가합니다. 폼이 제출될 때 실행될 함수를 정의합니다.
yourForm.addEventListener("submit", function(event) {
// 기본 제출 이벤트를 방지하여 페이지 새로고침을 막습니다.
event.preventDefault();
// 제출 버튼을 찾고, 버튼의 현재 HTML을 btnHtml 변수에 저장합니다.
const submitBtn = yourForm.querySelector('button[type="submit"]');
const btnHtml = submitBtn.innerHTML;
// "Processing..." 텍스트로 버튼을 변경하고, 버튼을 비활성화합니다.
const spinnerHtml = 'Processing...';
submitBtn.disabled = true;
submitBtn.innerHTML = spinnerHtml;
// 메시지를 표시할 요소의 내용을 비웁니다.
const msg = yourForm.querySelector('#messages');
msg.innerHTML = '';
// 폼의 데이터를 FormData 객체로 가져옵니다.
let data = new FormData(yourForm);
// serialize 함수를 사용하여 FormData 객체를 JSON 객체로 변환합니다.
let formData = serialize(data);
// 비동기 POST 요청을 서버에 전송합니다.
fetch('/users/form/', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(formData),
})
.then(response => response.json())
.then((res) => {
// 서버로부터 받은 응답에서 Celery 작업 ID를 추출합니다.
const task_id = res.task_id;
// 작업 진행 상황을 추적하기 위해 updateProgress 함수를 호출합니다.
updateProgress(yourForm, task_id, btnHtml);
console.log(res);
})
.catch((error) => {
console.error('Error:', error)
});
});
});
// 폼 데이터를 객체로 변환하는 함수입니다.
function serialize(data) {
let obj = {};
for (let [key, value] of data) {
if (obj[key] !== undefined) {
if (!Array.isArray(obj[key])) {
obj[key] = [obj[key]];
}
obj[key].push(value);
} else {
obj[key] = value;
}
}
return obj;
}
// 작업 진행 상황을 확인하고 결과에 따라 UI를 업데이트하는 함수입니다.
function updateProgress(yourForm, task_id, btnHtml) {
fetch(`/users /task_status/?task_id=${ task_id } `, {
method: 'GET',
})
.then(response => response.json())
.then((res) => {
const taskStatus = res.state;
if (['SUCCESS', 'FAILURE'].includes(taskStatus)) {
const msg = yourForm.querySelector('#messages');
const submitBtn = yourForm.querySelector('button[type="submit"]');
if (taskStatus === 'SUCCESS') {
// 작업이 성공적으로 완료되었을 때의 메시지를 표시합니다.
msg.innerHTML = 'job succeeded';
} else if (taskStatus === 'FAILURE') {
// 작업이 실패했을 때의 에러 메시지를 표시합니다.
msg.innerHTML = res.error;
}
// 버튼을 다시 활성화하고 원래 텍스트로 변경합니다.
submitBtn.disabled = false;
submitBtn.innerHTML = btnHtml;
} else {
// 작업이 아직 완료되지 않았다면, 1초 후에 다시 상태를 확인합니다.
setTimeout(function() {
updateProgress(yourForm, task_id, btnHtml);
}, 1000);
}
})
.catch((error) => {
console.error('Error:', error)
});
}
폼 제출 시의 동작:
사용자가 폼을 제출하면, JavaScript 코드는 폼 제출 버튼을 비활성화하고 버튼의 텍스트를 "Processing..."으로 변경합니다. 이는 사용자에게 백엔드에서 어떠한 처리가 진행 중임을 시각적으로 표시합니다. 그 다음, 폼에 입력된 값들을 직렬화한 후, 이를 JSON 형태로 변환하여 /users/form/
경로로 POST 요청과 함께 전송합니다.
작업 ID를 통한 진행 상태 확인:
서버로부터의 응답에는 Celery 작업의 고유 ID가 포함되어 있습니다. 이 ID는 updateProgress
함수에 전달되어, /users/task_status/
엔드포인트에 대해 1초마다 GET 요청을 보내어 작업의 상태를 확인합니다. 이 폴링(polling) 과정은 작업의 상태가 'SUCCESS' 또는 'FAILURE'로 변경될 때까지 계속됩니다.
작업 완료 후의 UI 업데이트:
작업이 완료되면 ('SUCCESS'
또는 'FAILURE'
상태로 변경될 때), 적절한 메시지를 사용자에게 표시합니다. 작업이 성공적으로 완료된 경우, "job succeeded"와 같은 성공 메시지를, 실패한 경우 서버로부터 받은 에러 메시지를 사용자에게 보여줍니다. 또한, 폼 제출 버튼의 텍스트를 원래 상태로 복구하고 버튼을 다시 활성화하여, 사용자가 필요한 경우 다른 작업을 계속 진행할 수 있도록 합니다.
이슈2 : Webhook Handler
이벤트 notification 발송을 위해서 통상 외부 API를 웹훅으로 사용합니다.
notification에 따라서 process처리가 이뤄질수 있습니다.
예를들어, 만약 어떤 프로세스가 백그라운드로 처리 되지 않는다면 Gunicorn worker들 중에 하나가 프로세스가 처리를 끝 마칠 때까지
block 될 수 있스빈다.
구현
새로운 핸들러를 project/users/views.py
에 추가합니다.
@users_router.post("/webhook_test/")
def webhook_test():
if not random.choice([0, 1]):
# mimic an error
raise Exception()
# blocking process
requests.post("https://httpbin.org/delay/5")
return "pong"
위 이슈2
를 시뮬레이션 하기 위해서 도커 이미지를 업데이트하고 컨테이너를 재기동 합니다.
$ docker compose up -d --build
$ curl -X POST http://localhost:8010/users/webhook_test/ -d {'data':'ping'}
제시된 구현과 테스트 설명은 웹훅(webhook)을 사용하여 외부 서비스 간의 비동기 통신을 설정하는 방법에 관한 것입니다.
웹훅은 한 서비스(서비스 A)가 특정 이벤트가 발생했을 때 다른 서비스(여기서는 우리의 서비스)에 신호를 보내도록 설정할 수 있는 HTTP 콜백입니다. 이 설명에서는 여러 가지 중요한 개념과 잠재적인 문제점들이 언급되고 있습니다.
구현 세부사항
새로운 엔드포인트 추가:project/users/views.py
파일에 webhook_test
라는 새로운 뷰를 추가합니다. 이 뷰는 POST 요청을 받아 처리하며, 50%의 확률로 예외를 발생시키거나, 예외가 발생하지 않는 경우 https://httpbin.org/delay/5
로 POST 요청을 보냅니다. 이 요청은 5초 동안 지연됩니다.
테스트 방법:
Docker 이미지를 업데이트하고 컨테이너를 재시작한 후, curl
명령어를 사용하여 /users/webhook_test/
엔드포인트에 POST 요청을 보냅니다.
잠재적 문제점들
예외 처리:
웹훅 핸들러에서 예외가 발생하거나, 호출되는 서비스 B가 유지보수로 인해 다운된 경우, 서비스 A는 실패한 요청에 대해 재시도(retry) 정책이 없다면, 중요한 통신이 누락될 수 있습니다.
서비스의 retry policy 부재:
대부분의 신뢰할 수 있는 서비스들은 웹훅 알림의 수신자가 200 OK를 보내지 않는 경우, 알림을 여러 번 재시도합니다. 이는 웹훅 처리 과정에서 발생할 수 있는 일시적인 문제들을 극복할 수 있게 해줍니다.
네트워크 Block I/O:
웹훅 핸들러가 외부 API를 호출 시, Gunicorn들 중 하나가 외부 API 호출로 인해 Block 될 수 있습니다. 특히 https://httpbin.org/delay/5
와 같이 의도적으로 지연되는 요청의 경우, 다른 요청을 처리하지 못하게 만들 수 있습니다.
해결 방안
- 비동기 처리: 웹훅 핸들러 내에서 시간이 오래 걸리는 작업은 비동기적으로 처리해야 합니다. 예를 들어, Celery와 같은 비동기 작업 큐를 사용하여 서비스 B에 대한 요청을 백그라운드 작업으로 처리할 수 있습니다.
- 에러 처리와 로깅: 예외 처리 로직을 강화하여, 잠재적인 문제를 적절히 로깅하고, 필요한 경우 자동으로 재시도할 수 있도록 해야 합니다.
- 서비스 A와의 계약(Contract) 검토: 서비스 A가 웹훅 알림에 대해 어떤 재시도 정책을 가지고 있는지 확인하고, 우리 서비스의 처리 능력과 잘 맞도록 조정해야 합니다.
이러한 접근 방식을 통해, 웹훅을 통한 서비스 간 통신을 더 안정적으로 만들고, 사용자와 시스템에 더 좋은 경험을 제공할 수 있습니다.
Webhook Notification을 다루기 위해 Celery 사용 해보기
새로운 태스크를 추가해봅니다.
# project/users/tasks.py
@shared_task(bind=True)
def task_process_notification(self):
try:
if not random.choice([0, 1]):
# mimic random error
raise Exception()
# this would block the I/O
requests.post("https://httpbin.org/delay/5")
except Exception as e:
logger.error("exception raised, it would be retry after 5 seconds")
raise self.retry(exc=e, countdown=5)
제시된 코드는 Celery를 사용해 작성된 비동기 태스크(task_process_notification
)의 예시입니다. 이 태스크는 특정 작업을 수행하고, 실패할 경우 자동으로 재시도하는 로직을 포함합니다. 코드의 주요 구성 요소와 개념을 하나씩 살펴보겠습니다.
bind=True와 바운드 태스크
@shared_task(bind=True)
데코레이터를 사용함으로써, 이 태스크는 "바운드 태스크(bound task)"가 됩니다. 바운드 태스크란, 태스크 함수의 첫 번째 인자로 현재 태스크 인스턴스(self
)를 자동으로 받는 태스크를 말합니다. 이를 통해 태스크 내부에서self.retry
같은 인스턴스 메소드를 직접 호출할 수 있게 됩니다.
재시도 로직
try
블록 내에서, 50% 확률로 예외를 발생시키거나(random.choice([0, 1])
),requests.post("https://httpbin.org/delay/5")
를 호출하여 5초간 I/O block을 시뮬레이션합니다.- 예외가 발생하면,
except
블록이 실행되어logger.error
를 통해 로그를 기록하고,self.retry(exc=e, countdown=5)
를 호출하여 태스크를 재시도합니다. 여기서countdown=5
는 재시도 전 5초간 대기한다는 것을 의미합니다.
self.retry 메소드
self.retry
메소드는 현재 실행 중인 태스크를 재시도하도록 스케줄링합니다.exc
인자에는 재시도의 이유가 되는 예외를 전달하고,countdown
인자에는 재시도 전 대기 시간(초)을 지정합니다.- 재시도가 성공적으로 스케줄링되려면,
self.retry
메소드에 의해 반환된 예외를 raise해야 합니다. 이를 통해 Celery 작업자(worker)가 해당 태스크를 재시도하도록 합니다.
celery 인스턴스의 추적을 위한 로그 메소드를 임포트 해보겠습니다.
import random
import requests
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
새로운 task등록을 위해 새 핸들러를 만듭니다.
@users_router.post("/webhook_test_async/")
def webhook_test_async():
task = task_process_notification.delay()
print(task.id)
return "pong"
코드 변경에 따라 임포트 영역도 업데이트 해줍니다.
import logging
import random
import requests
from celery.result import AsyncResult
from fastapi import FastAPI, Request, Body
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates
from . import users_router
from .schemas import UserBody
from .tasks import sample_task, task_process_notification
테스트를 진행해봅니다. 그러기에 앞서 도커 이미지를 빌드하고 컨테이너를 재기동합니다.
$ docker compose up -d --build
컨테이너 로그 기록을 -f
옵션을 통해 계속 확인 할 수 있도록 합니다 .
$ docker compose logs -f
curl 요청으로 fastapi앱에 비동기 로직이 트리거 되도록 API 호출을 해봅니다.
$ curl -X POST http://localhost:8010/users/webhook_test_async/ -d {'data':'ping'}
celery worker 컨테이너의 로그기록을 살펴봅니다.
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:05,443: INFO/MainProcess] Task project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7] received
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:05,446: ERROR/ForkPoolWorker-16] project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7]: exception raised, it would be retry after 5 seconds
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:05,462: INFO/MainProcess] Task project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7] received
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:05,470: INFO/ForkPoolWorker-16] Task project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7] retry: Retry in 5s: Exception()
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:10,450: ERROR/ForkPoolWorker-16] project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7]: exception raised, it would be retry after 5 seconds
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:10,453: INFO/MainProcess] Task project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7] received
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:10,456: INFO/ForkPoolWorker-16] Task project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7] retry: Retry in 5s: Exception()
fastapi-celery-project-celery_worker-1 | [2024-01-07 08:33:22,810: INFO/ForkPoolWorker-16] Task project.users.tasks.task_process_notification[2dd3f566-5e27-4413-aaa7-d4241f22d3c7] succeeded in 7.3755379500216804s: None
로그 기록에서 알 수 있듯이 fastapi앱에서 celery worker에 task를 전달하고 celery worker는 task를 2번 실패하고 3번만에
성공하였습니다.
상세 설명
기본 설정 및 로거 인스턴스 생성
- 필요한 라이브러리(
random
,requests
,logging
,fastapi
등)와 Celery 유틸리티를 위한get_task_logger
를 임포트합니다. - Celery 로거 인스턴스를 생성하여 태스크 실행 중 발생하는 정보나 오류를 로깅합니다.
새로운 비동기 태스크 정의
task_process_notification
이라는 새로운 Celery 태스크를 정의하고, 이 태스크가 실패할 경우 자동으로 재시도하도록 설정합니다. 이때 재시도는 5초 후에 이루어지며, 실패 원인에 따라 로그 메시지를 기록합니다.
FastAPI 뷰를 통한 태스크 실행
- FastAPI 애플리케이션 내에
/webhook_test_async/
엔드포인트를 통해 호출되는 새로운 뷰를 추가합니다. 이 뷰는task_process_notification
태스크를 비동기적으로 실행하고(delay()
메소드 사용), 실행된 태스크의 ID를 출력합니다.
Docker를 이용한 환경 구성 및 실행
- Docker Compose를 사용하여 FastAPI 애플리케이션과 Celery 워커의 컨테이너를 빌드하고 실행합니다.
- 별도의 터미널 창에서 Docker Compose 로그를 모니터링하여 태스크의 실행 과정을 관찰할 수 있습니다.
태스크의 실패와 성공 로깅 관찰
curl
명령어를 사용하여/webhook_test_async/
엔드포인트에 POST 요청을 보냄으로써, 비동기 태스크를 트리거합니다.- Celery 워커 로그를 통해 태스크가 여러 번 재시도되는 과정을 관찰할 수 있으며, 마지막으로 태스크가 성공적으로 완료되는 것을 확인할 수 있습니다.
외부 API 호출의 비동기 처리
외부 API 호출을 비동기적으로 처리하는 방법에는 여러 가지가 있습니다. 예를 들어, Python의 threading
라이브러리나 asyncio
모듈을 사용하여 비동기적인 작업을 구현할 수 있습니다. 이러한 방법들은 외부 API 호출을 메인 어플리케이션 흐름에서 분리하여, API 응답을 기다리는 동안에도 어플리케이션이 다른 작업을 계속 처리할 수 있도록 해줍니다.
Celery의 활용
그러나 만약 외부 API 호출이 복잡한 로직이나 워크플로우와 관련되어 있다면, Celery와 같은 분산 태스크 큐 시스템을 사용하는 것이 더 효율적일 수 있습니다. Celery를 사용하면 시간이 많이 걸리는 작업을 큐에 추가하고, 백그라운드에서 이러한 작업들을 비동기적으로 처리할 수 있습니다. 이는 어플리케이션의 성능을 향상시키고, 보다 복잡한 비동기 작업 흐름을 쉽게 관리할 수 있게 해줍니다.
다양한 작업에 대한 적용
이러한 비동기 처리 방식은 외부 API 호출뿐만 아니라, 이미지 리사이징, 분석 데이터 생성, 보고서 생성과 같은 다양한 시간 소모적인 작업에도 적용될 수 있습니다. 이런 작업들을 비동기적으로 처리함으로써, 어플리케이션의 응답성을 유지하고, 사용자에게 더 나은 경험을 제공할 수 있습니다.
'프로그래밍 언어 > 파이썬' 카테고리의 다른 글
Celery와 FastAPI - 8 (0) | 2024.03.27 |
---|---|
Celery와 FastAPI - 6 (1) | 2024.03.24 |
Celery와 FastAPI - 5 (1) | 2024.03.24 |
Celery와 FastAPI - 4 (0) | 2024.03.24 |
Celery와 FastAPI - 3 (0) | 2024.03.23 |