프레임워크/자바 스프링

접속자 대기열 시스템 #4- 대기열 등록 API 개발

hyeseong-dev 2024. 10. 9. 13:51

핵심 코드 분석

1. UserQueueController - 대기열 등록 API

@RestController
@RequestMapping("/api/v1/queue")
@RequiredArgsConstructor
public class UserQueueController {

    private final UserQueueService userQueueService;

    @PostMapping
    public Mono<RegisterUserResponse> registerUser(
            @RequestParam(name="queue", defaultValue = "default") String queueNm,
            @RequestParam(name="user_id") Long userId
    ){
        return userQueueService.registerWatingQueue(queueNm, userId)
                .map(RegisterUserResponse::new);
    }
}
  • 비동기 API 엔드포인트: Mono<RegisterUserResponse>는 이 API가 비동기적으로 동작함을 나타냅니다. 요청이 들어오면 논블로킹 방식으로 유저를 대기열에 등록하고, 순위를 계산하여 반환합니다.
  • Reactor의 map 연산: userQueueService.registerWatingQueue() 메서드는 비동기적으로 대기열 등록을 처리하고, 그 결과로 순위를 반환합니다. 반환된 순위는 map()을 통해 RegisterUserResponse로 변환됩니다.
  • @RestController: 이 클래스는 API 엔드포인트로 작동합니다.
  • @PostMapping: HTTP POST 요청을 처리하며, 유저를 대기열에 등록하는 엔드포인트입니다.
  • Mono<RegisterUserResponse>: WebFlux를 사용해 비동기 방식으로 처리 결과를 반환합니다. 대기열에 유저를 등록한 후 그 순위를 포함한 응답을 반환합니다.
  • UserQueueService를 호출하여 실제 등록 로직을 처리하고, 그 결과를 RegisterUserResponse로 감쌉니다.

비동기 처리의 이점

이 방식은 스레드를 차단하지 않으며, 대기열 등록이 완료될 때까지 다른 작업이 병렬로 처리될 수 있게 해줍니다. 서버는 높은 동시성을 처리할 수 있게 되어, 더 많은 사용자를 처리할 수 있습니다.

2. UserQueueService - 비즈니스 로직

주요 구성 요소

  1. ReactiveRedisTemplate<String, String> reactiveRedisTemplate

    • Reactive Redis를 사용하기 위한 템플릿입니다. Redis와 비동기적으로 데이터를 주고받는 데 사용됩니다.
    • 여기서는 String 타입의 키와 값을 사용하여 Redis에 데이터를 저장하거나 조회합니다.
  2. USER_QUEUE_WAIT_KEY와 USER_QUEUE_PROCEED_KEY

    • Redis에서 사용자 대기열과 접속 허용 목록을 구분하기 위한 키 형식입니다.
    • USER_QUEUE_WAIT_KEY는 사용자가 대기 중인 큐를 의미하며, "users:queue:%s:wait" 형식으로 정의됩니다. %s에는 대기열의 이름이 들어갑니다.
    • USER_QUEUE_PROCEED_KEY는 사용자가 대기열을 통과하여 접속이 허용된 상태를 나타내며, "users:queue:%s:proceed" 형식으로 정의됩니다.
public Mono<Long> registerWatingQueue(final String queue, final Long userId) {
    var unixTimeStamp = Instant.now().getEpochSecond();
    // ZSet에 userId 등록 시도
    return reactiveRedisTemplate.opsForZSet()
            .add(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString(), unixTimeStamp)
            .filter(i -> i)
            .switchIfEmpty(Mono.error(ErrorCode.QUEUE_ALREADY_REGISTERED_USER.build()))
            .flatMap(i -> reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString()))
            .map(i -> i >= 0 ? i + 1 : i);
}

세부 설명

1. var unixTimeStamp = Instant.now().getEpochSecond();

  • 현재 시간을 Unix 타임스탬프(초 단위)로 가져옵니다.
  • 이 값은 사용자 대기열에 등록할 때, ZSetscore로 사용됩니다. score는 정렬된 집합에서 사용자가 등록된 순서를 유지하게 합니다.

2. reactiveRedisTemplate.opsForZSet().add(...)

  • ZSet에 사용자를 추가하는 작업을 수행합니다.
  • USER_QUEUE_WAIT_KEY.formatted(queue)는 Redis에서 사용할 키 이름을 생성합니다. 예를 들어, queue"default"라면 키는 "users:queue:default:wait"이 됩니다.
  • userId.toString()ZSet에 추가할 사용자 ID를 문자열로 변환하여 사용합니다.
  • unixTimeStampZSetscore로 사용되며, 이 값으로 정렬이 이루어집니다.
  • 이 메서드는 사용자 추가가 성공하면 true를 반환하고, 실패하면 false를 반환합니다.

