Webflux - reactor 실습

2024. 9. 18. 22:16·프로그래밍 언어/스프링부트

Reactor Core를 이용한 Flux와 Mono 실습

Reactor는 자바 비동기 프로그래밍을 위한 라이브러리로, 주로 Flux와 Mono라는 두 가지 기본 타입을 사용하여 데이터 스트림을 처리합니다. 이번 글에서는 Flux와 Mono의 차이를 간략히 설명하고, 예제를 통해 그 동작을 살펴보겠습니다.

라이브러리 설정

Reactor와 Spring Boot를 활용한 프로젝트에서는 reactor-core 라이브러리를 추가하여 Flux와 Mono를 사용할 수 있습니다. 아래는 필요한 라이브러리를 포함한 build.gradle 설정 예시입니다.

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.3'
    id "io.spring.dependency-management" version "1.0.7.RELEASE"
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom "io.projectreactor:reactor-bom:2023.0.8"
    }
}

dependencies {
    // Reactor Core
    implementation platform('io.projectreactor:reactor-bom:2023.0.8')
    implementation 'io.projectreactor:reactor-core'

    implementation 'org.springframework.boot:spring-boot-starter'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

Flux와 Mono란?

Flux

Flux는 0부터 N개의 아이템을 발행할 수 있는 데이터 스트림입니다. Flux는 주로 아래의 세 가지 이벤트로 구성됩니다.

  • onNext(n): 데이터가 발행될 때 호출되는 이벤트.
  • onComplete: 스트림이 정상적으로 끝났을 때 호출되는 이벤트.
  • onError: 스트림 처리 중 오류가 발생했을 때 호출되는 이벤트.

Mono

Mono는 0 또는 1개의 아이템을 발행할 수 있는 데이터 스트림입니다. Mono도 onNext, onComplete, onError 이벤트를 가지며, 단일 값 발행이라는 점에서 Flux와 차이가 있습니다.


실습 코드

Flux 실습

1. ReactorExApplication.java

package com.example.reactorex;

public class ReactorExApplication {

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.startFlux().subscribe(System.out::println);
    }
}

2. Publisher.java

package com.example.reactorex;

import reactor.core.publisher.Flux;

public class Publisher {
    public Flux<Integer> startFlux() {
        return Flux.range(1, 10).log();
    }
}

startFlux() 메소드는 Flux.range(1, 10)을 사용하여 1부터 10까지의 숫자를 발행하며, log()로 이벤트 흐름을 기록합니다.

실행 결과 로그 분석

21:28:00.218 [main] INFO reactor.Flux.Range.1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
21:28:00.220 [main] INFO reactor.Flux.Range.1 -- | request(unbounded)
21:28:00.220 [main] INFO reactor.Flux.Range.1 -- | onNext(1)
...
21:28:00.221 [main] INFO reactor.Flux.Range.1 -- | onComplete()
  • onSubscribe: Flux가 구독되었음을 나타냅니다.
  • request(unbounded): 구독자가 데이터를 무제한으로 요청했음을 의미합니다.
  • onNext: 각 숫자(1~10)가 순차적으로 발행됩니다.
  • onComplete: 모든 값이 발행된 후 스트림이 정상적으로 종료됩니다.

Mono 실습 1: 단일 값 발행

1. ReactorExApplication.java

package com.example.reactorex;

public class ReactorExApplication {

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.startMono().subscribe();
    }
}

2. Publisher.java

package com.example.reactorex;

import reactor.core.publisher.Mono;

public class Publisher {
    public Mono<Integer> startMono() {
        return Mono.just(1).log();
    }
}

startMono() 메소드는 Mono.just(1)을 사용하여 단일 값을 발행합니다.

실행 결과 로그 분석

21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | onSubscribe([Synchronous Fuseable] MonoJust)
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | request(unbounded)
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | onNext(1)
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | onComplete()
  • onSubscribe: Mono가 구독되었음을 나타냅니다.
  • onNext: Mono가 발행한 단일 값(1)이 구독자에게 전달됩니다.
  • onComplete: 값이 발행된 후 스트림이 종료됩니다.

Mono 실습 2: 빈 스트림 처리

ReactorExApplication.java

package com.example.reactorex;

public class ReactorExApplication {

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.startMono2().subscribe();
    }
}

Publisher.java

package com.example.reactorex;

import reactor.core.publisher.Mono;

public class Publisher {
    public Mono<?> startMono2() {
        return Mono.empty().log();
    }
}

startMono2() 메소드는 데이터를 발행하지 않는 빈 스트림 Mono.empty()를 반환합니다.

실행 결과 로그 분석

21:51:09.536 [main] INFO reactor.Mono.Empty.1 -- onSubscribe([Fuseable] Operators.EmptySubscription)
21:51:09.538 [main] INFO reactor.Mono.Empty.1 -- request(unbounded)
21:51:09.538 [main] INFO reactor.Mono.Empty.1 -- onComplete()
  • onSubscribe: 빈 스트림에 대한 구독이 시작됩니다.
  • onComplete: 발행할 데이터가 없으므로 바로 스트림이 완료됩니다.

요약

Reactor의 Flux와 Mono는 각각 다수 또는 단일 값을 처리하는 반응형 스트림을 제공합니다. Flux는 다중 데이터를, Mono는 단일 데이터 또는 빈 값을 처리할 때 적합합니다. 이를 통해 비동기 프로그래밍을 쉽게 구현할 수 있으며, 효율적인 데이터 흐름 제어가 가능합니다.


Reactor Test (feat. StepVerifier)

Reactor는 비동기 스트림의 동작을 검증할 수 있는 도구인 StepVerifier를 제공합니다. 이를 사용하면 Flux와 Mono의 발행 순서, 완료 여부, 오류 발생 여부 등을 쉽게 테스트할 수 있습니다.

StepVerifier란?

StepVerifier는 리액티브 스트림을 테스트할 수 있는 도구로, 데이터 발행 흐름을 단계별로 검증할 수 있습니다. 이를 통해 비동기 스트림의 정상 동작 여부뿐만 아니라, 오류 처리 등도 쉽게 테스트할 수 있습니다.

build.gradle 설정

StepVerifier를 사용하기 위해 reactor-test 라이브러리를 추가해야 합니다.

dependencies {
    // Reactor Test 라이브러리 추가
    implementation 'io.projectreactor:reactor-test'
}

실습 코드

Publisher 클래스는 다양한 Flux와 Mono를 발행하며, 이를 StepVerifier로 테스트할 수 있습니다.

package com.example.reactorex;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class Publisher {
    public Flux<Integer> startFlux() {
        return Flux.range(1, 10).log();
    }

    public Flux<String> startFlux2() {
        return Flux.fromIterable(List.of("a", "b", "c")).log();
    }

    public Mono<Integer> startMono() {
        return Mono.just(1).log();
    }

    public Mono<?> startMono2() {
        return Mono.empty().log();
    }

    public Mono<?> startMono3() {
        return Mono.error(new Exception("hello reactor"));
    }
}

테스트 코드 작성

각 메소드의 동작을 StepVerifier로 테스트합니다.

package com.example.reactorex;

import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

class PublisherTest {

    private final Publisher publisher = new Publisher();

    // Flux 테스트
    @Test
    void startFlux() {
        StepVerifier.create(publisher.startFlux())
                .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .verifyComplete();
    }

    // Flux2 테스트
    @Test
    void startFlux2() {
        StepVerifier

.create(publisher.startFlux2())
                .expectNext("a", "b", "c")
                .verifyComplete();
    }

    // Mono 테스트
    @Test
    void startMono() {
        StepVerifier.create(publisher.startMono())
                .expectNext(1)
                .verifyComplete();
    }

    // Mono2 테스트 (빈 스트림)
    @Test
    void startMono2() {
        StepVerifier.create(publisher.startMono2())
                .verifyComplete();
    }

    // Mono3 테스트 (오류 발생)
    @Test
    void startMono3() {
        StepVerifier.create(publisher.startMono3())
                .expectError(Exception.class)
                .verify();
    }
}

테스트 시나리오 분석

  1. startFlux() 테스트: Flux.range(1, 10)에서 1부터 10까지의 숫자가 순차적으로 발행되는 것을 검증합니다.
  2. startFlux2() 테스트: Flux.fromIterable(List.of("a", "b", "c"))에서 문자열 "a", "b", "c"가 발행되는지 검증합니다.
  3. startMono() 테스트: Mono.just(1)에서 단일 값 1이 발행되는지 확인합니다.
  4. startMono2() 테스트: 빈 스트림 Mono.empty()가 정상적으로 완료되는지 검증합니다.
  5. startMono3() 테스트: Mono.error(new Exception("hello reactor"))에서 오류가 발생하는지 확인합니다.

요약

StepVerifier는 리액티브 스트림을 쉽게 검증할 수 있는 도구로, 비동기 프로그램의 안정성을 높일 수 있습니다. Flux와 Mono의 발행 순서, 완료 여부, 오류 처리까지 모든 면에서 테스트를 쉽게 구성할 수 있어 리액티브 프로그램의 정확한 동작을 보장할 수 있습니다.


Operator

Reactor Core에서 제공하는 주요 Operator 중 map, filter, take, 그리고 flatMap은 각각 데이터 스트림을 조작하거나 필터링하는 데 사용됩니다. 이 메소드들은 Flux와 Mono에서 비동기 데이터 처리의 다양한 패턴을 구현하는 데 매우 유용합니다. 각각의 동작 방식과 예시를 살펴보겠습니다.

Reactor Core에서 제공하는 주요 Operator 중 map, filter, take, 그리고 flatMap은 각각 데이터 스트림을 조작하거나 필터링하는 데 사용됩니다. 이 메소드들은 Flux와 Mono에서 비동기 데이터 처리의 다양한 패턴을 구현하는 데 매우 유용합니다. 각각의 동작 방식과 예시를 살펴보겠습니다.

1. map

map은 데이터 스트림 내의 각각의 요소를 다른 형태로 변환할 때 사용됩니다. 함수형 프로그래밍에서 말하는 "맵핑"의 개념으로, 입력 값을 받아 새로운 값을 반환하는 함수를 적용합니다. 기존의 값을 변환하지만 스트림의 크기나 순서를 변경하지는 않습니다.

예시
package com.example.reactorex;

import reactor.core.publisher.Flux;

public class Operator1 {
    public Flux<Integer> fluxMap(){
        return Flux.range(1, 5)
                .map(i -> i * 2)
                .log();
    }
}

테스트

