핵심 코드 분석
1. UserQueueController
- 대기열 등록 API
@RestController
@RequestMapping("/api/v1/queue")
@RequiredArgsConstructor
public class UserQueueController {
private final UserQueueService userQueueService;
@PostMapping
public Mono<RegisterUserResponse> registerUser(
@RequestParam(name="queue", defaultValue = "default") String queueNm,
@RequestParam(name="user_id") Long userId
){
return userQueueService.registerWatingQueue(queueNm, userId)
.map(RegisterUserResponse::new);
}
}
비동기 API 엔드포인트
:Mono<RegisterUserResponse>
는 이 API가 비동기적으로 동작함을 나타냅니다. 요청이 들어오면 논블로킹 방식으로 유저를 대기열에 등록하고, 순위를 계산하여 반환합니다.Reactor의 map 연산
: userQueueService.registerWatingQueue() 메서드는 비동기적으로 대기열 등록을 처리하고, 그 결과로 순위를 반환합니다. 반환된 순위는 map()을 통해 RegisterUserResponse로 변환됩니다.@RestController
: 이 클래스는 API 엔드포인트로 작동합니다.@PostMapping
: HTTP POST 요청을 처리하며, 유저를 대기열에 등록하는 엔드포인트입니다.Mono<RegisterUserResponse>
: WebFlux를 사용해 비동기 방식으로 처리 결과를 반환합니다. 대기열에 유저를 등록한 후 그 순위를 포함한 응답을 반환합니다.UserQueueService
를 호출하여 실제 등록 로직을 처리하고, 그 결과를RegisterUserResponse
로 감쌉니다.
비동기 처리의 이점
이 방식은 스레드를 차단하지 않으며, 대기열 등록이 완료될 때까지 다른 작업이 병렬로 처리될 수 있게 해줍니다. 서버는 높은 동시성을 처리할 수 있게 되어, 더 많은 사용자를 처리할 수 있습니다.
2. UserQueueService
- 비즈니스 로직
주요 구성 요소
ReactiveRedisTemplate<String, String> reactiveRedisTemplate
- Reactive Redis를 사용하기 위한 템플릿입니다. Redis와 비동기적으로 데이터를 주고받는 데 사용됩니다.
- 여기서는
String
타입의 키와 값을 사용하여 Redis에 데이터를 저장하거나 조회합니다.
USER_QUEUE_WAIT_KEY와 USER_QUEUE_PROCEED_KEY
- Redis에서 사용자 대기열과 접속 허용 목록을 구분하기 위한 키 형식입니다.
USER_QUEUE_WAIT_KEY
는 사용자가 대기 중인 큐를 의미하며,"users:queue:%s:wait"
형식으로 정의됩니다.%s
에는 대기열의 이름이 들어갑니다.USER_QUEUE_PROCEED_KEY
는 사용자가 대기열을 통과하여 접속이 허용된 상태를 나타내며,"users:queue:%s:proceed"
형식으로 정의됩니다.
public Mono<Long> registerWatingQueue(final String queue, final Long userId) {
var unixTimeStamp = Instant.now().getEpochSecond();
// ZSet에 userId 등록 시도
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString(), unixTimeStamp)
.filter(i -> i)
.switchIfEmpty(Mono.error(ErrorCode.QUEUE_ALREADY_REGISTERED_USER.build()))
.flatMap(i -> reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString()))
.map(i -> i >= 0 ? i + 1 : i);
}
세부 설명
1. var unixTimeStamp = Instant.now().getEpochSecond();
- 현재 시간을 Unix 타임스탬프(초 단위)로 가져옵니다.
- 이 값은 사용자 대기열에 등록할 때,
ZSet
의score
로 사용됩니다.score
는 정렬된 집합에서 사용자가 등록된 순서를 유지하게 합니다.
2. reactiveRedisTemplate.opsForZSet().add(...)
ZSet
에 사용자를 추가하는 작업을 수행합니다.USER_QUEUE_WAIT_KEY.formatted(queue)
는 Redis에서 사용할 키 이름을 생성합니다. 예를 들어,queue
가"default"
라면 키는"users:queue:default:wait"
이 됩니다.userId.toString()
은ZSet
에 추가할 사용자 ID를 문자열로 변환하여 사용합니다.unixTimeStamp
는ZSet
의score
로 사용되며, 이 값으로 정렬이 이루어집니다.- 이 메서드는 사용자 추가가 성공하면
true
를 반환하고, 실패하면false
를 반환합니다.
3. .filter(i -> i)
- 이 구문은 추가 작업이 성공한 경우(
true
)만 다음 단계로 진행하도록 필터링합니다. - 만약 추가 작업이 실패한 경우(
false
), 빈 스트림이 생성됩니다.
4. .switchIfEmpty(Mono.error(ErrorCode.QUEUE_ALREADY_REGISTERED_USER.build()))
ZSet
에 이미 사용자가 등록되어 있는 경우(추가 실패),QUEUE_ALREADY_REGISTERED_USER
에러를 발생시킵니다.- 이를 통해 동일한 사용자가 중복 등록되지 않도록 합니다.
5. .flatMap(i -> reactiveRedisTemplate.opsForZSet().rank(...))
- 사용자가 성공적으로 추가되었으면, 해당 사용자의 순위를 조회합니다.
rank
메서드는ZSet
에서 해당 요소의 인덱스(0부터 시작)를 반환합니다.
6. .map(i -> i >= 0 ? i + 1 : i)
rank
값이 0 이상이면, 실제 대기 순번으로 표시하기 위해 1을 더해 반환합니다.ZSet
의rank
는 0부터 시작하지만, 사용자에게 표시할 때는 1부터 시작하는 순번을 사용하기 위함입니다.
3. ErrorCode
와 ApplicationException
- 커스텀 예외 처리
@AllArgsConstructor
public enum ErrorCode {
QUEUE_ALREADY_REGISTER_USESR(HttpStatus.CONFLICT, "UQ_00001", "Already registered in queue");
private final HttpStatus httpStatus;
private final String code;
private final String reason;
public ApplicationException build(){
return new ApplicationException(httpStatus, code, reason);
}
}
- ErrorCode Enum: 각종 오류 코드를 정의하고, 필요한 경우 예외를 생성하는 메서드를 제공합니다.
QUEUE_ALREADY_REGISTER_USESR
: 유저가 이미 대기열에 등록되어 있을 때 발생하는 오류 코드입니다.
@AllArgsConstructor
@Getter
public class ApplicationException extends RuntimeException {
private HttpStatus httpStatus;
private String code;
private String Reason;
}
- ApplicationException: 이 클래스는 커스텀 예외를 정의합니다. 예외 발생 시 HTTP 상태 코드와 메시지를 담아 반환할 수 있습니다.
4. ApplicationAdvice
- 예외 핸들러
@RestControllerAdvice
public class ApplicationAdvice {
@ExceptionHandler(ApplicationException.class)
Mono<ResponseEntity<ServerExceptionResponse>> applicationException(ApplicationException ex) {
return Mono.just(ResponseEntity
.status(ex.getHttpStatus())
.body(new ServerExceptionResponse(ex.getCode(), ex.getReason())));
}
public record ServerExceptionResponse(String code, String reason) {}
}
- 비동기 예외 처리: @RestControllerAdvice를 통해 전역적으로 예외를 처리합니다. 비동기 작업에서 발생한 ApplicationException은 Mono.just()로 감싸져, 클라이언트에 적절한 HTTP 상태 코드와 메시지를 비동기적으로 응답합니다.
5. RegisterUserResponse
- 응답 DTO
public record RegisterUserResponse(Long rank) {}
- DTO (Data Transfer Object): 유저의 대기 순서를 응답으로 반환하기 위해 사용됩니다. Java 17의
record
를 사용하여 간결하게 정의되었습니다.
Redis를 활용한 유저 대기열 처리 로직
- Sorted Set: 유저 대기열을 Redis의
Sorted Set
자료구조로 관리합니다.Sorted Set
은 유저 ID를 값으로, Unix 타임스탬프를 점수로 하여 자동으로 순서가 정렬됩니다. - 등록 로직: 유저가 등록될 때
ZADD
명령어가 호출되며, 유저가 이미 등록된 상태라면Mono.error()
로 에러를 반환합니다. 성공적으로 등록된 경우 유저의 순위를 반환합니다.
결론
이 프로젝트는 Spring WebFlux와 Reactive Redis를 사용하여 높은 성능과 확장성을 고려한 대기열 시스템을 구축하는 데 중점을 두고 있습니다. 비동기 처리를 통해 많은 사용자를 처리할 수 있는 유연한 시스템이며, Redis의 Sorted Set
을 사용하여 대기열에서 유저의 순위를 효율적으로 관리합니다.
이 프로젝트를 통해 대기열 시스템의 비즈니스 로직뿐만 아니라, 반응형 프로그래밍의 장점을 살펴볼 수 있으며, 효율적인 Redis 사용법을 이해하는 데 큰 도움이 될 것입니다.
참고: 이 프로젝트는 비동기 및 반응형 프로그래밍 모델에 대한 이해를 기반으로 하므로, 해당 개념에 익숙하지 않다면 먼저 관련 문서를 학습하는 것이 좋습니다.
'프레임워크 > 자바 스프링' 카테고리의 다른 글
접속자 대기열 시스템 #5- Redis를 이용한 대기열 관리 및 웹페이지 진입 API 구현 (1) | 2024.10.10 |
---|---|
접속자 대기열 시스템 #3- 셋업 (2) | 2024.10.10 |
BlockHound: Java 비동기 애플리케이션에서 블로킹 호출을 감지하는 도구 (6) | 2024.10.08 |
Spring MVC와 Spring Webflux 성능비교 (0) | 2024.10.08 |
Reactive Redis (1) | 2024.10.07 |