3. .filter(i -> i)

  • 이 구문은 추가 작업이 성공한 경우(true)만 다음 단계로 진행하도록 필터링합니다.
  • 만약 추가 작업이 실패한 경우(false), 빈 스트림이 생성됩니다.

4. .switchIfEmpty(Mono.error(ErrorCode.QUEUE_ALREADY_REGISTERED_USER.build()))

  • ZSet에 이미 사용자가 등록되어 있는 경우(추가 실패), QUEUE_ALREADY_REGISTERED_USER 에러를 발생시킵니다.
  • 이를 통해 동일한 사용자가 중복 등록되지 않도록 합니다.

5. .flatMap(i -> reactiveRedisTemplate.opsForZSet().rank(...))

  • 사용자가 성공적으로 추가되었으면, 해당 사용자의 순위를 조회합니다.
  • rank 메서드는 ZSet에서 해당 요소의 인덱스(0부터 시작)를 반환합니다.

6. .map(i -> i >= 0 ? i + 1 : i)

  • rank 값이 0 이상이면, 실제 대기 순번으로 표시하기 위해 1을 더해 반환합니다.
  • ZSetrank는 0부터 시작하지만, 사용자에게 표시할 때는 1부터 시작하는 순번을 사용하기 위함입니다.

3. ErrorCodeApplicationException - 커스텀 예외 처리

@AllArgsConstructor
public enum ErrorCode {
    QUEUE_ALREADY_REGISTER_USESR(HttpStatus.CONFLICT, "UQ_00001", "Already registered in queue");

    private final HttpStatus httpStatus;
    private final String code;
    private final String reason;

    public ApplicationException build(){
        return new ApplicationException(httpStatus, code, reason);
    }
}
  • ErrorCode Enum: 각종 오류 코드를 정의하고, 필요한 경우 예외를 생성하는 메서드를 제공합니다.
  • QUEUE_ALREADY_REGISTER_USESR: 유저가 이미 대기열에 등록되어 있을 때 발생하는 오류 코드입니다.
@AllArgsConstructor
@Getter
public class ApplicationException extends RuntimeException {
    private HttpStatus httpStatus;
    private String code;
    private String Reason;
}
  • ApplicationException: 이 클래스는 커스텀 예외를 정의합니다. 예외 발생 시 HTTP 상태 코드와 메시지를 담아 반환할 수 있습니다.

4. ApplicationAdvice - 예외 핸들러

@RestControllerAdvice
public class ApplicationAdvice {

    @ExceptionHandler(ApplicationException.class)
    Mono<ResponseEntity<ServerExceptionResponse>> applicationException(ApplicationException ex) {
        return Mono.just(ResponseEntity
                .status(ex.getHttpStatus())
                .body(new ServerExceptionResponse(ex.getCode(), ex.getReason())));
    }

    public record ServerExceptionResponse(String code, String reason) {}
}
  • 비동기 예외 처리: @RestControllerAdvice를 통해 전역적으로 예외를 처리합니다. 비동기 작업에서 발생한 ApplicationException은 Mono.just()로 감싸져, 클라이언트에 적절한 HTTP 상태 코드와 메시지를 비동기적으로 응답합니다.

5. RegisterUserResponse - 응답 DTO

public record RegisterUserResponse(Long rank) {}
  • DTO (Data Transfer Object): 유저의 대기 순서를 응답으로 반환하기 위해 사용됩니다. Java 17의 record를 사용하여 간결하게 정의되었습니다.

Redis를 활용한 유저 대기열 처리 로직

  • Sorted Set: 유저 대기열을 Redis의 Sorted Set 자료구조로 관리합니다. Sorted Set은 유저 ID를 값으로, Unix 타임스탬프를 점수로 하여 자동으로 순서가 정렬됩니다.
  • 등록 로직: 유저가 등록될 때 ZADD 명령어가 호출되며, 유저가 이미 등록된 상태라면 Mono.error()로 에러를 반환합니다. 성공적으로 등록된 경우 유저의 순위를 반환합니다.

결론

이 프로젝트는 Spring WebFluxReactive Redis를 사용하여 높은 성능과 확장성을 고려한 대기열 시스템을 구축하는 데 중점을 두고 있습니다. 비동기 처리를 통해 많은 사용자를 처리할 수 있는 유연한 시스템이며, Redis의 Sorted Set을 사용하여 대기열에서 유저의 순위를 효율적으로 관리합니다.

이 프로젝트를 통해 대기열 시스템의 비즈니스 로직뿐만 아니라, 반응형 프로그래밍의 장점을 살펴볼 수 있으며, 효율적인 Redis 사용법을 이해하는 데 큰 도움이 될 것입니다.

참고: 이 프로젝트는 비동기 및 반응형 프로그래밍 모델에 대한 이해를 기반으로 하므로, 해당 개념에 익숙하지 않다면 먼저 관련 문서를 학습하는 것이 좋습니다.