package com.example.reactorex;

import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.*;

class Operator1Test {

    private Operator1 operator1 = new Operator1();
    @Test
    void fluxMap() {
        StepVerifier.create(operator1.fluxMap())
                .expectNext(2, 4,  6, 8, 10)
                .verifyComplete();
    }
}
> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
19:27:45.128 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
19:27:45.131 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | request(unbounded)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(2)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(4)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(6)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(8)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(10)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onComplete()
> Task :test
BUILD SUCCESSFUL in 2s
4 actionable tasks: 3 executed, 1 up-to-date
7:27:45 PM: Execution finished ':test --tests "com.example.reactorex.Operator1Test.fluxMap"'.

2. filter

filter는 조건에 맞는 데이터만 남기고, 나머지는 필터링하여 제거하는 역할을 합니다. 즉, 특정 조건을 만족하는 요소만을 데이터 스트림에 남기고 나머지는 제거합니다.

예시
public class Operator1 {

    public Flux<Integer> fluxFilter(){
        return Flux.range(1, 10)
                .filter(i -> i>5)
                .log();
    }
}

테스트

class Operator1Test {

    private Operator1 operator1 = new Operator1();

    @Test
    void fluxFilter(){
        StepVerifier.create(operator1.fluxFilter())
                .expectNext(6, 7, 8,9, 10)
                .verifyComplete();
    }
}
> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
19:32:57.338 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | request(unbounded)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(6)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(7)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(8)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(9)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(10)
19:32:57.342 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onComplete()
> Task :test
BUILD SUCCESSFUL in 907ms
4 actionable tasks: 3 executed, 1 up-to-date
7:32:57 PM: Execution finished ':test --tests "com.example.reactorex.Operator1Test.fluxFilter"'.

3. take

take는 데이터 스트림에서 처음부터 지정한 개수만큼의 요소만을 발행합니다. 주로 스트림의 데이터를 일정 부분만 취할 때 사용합니다. 예를 들어, 첫 N개의 요소만 처리하고 싶을 때 유용합니다.

예시
public class Operator1 {

    public Flux<Integer> fluxFilterTake(){
        return Flux.range(1, 10)
                .filter(i -> i>5)
                .take(3)
                .log();
    }
}

테스트

    @Test
    void fluxFilterTake(){
        StepVerifier.create(operator1.fluxFilterTake())
                .expectNext(6, 7, 8)
                .verifyComplete();
    }
> Task :compileJava UP-TO-DATE
> Task :processResources UP-TO-DATE
> Task :classes UP-TO-DATE
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
19:37:03.953 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onSubscribe(FluxLimitRequest.FluxLimitRequestSubscriber)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- request(unbounded)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onNext(6)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onNext(7)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onNext(8)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onComplete()
> Task :test
BUILD SUCCESSFUL in 771ms
4 actionable tasks: 2 executed, 2 up-to-date
7:37:03 PM: Execution finished ':test --tests "com.example.reactorex.Operator1Test.fluxFilterTake"'.

4. flatMap

flatMap은 map과 유사하지만, 함수가 반환하는 값이 또 다른 Flux나 Mono일 때 사용합니다. 이를 통해 각 요소를 비동기로 처리할 수 있으며, 여러 비동기 작업의 결과를 하나의 Flux로 합쳐서 처리할 수 있습니다. 즉, 각 요소에 대해 비동기 작업을 수행하고 그 결과를 다시 하나의 스트림으로 병합합니다.

예시
    public Flux<Integer> fluxFlatMap(){
        return Flux.range(1, 10)
                .flatMap(i -> Flux.range(i * 10, 10)
                        .delayElements(Duration.ofMillis(100)))
                .log();
    }

동작 방식

  • Flux.range(1, 10): 1부터 9까지의 값을 발행하는 Flux를 생성합니다.
  • flatMap(i -> Flux.range(i * 10, 10)): 각 i에 대해 i * 10부터 시작하는 새로운 Flux가 생성됩니다. 예를 들어, i = 1일 때 Flux.range(10, 10)을 생성해 10부터 19까지 발행합니다.
  • delayElements(Duration.ofMillis(100)): 각 값이 100ms의 지연을 두고 발행됩니다.
  • log(): 리액티브 스트림의 모든 이벤트(onNext, onComplete, onError)가 로그로 출력됩니다.

flatMap()과 병렬 처리

  • flatMap()은 각 Publisher가 비동기적으로 병렬 실행되기 때문에, 발행된 값의 순서가 보장되지 않습니다. 즉, 여러 개의 Flux.range()가 병렬로 실행되면서 값이 순서와 상관없이 발행됩니다.

테스트

@Test
void fluxFlatMap(){
    StepVerifier.create(operator1.fluxFlatMap())
            .expectNextCount(100) // 100개의 값이 발행되는지 확인
            .verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator1.fluxFlatMap()): fluxFlatMap() 메소드를 테스트하기 위해 StepVerifier를 생성합니다.
  • expectNextCount(100): 100개의 값이 정상적으로 발행되는지 확인합니다.
  • verifyComplete(): 모든 값이 발행된 후 스트림이 정상적으로 완료되었는지 검증합니다.
20:21:17.046 [parallel-10] INFO reactor.Flux.FlatMap.1 -- onNext(100)
20:21:17.048 [parallel-1] INFO reactor.Flux.FlatMap.1 -- onNext(10)
20:21:17.048 [parallel-9] INFO reactor.Flux.FlatMap.1 -- onNext(90)
...
20:21:17.972 [parallel-8] INFO reactor.Flux.FlatMap.1 -- onNext(109)
20:21:17.974 [parallel-6] INFO reactor.Flux.FlatMap.1 -- onComplete()

특징적인 실행 결과

  • 비동기 병렬 처리: parallel로 표시된 다양한 스레드(parallel-1, parallel-10, etc.)에서 값들이 동시에 발행되고 있습니다. 이는 flatMap()이 각 스트림을 병렬로 실행하고 있음을 나타냅니다.
  • 순서가 보장되지 않음: 값이 순서대로 발행되지 않습니다. 예를 들어, 10, 100, 90 등이 순서 없이 동시에 발행되고 있습니다. 이는 flatMap()의 특징으로, 병렬로 각 스트림이 독립적으로 실행되기 때문입니다.
  • 최종적으로 100개의 값 발행: 로그에서 100개의 값이 발행되며, 마지막에 onComplete() 이벤트가 호출되어 스트림이 정상적으로 완료되었습니다.

5. 구구단 만들기

Flux와 flatMap을 사용하여 구구단을 출력하고 테스트하는 코드입니다. Flux.range(1, 9)를 이용해 1부터 9까지의 숫자를 반복하며, 각 숫자에 대해 1부터 9까지의 또 다른 반복을 수행하여 구구단의 결과를 출력합니다.

fluxFlatMap2() 메소드

public Flux<Integer> fluxFlatMap2() {
    return Flux.range(1, 9) // 1부터 9까지의 숫자를 발행
            .flatMap(i -> Flux.range(1, 9) // 각 i에 대해 1부터 9까지의 숫자를 발행
                    .map(j -> {
                        System.out.printf("%d * %d = %d\n", i, j, i * j); // 구구단을 출력
                        return i * j;
                    })
            );
}
  • Flux.range(1, 9): 1부터 9까지의 값을 발행하는 Flux를 생성합니다. 여기서 i는 1부터 9까지 반복됩니다.
  • flatMap(i -> Flux.range(1, 9)): 각 i 값에 대해 다시 1부터 9까지의 값을 발행하는 Flux를 생성합니다. 이때 j 값이 1부터 9까지 순차적으로 변합니다.
  • map(j -> {...}): i와 j의 곱셈 결과를 계산하여 출력한 후, 그 값을 반환합니다. 이로 인해 실제로 콘솔에 구구단이 출력됩니다.
    • 예를 들어, i = 1일 때 1 * 1, 1 * 2 ... 1 * 9까지 출력하고, 이어서 i = 2일 때 2 * 1, 2 * 2... 2 * 9까지 계산하고 출력합니다.

결과적으로

  • 1부터 9까지의 숫자(i) 각각에 대해, 1부터 9까지의 숫자(j)를 곱하여 구구단을 출력하고, 그 결과를 발행합니다.
  • 따라서 출력되는 값은 1 * 1부터 9 * 9 = 81까지의 총 81개의 값입니다.

StepVerifier 테스트

@Test
void fluxFlatMap2() {
    StepVerifier.create(operator1.fluxFlatMap2()) // fluxFlatMap2()를 테스트
            .expectNextCount(81) // 총 81개의 값이 발행되는지 확인
            .verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator1.fluxFlatMap2()): fluxFlatMap2()에서 생성된 Flux를 테스트합니다.
  • expectNextCount(81): 1 * 1부터 9 * 9까지 계산된 81개의 값이 정확히 발행되었는지 확인합니다.
  • verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

테스트 실행 결과

콘솔에 구구단이 출력되며, StepVerifier는 expectNextCount(81)로 81개의 값이 정상적으로 발행되었음을 검증합니다.

출력 예시

1 * 1 = 1
1 * 2 = 2
1 * 3 = 3
...
9 * 8 = 72
9 * 9 = 81

요약

  • fluxFlatMap2() 메소드는 구구단(1부터 9까지)을 계산하여 출력하는 코드입니다.
  • StepVerifier 테스트에서는 총 81개의 구구단 값이 발행되고, 스트림이 정상적으로 완료되는지 확인합니다.

6. concatMap

concatMap 메소드를 사용하는 위 코드는 비동기적으로 데이터를 처리하지만, 처리 순서를 보장하는 리액티브 프로그램을 구현한 예시입니다. 아래에서 코드를 분석하고, 테스트 코드와 실행 결과에 대해 설명하겠습니다.

public Flux<Integer> fluxConcatMap(){
    return Flux.range(1, 10)
            .concatMap(i -> Flux.range(i * 10, 10) // i * 10부터 10개의 숫자를 발행
                    .delayElements(Duration.ofMillis(100))) // 각 요소마다 100ms 지연
            .log(); // 이벤트 흐름을 로그로 기록
}

이 코드는 다음과 같은 단계로 동작합니다
1. Flux.range(1, 10): 1부터 9까지의 값을 순차적으로 발행합니다.

  • 예를 들어, 첫 번째 값은 1, 두 번째 값은 2, ... 아홉 번째 값은 9가 발행됩니다.

2. concatMap(i -> Flux.range(i * 10, 10)): 각 i 값에 대해 i * 10부터 시작하는 10개의 숫자를 발행하는 새로운 Flux를 생성합니다.

  • 예를 들어, i = 1일 때 Flux.range(10, 10)은 10부터 19까지의 값을 발행합니다.
  • i = 2일 때는 Flux.range(20, 10)이 20부터 29까지의 값을 발행합니다.

3. delayElements(Duration.ofMillis(100)): 각 요소를 발행할 때마다 100ms의 지연을 추가합니다. 이로 인해 각각의 값은 100ms 간격으로 순차적으로 발행됩니다.

4. log(): 리액티브 스트림에서 발생하는 모든 이벤트(onSubscribe, onNext, onComplete 등)를 로그에 기록합니다.

concatMap은 flatMap과 유사하지만, 각 서브스트림을 처리할 때 순서를 보장합니다. 즉, 첫 번째 스트림이 완료된 후 두 번째 스트림이 시작되며, 모든 스트림은 순서대로 실행됩니다.

예상 출력

  • 첫 번째로 i = 1일 때 10부터 19까지의 값이 100ms 간격으로 발행됩니다.
  • 두 번째로 i = 2일 때 20부터 29까지의 값이 발행됩니다.
  • 마지막으로 i = 9일 때 90부터 99까지 발행됩니다.
  • 총 100개의 값이 발행되며, 스트림은 순서대로 처리됩니다.
@Test
void fluxConcatMap() {
    StepVerifier.create(operator2.fluxConcatMap()) // fluxConcatMap() 메소드를 테스트
            .expectNextCount(100) // 총 100개의 값이 발행되는지 확인
            .verifyComplete(); // 스트림이 정상적으로 완료되었는지 검증
}

실행 결과 분석

20:10:02.704 [parallel-1] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(10)
20:10:02.810 [parallel-2] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(11)
20:10:02.915 [parallel-3] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(12)
...
20:10:12.886 [parallel-10] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(109)
20:10:12.887 [parallel-10] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onComplete()
> Task :test
BUILD SUCCESSFUL in 10s
4 actionable tasks: 2 executed, 2 up-to-date
8:10:12 PM: Execution finished ':test --tests "com.example.reactorex.Operator2Test.fluxConcatMap"'.

concatMap()

concatMap 메소드는 Reactor와 같은 리액티브 프로그래밍에서 사용되는 메소드로, 비동기 처리에서 데이터 순서를 보장하며 여러 개의 Publisher를 연결할 때 사용됩니다. concatMap은 flatMap과 유사하지만, 중요한 차이점은 순서를 보장한다는 것입니다.

동작 방식

concatMap은 각 요소에 대해 순차적으로 Publisher를 생성하고, 각 Publisher가 완료된 후에 다음 Publisher를 실행합니다. 즉, 여러 개의 Publisher를 병렬로 실행하는 것이 아니라, 순차적으로 실행하여 데이터의 순서를 유지합니다.

주요 특징

  • 순서 보장: concatMap은 각 요소에 대해 생성된 Publisher가 순차적으로 실행되기 때문에, 최종적으로 발행되는 데이터의 순서가 원본 스트림의 순서를 따릅니다.
  • 비동기 처리: 각 Publisher는 비동기적으로 처리되지만, 순차적으로 이어져 실행됩니다.
    서브 스트림 완료 후 다음 스트림 시작: 각 스트림이 완전히 끝난 후에야 다음 스트림이 시작됩니다.

concatMap과 flatMap의 차이점

  • flatMap: 여러 Publisher를 병렬로 실행하며, 실행 순서는 보장하지 않습니다. 즉, 빠른 순서로 처리된 데이터가 먼저 발행될 수 있습니다.
  • concatMap: 여러 Publisher를 순차적으로 실행하여 순서를 보장합니다. 첫 번째 Publisher가 완료된 후에 두 번째 Publisher가 실행됩니다.

flatMapMany() 메서드는 Reactor에서 Mono 타입을 Flux로 변환할 때 사용하는 메서드입니다. 주로 Mono에서 여러 개의 값을 발행하는 스트림을 만들 때 유용하게 쓰입니다. 이 메서드는 Mono가 하나의 값을 발행한 후, 그 값을 사용해 여러 값을 발행하는 Flux로 변환합니다.

flatMapMany()의 동작 방식

  • Mono.flatMapMany(): Mono가 발행한 단일 값을 사용하여 여러 값을 발행하는 Flux를 생성합니다.
    • Mono는 0 또는 1개의 값을 발행하는 반면, Flux는 0부터 N개의 값을 발행할 수 있습니다.
    • flatMapMany()는 Mono에서 나온 값을 기반으로 다수의 값을 발행하는 Flux를 반환합니다.
코드 분석
public Flux<Integer> monoFlatMapMany(){
    return Mono.just(10)
            .flatMapMany(i -> Flux.range(i, i)) // i 값을 이용해 i부터 i개의 숫자를 발행하는 Flux를 생성
            .log(); // 로그로 이벤트 흐름을 기록
}
동작 설명

1. Mono.just(10): 단일 값 10을 발행하는 Mono를 생성합니다.
2. flatMapMany(i -> Flux.range(i, i)): Mono가 발행한 값 10을 받아서 Flux.range(i, i)를 생성합니다. 여기서 Flux.range(10, 10)은 10부터 10개의 값을 발행하는 Flux입니다. 즉, 10, 11, 12, ..., 19까지 10개의 값을 발행합니다.
3. log(): 리액티브 스트림의 각 이벤트(onSubscribe, onNext, onComplete 등)를 로그로 출력합니다.

테스트 코드 분석

@Test
void monoFlatMapMany(){
    StepVerifier.create(operator2.monoFlatMapMany()) // monoFlatMapMany()를 테스트
            .expectNextCount(10) // 총 10개의 값이 발행되는지 확인
            .verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.monoFlatMapMany()): monoFlatMapMany() 메소드에서 생성된 Flux를 테스트합니다.
  • expectNextCount(10): Flux.range(10, 10)에서 발행된 10개의 값(10부터 19까지)이 정확히 발행되었는지 확인합니다.
  • verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
20:42:57.240 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onNext(10)
20:42:57.240 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onNext(11)
...
20:42:57.241 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onNext(19)
20:42:57.241 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onComplete()
특징적인 결과
  • 10부터 19까지의 값이 순차적으로 발행: Flux.range(10, 10)이 생성되면서 10부터 19까지 총 10개의 값이 순차적으로 발행됩니다.
  • 정상 완료: 마지막에 onComplete() 이벤트가 호출되며 스트림이 정상적으로 완료됩니다.

요약

  • flatMapMany()는 Mono에서 단일 값을 발행하고, 그 값을 기반으로 다수의 값을 발행하는 Flux로 변환할 때 사용됩니다.
  • 이 코드에서는 Mono.just(10)으로 시작하여, Flux.range(10, 10)을 통해 10부터 19까지의 값을 발행하는 Flux로 변환했습니다.
  • 테스트 코드는 10개의 값이 정확히 발행되고, 스트림이 정상적으로 완료되었는지 확인합니다.

defaultIfEmpty() 메서드 설명

defaultIfEmpty()는 리액티브 스트림에서 값이 없을 경우 기본 값을 제공하는 메서드입니다. 주로 Mono나 Flux에서 필터링 결과로 값이 발행되지 않을 때, 즉 스트림이 비어 있을 때 사용됩니다. 이 메서드는 기본값을 지정하여, 스트림이 비었을 경우 해당 기본값을 발행하게 합니다.

public Mono<Integer> defaultEmpty() {
    return Mono.just(100)
            .filter(i -> i > 100)  // 100보다 큰 값을 필터링, 조건을 만족하지 않으므로 값이 없음
            .defaultIfEmpty(30)    // 값이 없으면 30을 기본값으로 발행
            .log();                // 이벤트 흐름을 로그로 기록
}
동작 과정
  1. Mono.just(100): 단일 값 100을 발행하는 Mono를 생성합니다.
  2. filter(i -> i > 100): i > 100이라는 조건을 적용하여 필터링을 수행합니다. 그러나 값 100은 조건을 만족하지 않기 때문에 필터링된 후에는 값이 없게 됩니다.
  3. defaultIfEmpty(30): 스트림에 값이 없을 경우, 기본값인 30을 발행합니다. 즉, 필터링 후 값이 없으므로 30이 발행됩니다.
  4. log(): 리액티브 스트림의 모든 이벤트가 로그로 출력됩니다.
예상 동작
  • Mono.just(100)에서 시작된 값이 필터링된 후 값이 없으므로, defaultIfEmpty(30)에 의해 30이 기본값으로 발행됩니다.
@Test
void defaultIfEmpty() {
    StepVerifier.create(operator2.defaultEmpty())  // defaultEmpty() 메소드를 테스트
            .expectNext(30)                       // 30이 발행되었는지 확인
            .verifyComplete();                    // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.defaultEmpty()): defaultEmpty() 메소드의 리액티브 스트림을 테스트합니다.
  • expectNext(30): defaultIfEmpty(30)에 의해 30이 발행되었는지 확인합니다.
  • verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
20:48:58.571 [Test worker] INFO reactor.Mono.DefaultIfEmpty.1 -- onNext(30)
20:48:58.571 [Test worker] INFO reactor.Mono.DefaultIfEmpty.1 -- onComplete()
결과 요약
  • onNext(30): 필터링된 후 값이 없으므로 기본값인 30이 발행되었습니다.
  • onComplete(): 값이 발행된 후 스트림이 정상적으로 완료되었습니다.

요약

  • defaultIfEmpty()는 리액티브 스트림이 값이 없을 때 지정된 기본값을 발행합니다.
  • 이 코드에서는 Mono.just(100)에서 값이 필터링된 후 아무 값도 남지 않기 때문에 defaultIfEmpty(30)이 호출되어 기본값인 30이 발행됩니다.
  • 테스트는 30이 정상적으로 발행되었고, 스트림이 정상적으로 완료되었는지 검증합니다.

switchIfEmpty() 메서드 설명

switchIfEmpty()는 리액티브 스트림에서 값이 없을 경우 대체 스트림을 실행하는 메서드입니다. 이는 기본적으로 defaultIfEmpty()와 비슷한 역할을 하지만, 대체 스트림을 발행할 수 있다는 점에서 차이가 있습니다. switchIfEmpty()는 원본 Mono나 Flux가 값을 발행하지 않을 때, 지정된 다른 Mono나 Flux를 사용하여 값을 발행하도록 설정합니다.

코드 분석

switchIfEmpty() 메서드
public Mono<Integer> switchIfEmpty() {
    return Mono.just(100)
            .filter(i -> i > 100)  // i > 100 조건을 만족하지 않으므로 값이 없음
            .switchIfEmpty(Mono.just(30).map(i -> i * 2))  // 값이 없으면 30 * 2 값을 발행
            .log();  // 이벤트 흐름을 기록
}
동작 과정
  1. Mono.just(100): 단일 값 100을 발행하는 Mono를 생성합니다.
  2. filter(i -> i > 100): i > 100이라는 조건을 적용하여 필터링합니다. 값 100은 조건을 만족하지 않으므로 필터링 후에는 값이 없습니다.
  3. switchIfEmpty(Mono.just(30).map(i -> i * 2)): 원본 Mono가 값이 없으므로, 대체 Mono인 Mono.just(30).map(i -> i * 2)이 실행됩니다. 이로 인해 30이 2배로 변환되어 최종적으로 60이 발행됩니다.
  4. log(): 리액티브 스트림의 모든 이벤트(onSubscribe, onNext, onComplete)가 로그로 출력됩니다.
동작 결과
  • 원본 스트림이 값이 없으므로 switchIfEmpty()로 지정된 Mono.just(30)이 대체되어 실행되고, 그 값은 map(i -> i * 2)로 변환되어 60을 발행합니다.
테스트 코드 분석
@Test
void switchIfEmpty() {
    StepVerifier.create(operator2.switchIfEmpty())  // switchIfEmpty() 메소드를 테스트
            .expectNext(60)                       // 60이 발행되었는지 확인
            .verifyComplete();                    // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.switchIfEmpty()): switchIfEmpty() 메소드의 리액티브 스트림을 테스트합니다.
  • expectNext(60): switchIfEmpty()에서 30 * 2 = 60이 발행되었는지 확인합니다.
  • verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
20:54:34.434 [Test worker] INFO reactor.Mono.SwitchIfEmpty.1 -- onNext(60)
20:54:34.434 [Test worker] INFO reactor.Mono.SwitchIfEmpty.1 -- onComplete()
결과 요약
  • onNext(60): 원본 스트림이 필터링 후 값이 없었기 때문에 switchIfEmpty()에 의해 대체 스트림에서 60이 발행되었습니다.
  • onComplete(): 발행된 후 스트림이 정상적으로 완료되었습니다.

switchIfEmpty()와 defaultIfEmpty() 차이점

  • defaultIfEmpty(): 값이 없을 경우 하나의 기본값을 발행합니다.
  • switchIfEmpty(): 값이 없을 경우 다른 스트림으로 대체되어 실행됩니다. 더 복잡한 대체 로직을 포함할 수 있습니다.

요약

  • switchIfEmpty()는 리액티브 스트림이 값이 없을 때 대체 스트림을 실행하여 값을 발행하는 메서드입니다.
  • 이 코드에서는 Mono.just(100)에서 값을 필터링한 후 값이 없으므로 switchIfEmpty()를 통해 Mono.just(30)이 실행되고, map(i -> i * 2)에 의해 최종적으로 60이 발행됩니다.
  • 테스트는 60이 발행되고, 스트림이 정상적으로 완료되었는지 검증합니다.

switchIfEmpty() 메서드와 에러 처리

switchIfEmpty() 메서드는 원본 스트림이 비었을 때 대체 스트림을 제공하는 역할을 합니다. 이번 코드에서는 대체 스트림 대신 에러를 발생시키는 예제입니다. 이 메서드를 통해 리액티브 스트림이 값이 없을 때 지정된 에러를 던질 수 있습니다.

코드 분석

public Mono<Integer> switchIfEmpty() {
    return Mono.just(100)  // 100을 발행하는 Mono
            .filter(i -> i > 100)  // i가 100보다 큰 경우만 필터링, 100은 조건을 만족하지 않음
            .switchIfEmpty(Mono.error(new Exception("Not exists value...")))  // 값이 없을 경우 에러 발생
            .log();  // 로그로 이벤트 흐름을 기록
}
동작 설명
  1. Mono.just(100): 단일 값 100을 발행하는 Mono를 생성합니다.
  2. filter(i -> i > 100): i > 100이라는 조건을 적용하여 필터링합니다. 값 100은 조건을 만족하지 않기 때문에 필터링 후 값이 없게 됩니다.
  3. switchIfEmpty(Mono.error(...)): 원본 스트림이 값이 없으므로, 대체 스트림으로 Mono.error(new Exception("Not exists value..."))가 실행됩니다. 이로 인해 에러가 발생합니다.
  4. log(): 리액티브 스트림의 이벤트 흐름(onError, onComplete 등)이 로그로 출력됩니다.
동작 결과
  • 원본 스트림이 비어 있으므로 switchIfEmpty()에 의해 지정된 예외 Exception("Not exists value...")가 던져집니다.
테스트 코드 분석
@Test
void switchIfEmpty2() {
    StepVerifier.create(operator2.switchIfEmpty())  // switchIfEmpty() 메소드를 테스트
            .expectError()                         // 에러가 발생하는지 확인
            .verify();                             // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.switchIfEmpty()): switchIfEmpty() 메소드의 리액티브 스트림을 테스트합니다.
  • expectError(): 에러가 발생하는지 확인합니다.
  • verify(): 스트림이 에러와 함께 정상적으로 종료되었는지 검증합니다.
실행 결과 분석
21:05:13.469 [Test worker] ERROR reactor.Mono.SwitchIfEmpty.1 -- onError(java.lang.Exception: Not exists value...)
결과 요약
  • onError(Exception: Not exists value...): switchIfEmpty()로 인해 지정된 에러가 발생하고, 로그로 출력됩니다.
  • 테스트는 에러 발생 여부를 확인하며, expectError()를 통해 에러가 발생했음을 검증합니다.

요약

  • switchIfEmpty(): 리액티브 스트림이 값이 없을 경우 대체 스트림을 실행합니다. 이번 예제에서는 대체 스트림 대신 에러를 던지도록 설정하였습니다.
  • 에러 처리: 원본 스트림에서 값이 없으면 Mono.error(new Exception(...))이 실행되며, 테스트에서는 이 에러가 정상적으로 발생했는지 확인합니다.
  • 테스트: StepVerifier를 사용하여 스트림에서 에러가 발생했음을 expectError()로 확인하고, 스트림이 에러와 함께 종료되었는지 검증합니다.
Flux.merge() 메서드 설명

Flux.merge()는 여러 개의 Flux 스트림을 병합하여 동시에 발행하는 메서드입니다. 이 메서드는 각각의 Flux가 비동기적으로 값을 발행할 수 있으며, 발행되는 순서는 각 스트림의 처리 속도에 따라 달라질 수 있습니다. 단, 병합된 스트림은 모든 스트림이 완료될 때까지 실행됩니다.

코드 분석
public Flux<String> fluxMerge() {
    return Flux.merge(
            Flux.fromIterable(List.of("1", "2", "3")),  // 1, 2, 3을 발행하는 Flux
            Flux.just("4")  // 4를 발행하는 Flux
    ).log();  // 이벤트 흐름을 기록
}
동작 설명
  1. Flux.fromIterable(List.of("1", "2", "3")): 리스트 ["1", "2", "3"]의 요소를 순차적으로 발행하는 Flux를 생성합니다.
  2. Flux.just("4"): 단일 값 "4"를 발행하는 Flux를 생성합니다.
  3. Flux.merge(): 위에서 생성한 두 개의 Flux를 병합하여 하나의 Flux로 만듭니다. 두 스트림이 병합되어 순서와 상관없이 동시에 발행됩니다.
  4. log(): 리액티브 스트림의 모든 이벤트(onNext, onComplete, onError 등)가 로그로 기록됩니다.
예상 동작
  • "1", "2", "3"은 Flux.fromIterable()에서 발행되고, "4"는 Flux.just()에서 발행됩니다.
  • 두 개의 Flux는 병합되며, 두 스트림이 완료될 때까지 모든 값이 발행됩니다.
  • 결과적으로 "1", "2", "3", "4"가 발행됩니다.

테스트 코드 분석

@Test
void fluxMerge() {
    StepVerifier.create(operator2.fluxMerge())  // fluxMerge() 메소드를 테스트
            .expectNext("1", "2", "3", "4")   // "1", "2", "3", "4" 순서대로 발행되는지 확인
            .verifyComplete();                // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.fluxMerge()): fluxMerge() 메소드의 리액티브 스트림을 테스트합니다.
  • expectNext("1", "2", "3", "4"): "1", "2", "3", "4"이 순차적으로 발행되는지 확인합니다.
  • verifyComplete(): 모든 값이 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.

실행 결과 분석

21:13:08.585 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(1)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(2)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(3)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(4)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onComplete()
결과 요약
  • onNext(1), onNext(2), onNext(3), onNext(4): "1", "2", "3", "4"가 차례대로 발행되었습니다.
  • onComplete(): 발행이 완료된 후 스트림이 정상적으로 종료되었습니다.

Flux.merge()의 특징

  • 병합된 스트림의 발행 순서: Flux.merge()는 각 스트림을 병렬로 실행하므로, 순서가 보장되지 않을 수 있습니다. 하지만 이 예제에서는 값 발행 순서가 동일하기 때문에 "1", "2", "3", "4"가 순서대로 출력됩니다.
  • 병렬 실행: 각 Flux는 병렬로 실행될 수 있으며, 실제로 여러 Flux의 처리 속도에 따라 발행 순서가 달라질 수 있습니다.

요약

  • Flux.merge()는 여러 Flux를 병합하여 병렬로 실행하는 메서드입니다.
  • 이 코드에서는 Flux.fromIterable()과 Flux.just()를 병합하여 "1", "2", "3", "4"를 발행합니다.
  • 테스트는 발행된 값들이 순서대로 출력되고 스트림이 정상적으로 완료되었는지 검증합니다.

Mono.mergeWith() 메서드 설명

mergeWith() 메서드는 두 개의 Mono 또는 Flux를 병합하여 동시에 발행할 수 있도록 해줍니다. 이 메서드는 각 스트림이 병렬로 실행될 수 있으며, 두 개 이상의 Mono를 병합하여 Flux로 변환합니다.

Mono와 Mono를 병합하면 Flux가 되는 이유

Mono는 0 또는 1개의 값을 발행할 수 있는 제한이 있습니다. 만약 두 개 이상의 Mono를 병합하게 되면, 각 Mono는 최대 하나의 값을 발행할 수 있으므로 최대 2개의 값 이상이 발행될 수 있습니다. 이러한 상황에서 발행된 값의 수가 Mono의 발행 제한을 넘기기 때문에 결과적으로 0개 이상의 값을 발행할 수 있는 Flux로 변환됩니다.

요약:

  • Mono는 1개의 값만 발행할 수 있습니다.
  • 두 개 이상의 Mono를 병합하면 발행 가능한 값의 수가 1개를 초과할 수 있으므로, 여러 값을 발행할 수 있는 Flux로 자동 변환됩니다.

예를 들어, 다음 코드에서 Mono.just("1")와 Mono.just("2")를 병합할 때, 결과는 두 개의 값을 발행해야 하기 때문에 Flux<String>로 변환됩니다.

Mono<String> mono1 = Mono.just("A");
Mono<String> mono2 = Mono.just("B");

Flux<String> flux = mono1.mergeWith(mono2);  // Flux가 반환됨

코드 분석

monoMerge() 메서드

public Flux<String> monoMerge() {
    return Mono.just("1")
            .mergeWith(Mono.just("2"))  // "1"과 "2"를 병합
            .mergeWith(Mono.just("3"))  // 병합된 결과에 "3"을 추가로 병합
            .log();  // 이벤트 흐름을 기록
}

동작 설명

  1. Mono.just("1"): 단일 값 "1"을 발행하는 Mono를 생성합니다.
  2. mergeWith(Mono.just("2")): "1"을 발행하는 Mono와 "2"를 발행하는 Mono를 병합합니다.
  3. mergeWith(Mono.just("3")): 병합된 "1"과 "2"의 결과에 "3"을 발행하는 Mono를 병합합니다.
  4. log(): 리액티브 스트림의 이벤트 흐름(onNext, onComplete, onError)이 로그로 기록됩니다.

예상 동작

  • "1", "2", "3"이 병합되어 Flux<String>로 반환됩니다.
  • 각 값이 병렬로 실행될 수 있지만, 이 예제에서는 값들이 순차적으로 발행됩니다.

테스트 코드 분석

@Test
void monoMerge() {
    StepVerifier.create(operator2.monoMerge())  // monoMerge() 메소드를 테스트
            .expectNext("1", "2", "3")         // "1", "2", "3"이 발행되는지 확인
            .verifyComplete();                 // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.monoMerge()): monoMerge() 메소드의 리액티브 스트림을 테스트합니다.
  • expectNext("1", "2", "3"): "1", "2", "3"이 순서대로 발행되는지 확인합니다.
  • verifyComplete(): 모든 값이 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.

실행 결과 분석

> Task :compileJava UP-TO-DATE
> Task :processResources UP-TO-DATE
> Task :classes UP-TO-DATE
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
> Task :test
BUILD SUCCESSFUL in 668ms
4 actionable tasks: 2 executed, 2 up-to-date

결과 요약

  • expectNext("1", "2", "3"): "1", "2", "3"이 순서대로 발행되었습니다.
  • verifyComplete(): 값이 발행된 후 스트림이 정상적으로 완료되었습니다.

mergeWith()와 병렬 처리

  • 병렬 처리 가능성: mergeWith()는 병합된 Mono나 Flux를 병렬로 실행할 수 있습니다. 그러나 각 Mono가 이미 단일 값을 즉시 발행하고 완료되기 때문에 순서대로 발행된 것처럼 보입니다.
  • 순서 보장 없음: mergeWith()는 병렬로 실행될 때 각 Mono의 발행 순서를 보장하지 않지만, 이 예제에서는 순차적으로 발행됩니다.

요약

  • mergeWith()는 여러 Mono를 병합하여 하나의 Flux로 변환하며, 각 스트림을 병렬로 실행할 수 있습니다.
  • Mono 두 개 이상을 병합하면 최대 하나 이상의 값이 발행되기 때문에, 결과는 여러 개의 값을 발행할 수 있는 Flux가 됩니다.
  • 이 코드에서는 "1", "2", "3"을 발행하는 Mono들을 병합하여 결과적으로 "1", "2", "3"이 발행됩니다.
  • 테스트는 값들이 순서대로 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.

Flux.zip() 메서드 설명

Flux.zip()은 두 개 이상의 Flux를 짝 지어 결합하는 메서드입니다. 각 Flux에서 동일한 순서로 발행된 값을 짝 지어 새로운 값으로 변환할 수 있습니다. 즉, 각각의 스트림에서 같은 순번의 값을 함께 묶어서 처리할 수 있는 방식입니다.

fluxZip() 메서드
public Flux<String> fluxZip() {
    return Flux.zip(Flux.just("a", "b", "c"), Flux.just("d", "e", "f"))  // 두 Flux를 결합
            .map(i -> i.getT1() + i.getT2())  // 각 짝의 값을 결합하여 새로운 값을 생성
            .log();  // 이벤트 흐름을 기록
}
동작 설명

1. Flux.zip(): Flux.just("a", "b", "c")와 Flux.just("d", "e", "f")에서 발행된 값을 순차적으로 짝 지어 결합합니다.
- 첫 번째 Flux의 "a"와 두 번째 Flux의 "d"가 결합되고, 그 다음으로 "b"와 "e", "c"와 "f"가 결합됩니다.
2. map(i -> i.getT1() + i.getT2()): zip()으로 결합된 두 값(T1과 T2)을 받아서 문자열을 결합합니다.
- 예를 들어, "a"와 "d"가 결합되어 "ad"가 됩니다.
3. log(): 리액티브 스트림의 이벤트 흐름(onNext, onComplete, onError)이 로그로 기록됩니다.

예상 동작
  • "a"와 "d", "b"와 "e", "c"와 "f"가 각각 짝 지어 결합된 "ad", "be", "cf"가 순차적으로 발행됩니다.

테스트 코드 분석

@Test
void fluxZip() {
    StepVerifier.create(operator2.fluxZip())  // fluxZip() 메소드를 테스트
            .expectNext("ad", "be", "cf")    // "ad", "be", "cf"가 순서대로 발행되는지 확인
            .verifyComplete();               // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.fluxZip()): fluxZip() 메소드의 리액티브 스트림을 테스트합니다.
  • expectNext("ad", "be", "cf"): "ad", "be", "cf"가 순서대로 발행되는지 확인합니다.
  • verifyComplete(): 모든 값이 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.

실행 결과 분석

21:32:42.091 [Test worker] INFO reactor.Flux.Map.1 -- onNext(ad)
21:32:42.092 [Test worker] INFO reactor.Flux.Map.1 -- onNext(be)
21:32:42.092 [Test worker] INFO reactor.Flux.Map.1 -- onNext(cf)
21:32:42.092 [Test worker] INFO reactor.Flux.Map.1 -- onComplete()

결과 요약

  • onNext(ad), onNext(be), onNext(cf): 두 Flux에서 각각 결합된 "ad", "be", "cf"가 순서대로 발행되었습니다.
  • onComplete(): 발행이 완료된 후 스트림이 정상적으로 종료되었습니다.

Flux.zip()의 특징

  • 동시 발행된 값들을 결합: 두 개의 Flux에서 같은 순서로 발행된 값들을 결합합니다.
  • 짝 지어진 순서대로 결합: 각 Flux의 발행 순서에 맞춰 짝 지어 결합되며, 발행 순서가 유지됩니다.
  • 결합한 스트림: 두 개 이상의 Flux에서 동일한 순서로 발행된 값들이 짝을 이루어 새롭게 변환된 값이 발행됩니다.

요약

  • Flux.zip()은 두 개 이상의 Flux를 병합하여 같은 순서로 발행된 값들을 짝 지어 결합합니다.
  • 이 코드에서는 "a", "b", "c"와 "d", "e", "f"가 각각 결합되어 "ad", "be", "cf"가 발행됩니다.
  • 테스트는 발행된 값들이 순서대로 출력되고 스트림이 정상적으로 완료되었는지 검증합니다.

Mono.zip() 메서드 설명

Mono.zip() 메서드는 여러 개의 Mono를 결합하여 각각의 값을 하나의 튜플로 묶는 메서드입니다. 이를 통해 각각의 Mono에서 발행된 값을 동시에 처리할 수 있습니다. 최대 8개의 Mono를 결합할 수 있으며, 모든 Mono가 값을 발행한 후, 해당 값들을 하나의 Mono로 결합하여 결과를 발행합니다.

monoZip() 메서드
public Mono<Integer> monoZip() {
    return Mono.zip(
            Mono.just(1),  // 첫 번째 Mono에서 값 1을 발행
            Mono.just(2),  // 두 번째 Mono에서 값 2를 발행
            Mono.just(3)   // 세 번째 Mono에서 값 3을 발행
        )
        .map(i -> i.getT1() + i.getT2() + i.getT3())  // 각 Mono에서 받은 값을 더함
        .log();  // 이벤트 흐름을 기록
}
동작 설명

1. Mono.zip(): 세 개의 Mono(Mono.just(1), Mono.just(2), Mono.just(3))에서 각각 값을 발행한 후, 세 값을 하나의 튜플로 묶습니다.
- 이 튜플은 Tuple3<Integer, Integer, Integer> 형태로 반환되며, 각각의 값은 getT1(), getT2(), getT3() 메서드를 통해 접근할 수 있습니다.
2. map(i -> i.getT1() + i.getT2() + i.getT3()): 결합된 튜플(Tuple3)을 받아, 각 값을 더한 결과를 반환합니다.
- 예를 들어, 1 + 2 + 3의 결과로 6이 발행됩니다.
3. log(): 리액티브 스트림의 이벤트(onNext, onComplete, onError)가 로그로 기록됩니다.

예상 동작
  • 각 Mono에서 발행된 값인 1, 2, 3이 결합되고, 더한 값 6이 최종적으로 발행됩니다.

테스트 코드 분석

@Test
void monoZip() {
    StepVerifier.create(operator2.monoZip())  // monoZip() 메소드를 테스트
            .expectNext(6)                   // 결합된 값(1+2+3=6)이 발행되는지 확인
            .verifyComplete();               // 스트림이 정상적으로 완료되었는지 확인
}
  • StepVerifier.create(operator2.monoZip()): monoZip() 메소드의 리액티브 스트림을 테스트합니다.
  • expectNext(6): 결합된 값 6이 발행되는지 확인합니다.
  • verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

실행 결과 분석

21:39:15.140 [Test worker] INFO reactor.Mono.Map.1 -- onNext(6)
21:39:15.140 [Test worker] INFO reactor.Mono.Map.1 -- onComplete()
결과 요약
  • onNext(6): Mono.zip()을 통해 결합된 값이 6으로 발행되었습니다.
  • onComplete(): 값이 발행된 후 스트림이 정상적으로 완료되었습니다.

Mono.zip()의 특징

  • 여러 Mono의 값을 결합: Mono.zip()은 여러 개의 Mono에서 값을 받아 하나의 튜플로 결합합니다. 이 예제에서는 Tuple3<Integer, Integer, Integer>이 반환됩니다.
  • 결합된 값의 처리: 결합된 튜플의 값들은 map()과 같은 연산자를 사용해 조작할 수 있습니다.
  • 최대 8개의 Mono 지원: Mono.zip()은 최대 8개의 Mono를 결합할 수 있으며, 그 이상의 결합이 필요할 경우 Flux.zip()을 사용할 수 있습니다.

요약

  • Mono.zip()은 여러 개의 Mono를 결합하여 그 값들을 하나의 튜플로 묶고, 그 결과를 하나의 Mono로 발행합니다.
  • 이 코드에서는 Mono.just(1), Mono.just(2), Mono.just(3)을 결합한 후, 세 값을 더하여 결과적으로 6을 발행합니다.
  • 테스트는 값 6이 정상적으로 발행되고, 스트림이 완료되었는지 검증합니다.

집계함수

groupBy() 메서드

public Flux<GroupedFlux<Character, String>> fluxGroupBy() {
    return Flux.just("apple", "banana", "apricot", "blueberry")
            .groupBy(fruit -> fruit.charAt(0))  // 첫 글자를 기준으로 그룹화
            .log();  // 로그 기록
}

설명:

  • Flux.just("apple", "banana", "apricot", "blueberry"): 4개의 문자열("apple", "banana", "apricot", "blueberry")을 발행하는 Flux를 생성합니다.
  • groupBy(fruit -> fruit.charAt(0)): 각 문자열을 첫 글자('a', 'b')로 그룹화합니다. 이 그룹은 GroupedFlux로 반환됩니다.
@Test
void fluxGroupBy() {
    Flux<Map.Entry<Character, List<String>>> groupedLists = operator3.fluxGroupBy()
            .flatMap(group -> group.collectList()
            .map(list -> Map.entry(group.key(), list)));

    StepVerifier.create(groupedLists)
            .expectNextMatches(entry -> entry.getKey() == 'a' && entry.getValue().equals(List.of("apple", "apricot")))
            .expectNextMatches(entry -> entry.getKey() == 'b' && entry.getValue().equals(List.of("banana", "blueberry")))
            .verifyComplete();
}

설명:

  1. operator3.fluxGroupBy(): fluxGroupBy() 메서드를 호출하여 과일 이름을 첫 글자 기준으로 그룹화된 Flux<GroupedFlux<Character, String>>을 반환받습니다.
  2. flatMap(group -> group.collectList()): 각 그룹(GroupedFlux)을 List로 수집하여 Map.Entry로 변환합니다. group.key()를 통해 그룹의 키(첫 글자)를 가져오고, 각 그룹의 값(과일 이름들)을 리스트로 변환하여 매핑합니다.
  3. StepVerifier.create(groupedLists): StepVerifier를 사용하여 결과를 검증합니다.
    • expectNextMatches(): 첫 번째 그룹은 키가 'a'이고, 값이 ["apple", "apricot"]인지 확인합니다.
    • 두 번째 그룹은 키가 'b'이고, 값이 ["banana", "blueberry"]인지 확인합니다.
    • verifyComplete(): 모든 그룹이 예상대로 처리되었고 스트림이 정상적으로 완료되었는지 확인합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Flux 스트림의 동작을 추적할 수 있게 합니다:

INFO reactor.Flux.GroupBy.1 -- | onSubscribe([Fuseable] FluxGroupBy.GroupByMain)  // 구독 시작
INFO reactor.Flux.GroupBy.1 -- | request(256)  // 256개의 요청
INFO reactor.Flux.GroupBy.1 -- | onNext(UnicastGroupedFlux)  // 첫 번째 그룹 발행 ('a' 그룹)
INFO reactor.Flux.GroupBy.1 -- | onNext(UnicastGroupedFlux)  // 두 번째 그룹 발행 ('b' 그룹)
INFO reactor.Flux.GroupBy.1 -- | request(1)  // 각 그룹에 대해 요청
INFO reactor.Flux.GroupBy.1 -- | onComplete()  // 그룹화 완료
  • onSubscribe: Flux 스트림이 구독될 때 발생합니다.
  • request(256): 요청된 데이터의 양을 나타냅니다. 여기서는 최대 256개의 항목을 요청했습니다.
  • onNext(UnicastGroupedFlux): 그룹이 UnicastGroupedFlux로 발행되었음을 나타냅니다. 이는 각 그룹('a' 그룹, 'b' 그룹)에 해당하는 GroupedFlux 객체입니다.
  • onComplete(): 스트림의 모든 작업이 완료되었음을 나타냅니다.

count() 메서드

public Mono<Long> fluxCount() {
    return Flux.range(1, 10).count();  // 1부터 10까지의 값을 발행하고, 개수를 세어 Mono<Long>로 반환
}

설명:

  • Flux.range(1, 10): 1부터 10까지의 숫자를 발행하는 Flux를 생성합니다.
  • count(): Flux가 발행한 모든 요소를 세고, 그 개수를 Mono<Long>로 반환합니다. 이 예제에서는 총 10개의 숫자가 발행되므로 Mono<Long>에는 10L이 반환됩니다.
@Test
void fluxCount() {
    StepVerifier.create(operator3.fluxCount())
            .expectNext(10L)  // 10개의 요소가 발행되었는지 확인
            .verifyComplete();  // 스트림이 정상적으로 완료되었는지 검증
}

설명:

  1. operator3.fluxCount(): fluxCount() 메서드를 호출하여 Mono<Long>을 반환받습니다.
  2. StepVerifier.create(): 반환된 Mono<Long>에 대한 테스트를 생성합니다.
  3. expectNext(10L): Mono<Long>에서 발행되는 값이 10L인지 확인합니다.
  4. verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

로그 설명

테스트 실행 시 Mono.count() 메서드에 의해 출력된 로그는 Mono 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Mono.Count.1 -- | onSubscribe([Fuseable] MonoCount.CountSubscriber)  // 구독 시작
INFO reactor.Mono.Count.1 -- | request(unbounded)  // 요청량이 제한 없이 요청됨
INFO reactor.Mono.Count.1 -- | onNext(10)  // 카운트된 값 10이 발행됨
INFO reactor.Mono.Count.1 -- | onComplete()  // 스트림 완료
  • onSubscribe: Mono 스트림이 구독되었을 때 발생합니다.
  • request(unbounded): 데이터 요청이 무제한으로 요청되었음을 의미합니다.
  • onNext(10): 10개의 요소가 발행된 후, 그 개수인 10이 반환됩니다.
  • onComplete(): 스트림이 정상적으로 완료되었음을 나타냅니다.

요약

  • fluxCount() 메서드는 Flux.range(1, 10)을 사용하여 1부터 10까지의 숫자를 발행하고, 그 개수를 계산하여 Mono<Long>으로 반환합니다.
  • 테스트에서는 Mono<Long>에서 발행된 값이 정확히 10L인지 검증하며, 스트림이 정상적으로 완료되었는지 확인합니다.

코드 및 테스트 설명

distinct() 메서드

public Flux<String> fluxDistinct() {
    return Flux.fromIterable(List.of("a", "b", "a", "b", "c"))
               .distinct()  // 중복된 값 제거
               .log();  // 로그 기록
}

설명:

  • Flux.fromIterable(List.of("a", "b", "a", "b", "c")): "a", "b", "a", "b", "c"를 발행하는 Flux를 생성합니다.
  • distinct(): 중복된 값을 제거하고 고유한 값만 발행합니다. 이 경우 "a", "b", "c"가 남습니다.
  • log(): Flux의 동작(onNext, onComplete, onError, onSubscribe)을 로그로 기록하여 스트림 처리 과정이 출력됩니다.
@Test
void fluxDistinct() {
    StepVerifier.create(operator3.fluxDistinct())
            .expectNext("a", "b", "c")  // 중복을 제거한 결과로 "a", "b", "c"가 발행됨
            .verifyComplete();  // 스트림이 정상적으로 완료되었는지 확인
}

설명:

  1. operator3.fluxDistinct(): fluxDistinct() 메서드를 호출하여 중복 제거된 문자열들을 발행하는 Flux<String>을 반환받습니다.
  2. StepVerifier.create(): 반환된 Flux<String>에 대한 테스트를 생성합니다.
  3. expectNext("a", "b", "c"): 중복 제거 후 "a", "b", "c"가 순차적으로 발행되는지 확인합니다.
  4. verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Flux 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Flux.DistinctFuseable.1 -- | onSubscribe([Fuseable] FluxDistinct.DistinctFuseableSubscriber)  // 구독 시작
INFO reactor.Flux.DistinctFuseable.1 -- | request(unbounded)  // 무제한 데이터 요청
INFO reactor.Flux.DistinctFuseable.1 -- | onNext(a)  // "a" 발행
INFO reactor.Flux.DistinctFuseable.1 -- | onNext(b)  // "b" 발행
INFO reactor.Flux.DistinctFuseable.1 -- | onNext(c)  // "c" 발행
INFO reactor.Flux.DistinctFuseable.1 -- | onComplete()  // 스트림 완료
  • onSubscribe: 스트림이 구독되었을 때 발생합니다.
  • request(unbounded): 제한 없이 데이터 요청이 이루어졌음을 나타냅니다.
  • onNext("a"), onNext("b"), onNext("c"): 중복을 제거한 값들이 순차적으로 발행됩니다.
  • onComplete(): 모든 값이 발행되고 스트림이 정상적으로 완료되었음을 나타냅니다.

요약

  • fluxDistinct() 메서드는 중복된 값이 포함된 Flux에서 중복을 제거한 후 고유한 값들만 발행합니다.
  • 테스트에서는 중복 제거 후 "a", "b", "c"가 올바르게 발행되고, 스트림이 정상적으로 완료되었는지 확인합니다.

reduce() 메소드

fluxReduce() 메서드

public Mono<Long> fluxReduce() {
    return Flux.range(1, 10)
               .reduce(0L, (i, j) -> i + j)
               .log();
}

설명:

  • Flux.range(1, 10): 1부터 10까지의 숫자를 발행하는 Flux를 생성합니다.
  • reduce(0L, (i, j) -> i + j): 초기값 0L에서 시작하여 각 값을 누적하여 더합니다. 이 경우 1부터 10까지의 숫자를 모두 더해 결과값 55를 만듭니다.
  • log(): Flux의 동작(onNext, onComplete, onError, onSubscribe)을 로그로 기록하여 스트림 처리 과정을 출력합니다.
@Test
void fluxReduce() {
    StepVerifier.create(operator3.fluxReduce())
                .expectNext(55L)  // 1부터 10까지의 합이 55임을 검증
                .verifyComplete();  // 스트림이 정상적으로 완료되었는지 확인
}

설명:

  1. operator3.fluxReduce(): fluxReduce() 메서드를 호출하여 1부터 10까지의 숫자를 더한 Mono<Long>을 반환받습니다.
  2. StepVerifier.create(): 반환된 Mono<Long>에 대한 테스트를 생성합니다.
  3. expectNext(55L): reduce() 결과로 값 55L이 발행되는지 확인합니다.
  4. verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Mono 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Mono.ReduceSeed.1 -- | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)  // 구독 시작
INFO reactor.Mono.ReduceSeed.1 -- | request(unbounded)  // 무제한 데이터 요청
INFO reactor.Mono.ReduceSeed.1 -- | onNext(55)  // reduce의 결과로 55 발행
INFO reactor.Mono.ReduceSeed.1 -- | onComplete()  // 스트림 완료
  • onSubscribe: 스트림이 구독되었을 때 발생합니다.
  • request(unbounded): 제한 없이 데이터 요청이 이루어졌음을 나타냅니다.
  • onNext(55): 누적된 결과 값 55가 발행됩니다.
  • onComplete(): 모든 값이 처리되고 스트림이 정상적으로 완료되었음을 나타냅니다.

요약

  • fluxReduce() 메서드는 1부터 10까지의 숫자를 더한 값을 발행하며, 최종적으로 Mono<Long>에 결과값 55를 담습니다.
  • 테스트에서는 이 값이 올바르게 발행되고, 스트림이 정상적으로 완료되었는지 검증합니다.

BackPressure 연산자

  • delaySequence / limitRate
  • sample

fluxDelayAndLimit() 메서드

public Flux<Integer> fluxDelayAndLimit() {
    return Flux.range(1, 10)
               .delaySequence(Duration.ofSeconds(1))
               .log()
               .limitRate(2);
}

설명:

  • Flux.range(1, 10): 1부터 10까지의 숫자를 발행하는 Flux를 생성합니다.
  • delaySequence(Duration.ofSeconds(1)): 모든 요소의 발행을 1초 지연시킵니다. 즉, 처음 값을 발행하기 전에 1초 동안 대기한 후 연속적으로 값을 발행합니다.
  • log(): Flux의 동작(onNext, onComplete, onError, onSubscribe)을 로그로 기록하여 스트림 처리 과정이 출력됩니다.
  • limitRate(2): 한번에 요청할 값의 수를 2로 제한합니다. 즉, 두 개의 값을 처리한 후 다시 두 개의 값을 요청하는 방식으로 값을 제한적으로 요청합니다.
@Test
void fluxDelayAndLimit() {
    StepVerifier.create(operator4.fluxDelayAndLimit())
                .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  // 1부터 10까지의 숫자가 순차적으로 발행됨
                .verifyComplete();  // 스트림이 정상적으로 완료되었는지 확인
}

설명:

  1. operator4.fluxDelayAndLimit(): fluxDelayAndLimit() 메서드를 호출하여 1초 지연 후 1부터 10까지의 숫자를 발행하는 Flux<Integer>를 반환받습니다.
  2. StepVerifier.create(): 반환된 Flux<Integer>에 대한 테스트를 생성합니다.
  3. expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10): 숫자 1부터 10까지 순차적으로 발행되는지 확인합니다.
  4. verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Flux 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Flux.DelaySequence.1 -- onSubscribe(SerializedSubscriber)  // 구독 시작
INFO reactor.Flux.DelaySequence.1 -- request(2)  // 처음 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(1)  // 첫 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(2)  // 두 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2)  // 다음 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(3)  // 세 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(4)  // 네 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2)  // 또 다음 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(5)  // 다섯 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(6)  // 여섯 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2)  // 다시 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(7)  // 일곱 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(8)  // 여덟 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2)  // 또다시 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(9)  // 아홉 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(10)  // 열 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onComplete()  // 스트림 완료
  • onSubscribe: 스트림이 구독되었을 때 발생합니다.
  • request(2): 한번에 두 개의 값을 요청합니다.
  • onNext(x): 요청된 값이 순차적으로 발행됩니다.
  • onComplete(): 모든 값이 처리되고 스트림이 정상적으로 완료되었음을 나타냅니다.

요약

  • fluxDelayAndLimit() 메서드는 1초 지연 후 1부터 10까지의 숫자를 발행하며, 한번에 두 개의 값을 요청하는 방식으로 limitRate(2)를 사용합니다.
  • 테스트에서는 숫자 1부터 10까지 순차적으로 발행되고, 스트림이 정상적으로 완료되었는지 검증합니다.

fluxSample() 메서드

public Flux<Integer> fluxSample() {
    return Flux.range(1, 100)
               .delayElements(Duration.ofMillis(100))
               .sample(Duration.ofMillis(300))
               .log();
}

설명:

  • Flux.range(1, 100): 1부터 100까지의 정수를 발행하는 Flux를 생성합니다.
  • delayElements(Duration.ofMillis(100)): 각 요소를 100밀리초 간격으로 발행합니다. 즉, 1초에 약 10개의 숫자가 발행됩니다.
  • sample(Duration.ofMillis(300)): 300밀리초마다 가장 최근에 발행된 값을 선택하여 발행합니다. 이는 3개의 값 중 가장 최근 값이 샘플링되어 발행되는 것을 의미합니다.
@Test
void fluxSample() {
    StepVerifier.create(operator4.fluxSample())
                .expectNextCount(10)  // 10개의 샘플링된 값이 발행되었는지 확인
                .thenCancel()          // 테스트 중간에 스트림을 취소하여 완료를 검증
                .verify();             // 검증
}

설명:

  1. operator4.fluxSample(): fluxSample() 메서드를 호출하여 1부터 100까지의 숫자를 100밀리초 간격으로 발행하고 300밀리초 간격으로 샘플링하는 Flux<Integer>를 반환합니다.
  2. StepVerifier.create(): Flux<Integer>에 대한 테스트를 생성합니다.
  3. expectNextCount(10): 샘플링된 값이 10개가 발행되는지 확인합니다.
  4. thenCancel(): 스트림을 강제로 취소합니다. 이를 통해 스트림이 완료되기 전에 중간에 취소하는 시나리오를 테스트할 수 있습니다.
  5. verify(): 테스트를 실행하고, 검증이 완료되었는지 확인합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Flux 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Flux.Sample.1 -- onSubscribe(FluxSample.SampleMainSubscriber)  // 구독 시작
INFO reactor.Flux.Sample.1 -- request(unbounded)                            // 제한 없이 데이터 요청
INFO reactor.Flux.Sample.1 -- onNext(2)                                     // 첫 번째 샘플링된 값(2) 발행
INFO reactor.Flux.Sample.1 -- onNext(5)                                     // 두 번째 샘플링된 값(5) 발행
INFO reactor.Flux.Sample.1 -- onNext(8)                                     // 세 번째 샘플링된 값(8) 발행
INFO reactor.Flux.Sample.1 -- onNext(11)                                    // 네 번째 샘플링된 값(11) 발행
INFO reactor.Flux.Sample.1 -- onNext(14)                                    // 다섯 번째 샘플링된 값(14) 발행
INFO reactor.Flux.Sample.1 -- onNext(17)                                    // 여섯 번째 샘플링된 값(17) 발행
INFO reactor.Flux.Sample.1 -- onNext(20)                                    // 일곱 번째 샘플링된 값(20) 발행
INFO reactor.Flux.Sample.1 -- onNext(23)                                    // 여덟 번째 샘플링된 값(23) 발행
INFO reactor.Flux.Sample.1 -- onNext(26)                                    // 아홉 번째 샘플링된 값(26) 발행
INFO reactor.Flux.Sample.1 -- onNext(29)                                    // 열 번째 샘플링된 값(29) 발행
INFO reactor.Flux.Sample.1 -- cancel()                                      // 스트림 취소
  • onSubscribe: 스트림이 구독되었음을 나타냅니다.
  • request(unbounded): 제한 없이 값을 요청하고 있다는 로그입니다.
  • onNext(x): 샘플링된 값이 발행될 때마다 로그로 출력됩니다.
  • cancel(): StepVerifier에 의해 스트림이 취소되었음을 나타냅니다.

요약

  • fluxSample() 메서드는 1부터 100까지의 숫자를 100밀리초 간격으로 발행하고, 300밀리초마다 그 중 가장 최근에 발행된 값을 선택하여 발행합니다.
  • 테스트에서는 10개의 샘플링된 값이 발행된 후 스트림을 취소하고, 스트림이 정상적으로 동작했는지를 검증합니다.
  • 로그를 통해 샘플링된 값이 정확히 발행되고 스트림이 중간에 취소되었음을 확인할 수 있습니다.

Schedulers

Schedulers는 Reactor (Reactive Streams) 라이브러리에서 비동기 작업을 실행할 스레드 풀을 관리하는 클래스입니다. 비동기적 또는 병렬 처리 작업을 어느 스레드에서 실행할지를 결정하고, 다양한 스케줄링 전략을 제공합니다.

Schedulers는 작업을 처리할 때 사용할 스레드를 지정하거나, 병렬 처리, 단일 스레드 처리, 또는 즉시 실행과 같은 방식으로 비동기 실행 환경을 설정하는 데 사용됩니다.

역할:

  • 스레드 관리: 특정 작업이 어떤 스레드에서 실행될지를 제어합니다.
  • 비동기 처리 지원: 비동기적으로 실행할 수 있도록 스케줄러를 제공하여, 리액티브 스트림에서 고성능 처리를 가능하게 합니다.
  • 다양한 실행 환경 제공: 병렬 처리, I/O 작업, 단일 스레드 실행 등 다양한 실행 패턴을 지원합니다.

Schedulers 메소드

  1. Schedulers.immediate(): 현재 호출하는 스레드에서 즉시 실행합니다. 별도의 스레드 전환 없이 동기적으로 작업을 수행합니다.
  2. Schedulers.single(): 단일 스레드에서 작업을 실행하며, 해당 스레드가 여러 작업을 순차적으로 처리합니다.
  3. Schedulers.parallel(): 고성능 작업을 위해 고정된 크기의 스레드 풀(기본적으로 CPU 코어 수)을 사용하여 병렬로 작업을 실행합니다.
  4. Schedulers.boundedElastic(): I/O 블로킹 작업에 적합하며, 필요에 따라 새로운 스레드를 생성하지만 스레드 수는 제한되어 있습니다.

fluxMapWithSubscribeOn() 메서드

public Flux<Integer> fluxMapWithSubscribeOn() {
    return Flux.range(1, 10)
               .map(I -> I * 2)  // 각 값을 2배로 변환
               .subscribeOn(Schedulers.boundedElastic())  // I/O 작업에 적합한 스케줄러 사용
               .log();
}

설명:

  • Flux.range(1, 10): 1부터 10까지의 정수를 발행하는 Flux를 생성합니다.
  • map(I -> I * 2): 각 발행된 값을 받아 2배로 변환합니다. 즉, 1은 2로, 2는 4로 변환하는 식으로 진행됩니다.
  • subscribeOn(Schedulers.boundedElastic()): boundedElastic 스케줄러를 사용하여 작업을 비동기적으로 실행합니다. 이 스케줄러는 주로 I/O 작업이나 블로킹 작업에 적합하며, 필요에 따라 스레드를 동적으로 생성하되 제한된 범위 내에서 생성합니다.
@Test
void fluxMapWithSubscribeOn() {
    StepVerifier.create(scheduler1.fluxMapWithSubscribeOn())
                .expectNextCount(10)  // 10개의 변환된 값(2, 4, 6, ..., 20)이 발행되는지 확인
                .verifyComplete();     // 스트림이 정상적으로 완료되었는지 검증
}

설명:

  1. scheduler1.fluxMapWithSubscribeOn(): fluxMapWithSubscribeOn() 메서드를 호출하여 1부터 10까지의 숫자를 2배로 변환하고, boundedElastic 스케줄러에서 비동기적으로 실행되는 Flux<Integer>를 반환합니다.
  2. expectNextCount(10): 10개의 변환된 값(2, 4, 6, ..., 20)이 발행되는지 확인합니다.
  3. verifyComplete(): 스트림이 정상적으로 완료되었는지 확인합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Flux 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Flux.SubscribeOn.1 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)  // 구독 시작
INFO reactor.Flux.SubscribeOn.1 -- request(unbounded)                                  // 제한 없이 데이터 요청
INFO reactor.Flux.SubscribeOn.1 -- onNext(2)                                           // 변환된 값(2) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(4)                                           // 변환된 값(4) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(6)                                           // 변환된 값(6) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(8)                                           // 변환된 값(8) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(10)                                          // 변환된 값(10) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(12)                                          // 변환된 값(12) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(14)                                          // 변환된 값(14) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(16)                                          // 변환된 값(16) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(18)                                          // 변환된 값(18) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(20)                                          // 변환된 값(20) 발행
INFO reactor.Flux.SubscribeOn.1 -- onComplete()                                        // 스트림 완료
  • onSubscribe: 스트림이 구독되었음을 나타냅니다.
  • request(unbounded): 제한 없이 데이터를 요청하고 있다는 로그입니다.
  • onNext(x): 변환된 값이 발행될 때마다 해당 값을 기록합니다.
  • onComplete(): 모든 값이 정상적으로 처리되고 스트림이 완료되었음을 나타냅니다.

요약

  • fluxMapWithSubscribeOn() 메서드는 1부터 10까지의 값을 2배로 변환하고, boundedElastic 스케줄러에서 비동기적으로 실행합니다.
  • 테스트에서는 10개의 변환된 값이 정상적으로 발행되고 스트림이 완료되는지 확인합니다.

코드 및 테스트 설명

fluxMapWithPublishOn() 메서드

public Flux<Integer> fluxMapWithPublishOn() {
    return Flux.range(1, 10)
               .map(i -> i + 1)  // 각 값을 1씩 증가
               .publishOn(Schedulers.boundedElastic())  // boundedElastic 스케줄러로 스레드 전환
               .log()  // 로그 출력
               .publishOn(Schedulers.parallel())  // parallel 스케줄러로 스레드 전환
               .log()  // 로그 출력
               .map(i -> i * 2);  // 각 값을 2배로 변환
}

설명:

  1. Flux.range(1, 10): 1부터 10까지의 숫자를 발행하는 Flux를 생성합니다.
  2. map(i -> i + 1): 발행된 값을 받아 1씩 증가시킵니다. (예: 1 → 2, 2 → 3 ...)
  3. publishOn(Schedulers.boundedElastic()): boundedElastic 스케줄러에서 해당 작업을 실행하도록 설정합니다. 주로 I/O 작업이나 블로킹 작업을 처리하는 스케줄러입니다.
  4. log(): 스트림의 동작을 로그로 출력합니다.
  5. publishOn(Schedulers.parallel()): parallel 스케줄러에서 병렬로 처리하도록 스레드를 전환합니다. 주로 CPU 집약적인 작업을 위해 사용됩니다.
  6. map(i -> i * 2): 발행된 값을 받아 2배로 변환합니다.

테스트 메서드

@Test
void fluxMapWithPublishOn() {
    StepVerifier.create(scheduler1.fluxMapWithPublishOn())
                .expectNextCount(10)  // 10개의 값이 발행되었는지 확인
                .verifyComplete();     // 스트림이 정상적으로 완료되었는지 검증
}

설명:

  1. StepVerifier.create(): fluxMapWithPublishOn() 메서드가 반환하는 Flux<Integer>를 테스트합니다.
  2. expectNextCount(10): 변환된 10개의 값이 정상적으로 발행되었는지 확인합니다.
  3. verifyComplete(): 스트림이 정상적으로 완료되었는지 검증합니다.

로그 설명

테스트 실행 시 log() 메서드에 의해 출력된 로그는 Flux 스트림의 동작을 추적할 수 있습니다:

INFO reactor.Flux.PublishOn.1 -- | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)  // 스트림 구독 시작
INFO reactor.Flux.PublishOn.2 -- | request(unbounded)                                        // 제한 없이 데이터 요청
INFO reactor.Flux.PublishOn.1 -- | onNext(2)                                                 // boundedElastic 스레드에서 처리
INFO reactor.Flux.PublishOn.2 -- | onNext(2)                                                 // parallel 스레드에서 처리
INFO reactor.Flux.PublishOn.1 -- | onNext(3)
INFO reactor.Flux.PublishOn.2 -- | onNext(3)
...
INFO reactor.Flux.PublishOn.1 -- | onComplete()                                              // boundedElastic 스레드 완료
INFO reactor.Flux.PublishOn.2 -- | onComplete()                                              // parallel 스레드 완료
  • publishOn(Schedulers.boundedElastic())에 의해 boundedElastic 스케줄러로 전환된 후, 값이 처리됩니다.
  • 이어서 publishOn(Schedulers.parallel())에 의해 parallel 스케줄러에서 다시 값이 처리됩니다.
  • onComplete() 로그가 발생하며 스트림이 정상적으로 완료됩니다.

요약

  • fluxMapWithPublishOn() 메서드는 1부터 10까지의 값을 1씩 증가시키고, 두 번의 publishOn을 통해 스레드를 boundedElastic과 parallel로 전환한 후, 값을 2배로 변환하여 발행합니다.
  • 테스트는 10개의 변환된 값이 정상적으로 발행되고 스트림이 완료되는지를 확인합니다.
  • 로그를 통해 스레드 전환 및 스트림의 각 동작을 추적할 수 있으며, 모든 값이 두 스케줄러에서 제대로 처리된 후 정상적으로 완료된 것을 알 수 있습니다.
저작자표시 (새창열림)

'프로그래밍 언어 > 스프링부트' 카테고리의 다른 글

webflux - CPU Bound vs IO Bound  (3) 2024.09.17
Redis Pub/Sub과 Spring Boot를 활용한 실시간 알림 시스템 구현  (0) 2024.09.15
Spring Session  (1) 2024.09.15
OneToMany 관계 설정 시 필드 타입 설정은 뭘로 하나?  (1) 2024.05.28
'프로그래밍 언어/스프링부트' 카테고리의 다른 글
  • webflux - CPU Bound vs IO Bound
  • Redis Pub/Sub과 Spring Boot를 활용한 실시간 알림 시스템 구현
  • Spring Session
  • OneToMany 관계 설정 시 필드 타입 설정은 뭘로 하나?
hyeseong-dev
hyeseong-dev
안녕하세요. 백엔드 개발자 이혜성입니다.
  • hyeseong-dev
    어제 오늘 그리고 내일
    hyeseong-dev
  • 전체
    오늘
    어제
    • 분류 전체보기 (283)
      • 여러가지 (108)
        • 알고리즘 & 자료구조 (73)
        • 오류 (4)
        • 이것저것 (29)
        • 일기 (1)
      • 프레임워크 (39)
        • 자바 스프링 (39)
        • React Native (0)
      • 프로그래밍 언어 (39)
        • 파이썬 (31)
        • 자바 (3)
        • 스프링부트 (5)
      • 컴퓨터 구조와 운영체제 (3)
      • DB (17)
        • SQL (0)
        • Redis (17)
      • 클라우드 컴퓨팅 (2)
        • 도커 (2)
        • AWS (0)
      • 스케쥴 (65)
        • 세미나 (0)
        • 수료 (0)
        • 스터디 (24)
        • 시험 (41)
      • 트러블슈팅 (1)
      • 자격증 (0)
        • 정보처리기사 (0)
      • 재태크 (0)
        • 암호화폐 (0)
        • 기타 (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    java
    취업리부트
    Docker-compose
    그리디
    프로그래머스
    Spring WebFlux
    시험
    reactor
    spring
    mybatis
    EC2
    #개발자포트폴리오 #개발자이력서 #개발자취업 #개발자취준 #코딩테스트 #항해99 #취리코 #취업리부트코스
    파이썬
    백준
    자바
    docker
    항해99
    SAA
    AWS
    FastAPI
    완전탐색
    Redis
    ecs
    RDS
    Python
    WebFlux
    DP
    Spring Boot
    OOP
    celery
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
hyeseong-dev
Webflux - reactor 실습
상단으로

티스토리툴바