Reactor Core를 이용한 Flux와 Mono 실습
Reactor는 자바 비동기 프로그래밍을 위한 라이브러리로, 주로 Flux
와 Mono
라는 두 가지 기본 타입을 사용하여 데이터 스트림을 처리합니다. 이번 글에서는 Flux
와 Mono
의 차이를 간략히 설명하고, 예제를 통해 그 동작을 살펴보겠습니다.
라이브러리 설정
Reactor와 Spring Boot를 활용한 프로젝트에서는 reactor-core
라이브러리를 추가하여 Flux
와 Mono
를 사용할 수 있습니다. 아래는 필요한 라이브러리를 포함한 build.gradle
설정 예시입니다.
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.3'
id "io.spring.dependency-management" version "1.0.7.RELEASE"
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
repositories {
mavenCentral()
}
dependencyManagement {
imports {
mavenBom "io.projectreactor:reactor-bom:2023.0.8"
}
}
dependencies {
// Reactor Core
implementation platform('io.projectreactor:reactor-bom:2023.0.8')
implementation 'io.projectreactor:reactor-core'
implementation 'org.springframework.boot:spring-boot-starter'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
Flux와 Mono란?
Flux
Flux
는 0부터 N개의 아이템을 발행할 수 있는 데이터 스트림입니다. Flux
는 주로 아래의 세 가지 이벤트로 구성됩니다.
onNext(n)
: 데이터가 발행될 때 호출되는 이벤트.onComplete
: 스트림이 정상적으로 끝났을 때 호출되는 이벤트.onError
: 스트림 처리 중 오류가 발생했을 때 호출되는 이벤트.
Mono
Mono
는 0 또는 1개의 아이템을 발행할 수 있는 데이터 스트림입니다. Mono
도 onNext
, onComplete
, onError
이벤트를 가지며, 단일 값 발행이라는 점에서 Flux
와 차이가 있습니다.
실습 코드
Flux 실습
1. ReactorExApplication.java
package com.example.reactorex;
public class ReactorExApplication {
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.startFlux().subscribe(System.out::println);
}
}
2. Publisher.java
package com.example.reactorex;
import reactor.core.publisher.Flux;
public class Publisher {
public Flux<Integer> startFlux() {
return Flux.range(1, 10).log();
}
}
startFlux()
메소드는 Flux.range(1, 10)
을 사용하여 1부터 10까지의 숫자를 발행하며, log()
로 이벤트 흐름을 기록합니다.
실행 결과 로그 분석
21:28:00.218 [main] INFO reactor.Flux.Range.1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
21:28:00.220 [main] INFO reactor.Flux.Range.1 -- | request(unbounded)
21:28:00.220 [main] INFO reactor.Flux.Range.1 -- | onNext(1)
...
21:28:00.221 [main] INFO reactor.Flux.Range.1 -- | onComplete()
- onSubscribe:
Flux
가 구독되었음을 나타냅니다. - request(unbounded): 구독자가 데이터를 무제한으로 요청했음을 의미합니다.
- onNext: 각 숫자(1~10)가 순차적으로 발행됩니다.
- onComplete: 모든 값이 발행된 후 스트림이 정상적으로 종료됩니다.
Mono 실습 1: 단일 값 발행
1. ReactorExApplication.java
package com.example.reactorex;
public class ReactorExApplication {
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.startMono().subscribe();
}
}
2. Publisher.java
package com.example.reactorex;
import reactor.core.publisher.Mono;
public class Publisher {
public Mono<Integer> startMono() {
return Mono.just(1).log();
}
}
startMono()
메소드는 Mono.just(1)
을 사용하여 단일 값을 발행합니다.
실행 결과 로그 분석
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | onSubscribe([Synchronous Fuseable] MonoJust)
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | request(unbounded)
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | onNext(1)
21:28:00.224 [main] INFO reactor.Mono.Just.1 -- | onComplete()
- onSubscribe:
Mono
가 구독되었음을 나타냅니다. - onNext:
Mono
가 발행한 단일 값(1)이 구독자에게 전달됩니다. - onComplete: 값이 발행된 후 스트림이 종료됩니다.
Mono 실습 2: 빈 스트림 처리
ReactorExApplication.java
package com.example.reactorex;
public class ReactorExApplication {
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.startMono2().subscribe();
}
}
Publisher.java
package com.example.reactorex;
import reactor.core.publisher.Mono;
public class Publisher {
public Mono<?> startMono2() {
return Mono.empty().log();
}
}
startMono2()
메소드는 데이터를 발행하지 않는 빈 스트림 Mono.empty()
를 반환합니다.
실행 결과 로그 분석
21:51:09.536 [main] INFO reactor.Mono.Empty.1 -- onSubscribe([Fuseable] Operators.EmptySubscription)
21:51:09.538 [main] INFO reactor.Mono.Empty.1 -- request(unbounded)
21:51:09.538 [main] INFO reactor.Mono.Empty.1 -- onComplete()
- onSubscribe: 빈 스트림에 대한 구독이 시작됩니다.
- onComplete: 발행할 데이터가 없으므로 바로 스트림이 완료됩니다.
요약
Reactor의 Flux
와 Mono
는 각각 다수 또는 단일 값을 처리하는 반응형 스트림을 제공합니다. Flux
는 다중 데이터를, Mono
는 단일 데이터 또는 빈 값을 처리할 때 적합합니다. 이를 통해 비동기 프로그래밍을 쉽게 구현할 수 있으며, 효율적인 데이터 흐름 제어가 가능합니다.
Reactor Test (feat. StepVerifier)
Reactor는 비동기 스트림의 동작을 검증할 수 있는 도구인 StepVerifier를 제공합니다. 이를 사용하면 Flux
와 Mono
의 발행 순서, 완료 여부, 오류 발생 여부 등을 쉽게 테스트할 수 있습니다.
StepVerifier란?
StepVerifier
는 리액티브 스트림을 테스트할 수 있는 도구로, 데이터 발행 흐름을 단계별로 검증할 수 있습니다. 이를 통해 비동기 스트림의 정상 동작 여부뿐만 아니라, 오류 처리 등도 쉽게 테스트할 수 있습니다.
build.gradle 설정
StepVerifier를 사용하기 위해 reactor-test
라이브러리를 추가해야 합니다.
dependencies {
// Reactor Test 라이브러리 추가
implementation 'io.projectreactor:reactor-test'
}
실습 코드
Publisher
클래스는 다양한 Flux
와 Mono
를 발행하며, 이를 StepVerifier로 테스트할 수 있습니다.
package com.example.reactorex;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
public class Publisher {
public Flux<Integer> startFlux() {
return Flux.range(1, 10).log();
}
public Flux<String> startFlux2() {
return Flux.fromIterable(List.of("a", "b", "c")).log();
}
public Mono<Integer> startMono() {
return Mono.just(1).log();
}
public Mono<?> startMono2() {
return Mono.empty().log();
}
public Mono<?> startMono3() {
return Mono.error(new Exception("hello reactor"));
}
}
테스트 코드 작성
각 메소드의 동작을 StepVerifier
로 테스트합니다.
package com.example.reactorex;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;
class PublisherTest {
private final Publisher publisher = new Publisher();
// Flux 테스트
@Test
void startFlux() {
StepVerifier.create(publisher.startFlux())
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.verifyComplete();
}
// Flux2 테스트
@Test
void startFlux2() {
StepVerifier
.create(publisher.startFlux2())
.expectNext("a", "b", "c")
.verifyComplete();
}
// Mono 테스트
@Test
void startMono() {
StepVerifier.create(publisher.startMono())
.expectNext(1)
.verifyComplete();
}
// Mono2 테스트 (빈 스트림)
@Test
void startMono2() {
StepVerifier.create(publisher.startMono2())
.verifyComplete();
}
// Mono3 테스트 (오류 발생)
@Test
void startMono3() {
StepVerifier.create(publisher.startMono3())
.expectError(Exception.class)
.verify();
}
}
테스트 시나리오 분석
startFlux()
테스트:Flux.range(1, 10)
에서 1부터 10까지의 숫자가 순차적으로 발행되는 것을 검증합니다.startFlux2()
테스트:Flux.fromIterable(List.of("a", "b", "c"))
에서 문자열"a"
,"b"
,"c"
가 발행되는지 검증합니다.startMono()
테스트:Mono.just(1)
에서 단일 값1
이 발행되는지 확인합니다.startMono2()
테스트: 빈 스트림Mono.empty()
가 정상적으로 완료되는지 검증합니다.startMono3()
테스트:Mono.error(new Exception("hello reactor"))
에서 오류가 발생하는지 확인합니다.
요약
StepVerifier
는 리액티브 스트림을 쉽게 검증할 수 있는 도구로, 비동기 프로그램의 안정성을 높일 수 있습니다. Flux
와 Mono
의 발행 순서, 완료 여부, 오류 처리까지 모든 면에서 테스트를 쉽게 구성할 수 있어 리액티브 프로그램의 정확한 동작을 보장할 수 있습니다.
Operator
Reactor Core에서 제공하는 주요 Operator 중 map, filter, take, 그리고 flatMap은 각각 데이터 스트림을 조작하거나 필터링하는 데 사용됩니다. 이 메소드들은 Flux와 Mono에서 비동기 데이터 처리의 다양한 패턴을 구현하는 데 매우 유용합니다. 각각의 동작 방식과 예시를 살펴보겠습니다.
Reactor Core에서 제공하는 주요 Operator 중 map
, filter
, take
, 그리고 flatMap
은 각각 데이터 스트림을 조작하거나 필터링하는 데 사용됩니다. 이 메소드들은 Flux
와 Mono
에서 비동기 데이터 처리의 다양한 패턴을 구현하는 데 매우 유용합니다. 각각의 동작 방식과 예시를 살펴보겠습니다.
1. map
map
은 데이터 스트림 내의 각각의 요소를 다른 형태로 변환할 때 사용됩니다. 함수형 프로그래밍에서 말하는 "맵핑"의 개념으로, 입력 값을 받아 새로운 값을 반환하는 함수를 적용합니다. 기존의 값을 변환하지만 스트림의 크기나 순서를 변경하지는 않습니다.
예시
package com.example.reactorex;
import reactor.core.publisher.Flux;
public class Operator1 {
public Flux<Integer> fluxMap(){
return Flux.range(1, 5)
.map(i -> i * 2)
.log();
}
}
테스트
package com.example.reactorex;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;
import static org.junit.jupiter.api.Assertions.*;
class Operator1Test {
private Operator1 operator1 = new Operator1();
@Test
void fluxMap() {
StepVerifier.create(operator1.fluxMap())
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
}
}
> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
19:27:45.128 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
19:27:45.131 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | request(unbounded)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(2)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(4)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(6)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(8)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onNext(10)
19:27:45.132 [Test worker] INFO reactor.Flux.MapFuseable.1 -- | onComplete()
> Task :test
BUILD SUCCESSFUL in 2s
4 actionable tasks: 3 executed, 1 up-to-date
7:27:45 PM: Execution finished ':test --tests "com.example.reactorex.Operator1Test.fluxMap"'.
2. filter
filter
는 조건에 맞는 데이터만 남기고, 나머지는 필터링하여 제거하는 역할을 합니다. 즉, 특정 조건을 만족하는 요소만을 데이터 스트림에 남기고 나머지는 제거합니다.
예시
public class Operator1 {
public Flux<Integer> fluxFilter(){
return Flux.range(1, 10)
.filter(i -> i>5)
.log();
}
}
테스트
class Operator1Test {
private Operator1 operator1 = new Operator1();
@Test
void fluxFilter(){
StepVerifier.create(operator1.fluxFilter())
.expectNext(6, 7, 8,9, 10)
.verifyComplete();
}
}
> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
19:32:57.338 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | request(unbounded)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(6)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(7)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(8)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(9)
19:32:57.341 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onNext(10)
19:32:57.342 [Test worker] INFO reactor.Flux.FilterFuseable.1 -- | onComplete()
> Task :test
BUILD SUCCESSFUL in 907ms
4 actionable tasks: 3 executed, 1 up-to-date
7:32:57 PM: Execution finished ':test --tests "com.example.reactorex.Operator1Test.fluxFilter"'.
3. take
take
는 데이터 스트림에서 처음부터 지정한 개수만큼의 요소만을 발행합니다. 주로 스트림의 데이터를 일정 부분만 취할 때 사용합니다. 예를 들어, 첫 N개의 요소만 처리하고 싶을 때 유용합니다.
예시
public class Operator1 {
public Flux<Integer> fluxFilterTake(){
return Flux.range(1, 10)
.filter(i -> i>5)
.take(3)
.log();
}
}
테스트
@Test
void fluxFilterTake(){
StepVerifier.create(operator1.fluxFilterTake())
.expectNext(6, 7, 8)
.verifyComplete();
}
> Task :compileJava UP-TO-DATE
> Task :processResources UP-TO-DATE
> Task :classes UP-TO-DATE
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
19:37:03.953 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onSubscribe(FluxLimitRequest.FluxLimitRequestSubscriber)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- request(unbounded)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onNext(6)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onNext(7)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onNext(8)
19:37:03.956 [Test worker] INFO reactor.Flux.LimitRequest.1 -- onComplete()
> Task :test
BUILD SUCCESSFUL in 771ms
4 actionable tasks: 2 executed, 2 up-to-date
7:37:03 PM: Execution finished ':test --tests "com.example.reactorex.Operator1Test.fluxFilterTake"'.
4. flatMap
flatMap
은 map
과 유사하지만, 함수가 반환하는 값이 또 다른 Flux
나 Mono
일 때 사용합니다. 이를 통해 각 요소를 비동기로 처리할 수 있으며, 여러 비동기 작업의 결과를 하나의 Flux
로 합쳐서 처리할 수 있습니다. 즉, 각 요소에 대해 비동기 작업을 수행하고 그 결과를 다시 하나의 스트림으로 병합합니다.
예시
public Flux<Integer> fluxFlatMap(){
return Flux.range(1, 10)
.flatMap(i -> Flux.range(i * 10, 10)
.delayElements(Duration.ofMillis(100)))
.log();
}
동작 방식
- Flux.range(1, 10): 1부터 9까지의 값을 발행하는 Flux를 생성합니다.
- flatMap(i -> Flux.range(i * 10, 10)): 각 i에 대해 i * 10부터 시작하는 새로운 Flux가 생성됩니다. 예를 들어, i = 1일 때 Flux.range(10, 10)을 생성해 10부터 19까지 발행합니다.
- delayElements(Duration.ofMillis(100)): 각 값이 100ms의 지연을 두고 발행됩니다.
- log(): 리액티브 스트림의 모든 이벤트(onNext, onComplete, onError)가 로그로 출력됩니다.
flatMap()과 병렬 처리
- flatMap()은 각 Publisher가 비동기적으로 병렬 실행되기 때문에, 발행된 값의 순서가 보장되지 않습니다. 즉, 여러 개의 Flux.range()가 병렬로 실행되면서 값이 순서와 상관없이 발행됩니다.
테스트
@Test
void fluxFlatMap(){
StepVerifier.create(operator1.fluxFlatMap())
.expectNextCount(100) // 100개의 값이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
- StepVerifier.create(operator1.fluxFlatMap()): fluxFlatMap() 메소드를 테스트하기 위해 StepVerifier를 생성합니다.
- expectNextCount(100): 100개의 값이 정상적으로 발행되는지 확인합니다.
- verifyComplete(): 모든 값이 발행된 후 스트림이 정상적으로 완료되었는지 검증합니다.
20:21:17.046 [parallel-10] INFO reactor.Flux.FlatMap.1 -- onNext(100)
20:21:17.048 [parallel-1] INFO reactor.Flux.FlatMap.1 -- onNext(10)
20:21:17.048 [parallel-9] INFO reactor.Flux.FlatMap.1 -- onNext(90)
...
20:21:17.972 [parallel-8] INFO reactor.Flux.FlatMap.1 -- onNext(109)
20:21:17.974 [parallel-6] INFO reactor.Flux.FlatMap.1 -- onComplete()
특징적인 실행 결과
- 비동기 병렬 처리: parallel로 표시된 다양한 스레드(parallel-1, parallel-10, etc.)에서 값들이 동시에 발행되고 있습니다. 이는 flatMap()이 각 스트림을 병렬로 실행하고 있음을 나타냅니다.
- 순서가 보장되지 않음: 값이 순서대로 발행되지 않습니다. 예를 들어, 10, 100, 90 등이 순서 없이 동시에 발행되고 있습니다. 이는 flatMap()의 특징으로, 병렬로 각 스트림이 독립적으로 실행되기 때문입니다.
- 최종적으로 100개의 값 발행: 로그에서 100개의 값이 발행되며, 마지막에 onComplete() 이벤트가 호출되어 스트림이 정상적으로 완료되었습니다.
5. 구구단 만들기
Flux와 flatMap을 사용하여 구구단을 출력하고 테스트하는 코드입니다. Flux.range(1, 9)를 이용해 1부터 9까지의 숫자를 반복하며, 각 숫자에 대해 1부터 9까지의 또 다른 반복을 수행하여 구구단의 결과를 출력합니다.
fluxFlatMap2()
메소드
public Flux<Integer> fluxFlatMap2() {
return Flux.range(1, 9) // 1부터 9까지의 숫자를 발행
.flatMap(i -> Flux.range(1, 9) // 각 i에 대해 1부터 9까지의 숫자를 발행
.map(j -> {
System.out.printf("%d * %d = %d\n", i, j, i * j); // 구구단을 출력
return i * j;
})
);
}
Flux.range(1, 9)
: 1부터 9까지의 값을 발행하는Flux
를 생성합니다. 여기서i
는 1부터 9까지 반복됩니다.flatMap(i -> Flux.range(1, 9))
: 각i
값에 대해 다시 1부터 9까지의 값을 발행하는Flux
를 생성합니다. 이때j
값이 1부터 9까지 순차적으로 변합니다.map(j -> {...})
:i
와j
의 곱셈 결과를 계산하여 출력한 후, 그 값을 반환합니다. 이로 인해 실제로 콘솔에 구구단이 출력됩니다.- 예를 들어,
i = 1
일 때1 * 1
,1 * 2
...1 * 9
까지 출력하고, 이어서i = 2
일 때2 * 1
,2 * 2
...2 * 9
까지 계산하고 출력합니다.
- 예를 들어,
결과적으로
- 1부터 9까지의 숫자(
i
) 각각에 대해, 1부터 9까지의 숫자(j
)를 곱하여 구구단을 출력하고, 그 결과를 발행합니다. - 따라서 출력되는 값은
1 * 1
부터9 * 9 = 81
까지의 총 81개의 값입니다.
StepVerifier
테스트
@Test
void fluxFlatMap2() {
StepVerifier.create(operator1.fluxFlatMap2()) // fluxFlatMap2()를 테스트
.expectNextCount(81) // 총 81개의 값이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator1.fluxFlatMap2())
:fluxFlatMap2()
에서 생성된Flux
를 테스트합니다.expectNextCount(81)
:1 * 1
부터9 * 9
까지 계산된 81개의 값이 정확히 발행되었는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
테스트 실행 결과
콘솔에 구구단이 출력되며, StepVerifier
는 expectNextCount(81)
로 81개의 값이 정상적으로 발행되었음을 검증합니다.
출력 예시
1 * 1 = 1
1 * 2 = 2
1 * 3 = 3
...
9 * 8 = 72
9 * 9 = 81
요약
fluxFlatMap2()
메소드는 구구단(1부터 9까지)을 계산하여 출력하는 코드입니다.StepVerifier
테스트에서는 총 81개의 구구단 값이 발행되고, 스트림이 정상적으로 완료되는지 확인합니다.
6. concatMap
concatMap
메소드를 사용하는 위 코드는 비동기적으로 데이터를 처리하지만, 처리 순서를 보장하는 리액티브 프로그램을 구현한 예시입니다. 아래에서 코드를 분석하고, 테스트 코드와 실행 결과에 대해 설명하겠습니다.
public Flux<Integer> fluxConcatMap(){
return Flux.range(1, 10)
.concatMap(i -> Flux.range(i * 10, 10) // i * 10부터 10개의 숫자를 발행
.delayElements(Duration.ofMillis(100))) // 각 요소마다 100ms 지연
.log(); // 이벤트 흐름을 로그로 기록
}
이 코드는 다음과 같은 단계로 동작합니다
1. Flux.range(1, 10)
: 1부터 9까지의 값을 순차적으로 발행합니다.
- 예를 들어, 첫 번째 값은 1, 두 번째 값은 2, ... 아홉 번째 값은 9가 발행됩니다.
2. concatMap(i -> Flux.range(i * 10, 10))
: 각 i
값에 대해 i * 10
부터 시작하는 10개의 숫자를 발행하는 새로운 Flux
를 생성합니다.
- 예를 들어,
i = 1
일 때Flux.range(10, 10)
은 10부터 19까지의 값을 발행합니다. i = 2
일 때는Flux.range(20, 10)
이 20부터 29까지의 값을 발행합니다.
3. delayElements(Duration.ofMillis(100))
: 각 요소를 발행할 때마다 100ms의 지연을 추가합니다. 이로 인해 각각의 값은 100ms 간격으로 순차적으로 발행됩니다.
4. log()
: 리액티브 스트림에서 발생하는 모든 이벤트(onSubscribe
, onNext
, onComplete
등)를 로그에 기록합니다.
concatMap
은 flatMap
과 유사하지만, 각 서브스트림을 처리할 때 순서를 보장합니다. 즉, 첫 번째 스트림이 완료된 후 두 번째 스트림이 시작되며, 모든 스트림은 순서대로 실행됩니다.
예상 출력
- 첫 번째로
i = 1
일 때 10부터 19까지의 값이 100ms 간격으로 발행됩니다. - 두 번째로
i = 2
일 때 20부터 29까지의 값이 발행됩니다. - 마지막으로
i = 9
일 때 90부터 99까지 발행됩니다. - 총 100개의 값이 발행되며, 스트림은 순서대로 처리됩니다.
@Test
void fluxConcatMap() {
StepVerifier.create(operator2.fluxConcatMap()) // fluxConcatMap() 메소드를 테스트
.expectNextCount(100) // 총 100개의 값이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 검증
}
실행 결과 분석
20:10:02.704 [parallel-1] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(10)
20:10:02.810 [parallel-2] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(11)
20:10:02.915 [parallel-3] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(12)
...
20:10:12.886 [parallel-10] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(109)
20:10:12.887 [parallel-10] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onComplete()
> Task :test
BUILD SUCCESSFUL in 10s
4 actionable tasks: 2 executed, 2 up-to-date
8:10:12 PM: Execution finished ':test --tests "com.example.reactorex.Operator2Test.fluxConcatMap"'.
concatMap()
concatMap 메소드는 Reactor와 같은 리액티브 프로그래밍에서 사용되는 메소드로, 비동기 처리에서 데이터 순서를 보장하며 여러 개의 Publisher를 연결할 때 사용됩니다. concatMap은 flatMap과 유사하지만, 중요한 차이점은 순서를 보장한다는 것입니다.
동작 방식
concatMap은 각 요소에 대해 순차적으로 Publisher를 생성하고, 각 Publisher가 완료된 후에 다음 Publisher를 실행합니다. 즉, 여러 개의 Publisher를 병렬로 실행하는 것이 아니라, 순차적으로 실행하여 데이터의 순서를 유지합니다.
주요 특징
- 순서 보장: concatMap은 각 요소에 대해 생성된 Publisher가 순차적으로 실행되기 때문에, 최종적으로 발행되는 데이터의 순서가 원본 스트림의 순서를 따릅니다.
- 비동기 처리: 각 Publisher는 비동기적으로 처리되지만, 순차적으로 이어져 실행됩니다.
서브 스트림 완료 후 다음 스트림 시작: 각 스트림이 완전히 끝난 후에야 다음 스트림이 시작됩니다.
concatMap과 flatMap의 차이점
- flatMap: 여러 Publisher를 병렬로 실행하며, 실행 순서는 보장하지 않습니다. 즉, 빠른 순서로 처리된 데이터가 먼저 발행될 수 있습니다.
- concatMap: 여러 Publisher를 순차적으로 실행하여 순서를 보장합니다. 첫 번째 Publisher가 완료된 후에 두 번째 Publisher가 실행됩니다.
flatMapMany()
메서드는 Reactor에서 Mono
타입을 Flux
로 변환할 때 사용하는 메서드입니다. 주로 Mono
에서 여러 개의 값을 발행하는 스트림을 만들 때 유용하게 쓰입니다. 이 메서드는 Mono
가 하나의 값을 발행한 후, 그 값을 사용해 여러 값을 발행하는 Flux
로 변환합니다.
flatMapMany()
의 동작 방식
Mono.flatMapMany()
:Mono
가 발행한 단일 값을 사용하여 여러 값을 발행하는Flux
를 생성합니다.Mono
는 0 또는 1개의 값을 발행하는 반면,Flux
는 0부터 N개의 값을 발행할 수 있습니다.flatMapMany()
는Mono
에서 나온 값을 기반으로 다수의 값을 발행하는Flux
를 반환합니다.
코드 분석
public Flux<Integer> monoFlatMapMany(){
return Mono.just(10)
.flatMapMany(i -> Flux.range(i, i)) // i 값을 이용해 i부터 i개의 숫자를 발행하는 Flux를 생성
.log(); // 로그로 이벤트 흐름을 기록
}
동작 설명
1. Mono.just(10)
: 단일 값 10
을 발행하는 Mono
를 생성합니다.
2. flatMapMany(i -> Flux.range(i, i))
: Mono
가 발행한 값 10
을 받아서 Flux.range(i, i)
를 생성합니다. 여기서 Flux.range(10, 10)
은 10부터 10개의 값을 발행하는 Flux
입니다. 즉, 10, 11, 12, ..., 19
까지 10개의 값을 발행합니다.
3. log()
: 리액티브 스트림의 각 이벤트(onSubscribe
, onNext
, onComplete
등)를 로그로 출력합니다.
테스트 코드 분석
@Test
void monoFlatMapMany(){
StepVerifier.create(operator2.monoFlatMapMany()) // monoFlatMapMany()를 테스트
.expectNextCount(10) // 총 10개의 값이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.monoFlatMapMany())
:monoFlatMapMany()
메소드에서 생성된Flux
를 테스트합니다.expectNextCount(10)
:Flux.range(10, 10)
에서 발행된 10개의 값(10
부터19
까지)이 정확히 발행되었는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
20:42:57.240 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onNext(10)
20:42:57.240 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onNext(11)
...
20:42:57.241 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onNext(19)
20:42:57.241 [Test worker] INFO reactor.Flux.MonoFlatMapMany.1 -- onComplete()
특징적인 결과
- 10부터 19까지의 값이 순차적으로 발행:
Flux.range(10, 10)
이 생성되면서 10부터 19까지 총 10개의 값이 순차적으로 발행됩니다. - 정상 완료: 마지막에
onComplete()
이벤트가 호출되며 스트림이 정상적으로 완료됩니다.
요약
flatMapMany()
는Mono
에서 단일 값을 발행하고, 그 값을 기반으로 다수의 값을 발행하는Flux
로 변환할 때 사용됩니다.- 이 코드에서는
Mono.just(10)
으로 시작하여,Flux.range(10, 10)
을 통해 10부터 19까지의 값을 발행하는Flux
로 변환했습니다. - 테스트 코드는 10개의 값이 정확히 발행되고, 스트림이 정상적으로 완료되었는지 확인합니다.
defaultIfEmpty()
메서드 설명
defaultIfEmpty()
는 리액티브 스트림에서 값이 없을 경우 기본 값을 제공하는 메서드입니다. 주로 Mono
나 Flux
에서 필터링 결과로 값이 발행되지 않을 때, 즉 스트림이 비어 있을 때 사용됩니다. 이 메서드는 기본값을 지정하여, 스트림이 비었을 경우 해당 기본값을 발행하게 합니다.
public Mono<Integer> defaultEmpty() {
return Mono.just(100)
.filter(i -> i > 100) // 100보다 큰 값을 필터링, 조건을 만족하지 않으므로 값이 없음
.defaultIfEmpty(30) // 값이 없으면 30을 기본값으로 발행
.log(); // 이벤트 흐름을 로그로 기록
}
동작 과정
Mono.just(100)
: 단일 값100
을 발행하는Mono
를 생성합니다.filter(i -> i > 100)
:i > 100
이라는 조건을 적용하여 필터링을 수행합니다. 그러나 값100
은 조건을 만족하지 않기 때문에 필터링된 후에는 값이 없게 됩니다.defaultIfEmpty(30)
: 스트림에 값이 없을 경우, 기본값인30
을 발행합니다. 즉, 필터링 후 값이 없으므로30
이 발행됩니다.log()
: 리액티브 스트림의 모든 이벤트가 로그로 출력됩니다.
예상 동작
Mono.just(100)
에서 시작된 값이 필터링된 후 값이 없으므로,defaultIfEmpty(30)
에 의해30
이 기본값으로 발행됩니다.
@Test
void defaultIfEmpty() {
StepVerifier.create(operator2.defaultEmpty()) // defaultEmpty() 메소드를 테스트
.expectNext(30) // 30이 발행되었는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.defaultEmpty())
:defaultEmpty()
메소드의 리액티브 스트림을 테스트합니다.expectNext(30)
:defaultIfEmpty(30)
에 의해 30이 발행되었는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
20:48:58.571 [Test worker] INFO reactor.Mono.DefaultIfEmpty.1 -- onNext(30)
20:48:58.571 [Test worker] INFO reactor.Mono.DefaultIfEmpty.1 -- onComplete()
결과 요약
- onNext(30): 필터링된 후 값이 없으므로 기본값인
30
이 발행되었습니다. - onComplete(): 값이 발행된 후 스트림이 정상적으로 완료되었습니다.
요약
defaultIfEmpty()
는 리액티브 스트림이 값이 없을 때 지정된 기본값을 발행합니다.- 이 코드에서는
Mono.just(100)
에서 값이 필터링된 후 아무 값도 남지 않기 때문에defaultIfEmpty(30)
이 호출되어 기본값인30
이 발행됩니다. - 테스트는
30
이 정상적으로 발행되었고, 스트림이 정상적으로 완료되었는지 검증합니다.
switchIfEmpty()
메서드 설명
switchIfEmpty()
는 리액티브 스트림에서 값이 없을 경우 대체 스트림을 실행하는 메서드입니다. 이는 기본적으로 defaultIfEmpty()
와 비슷한 역할을 하지만, 대체 스트림을 발행할 수 있다는 점에서 차이가 있습니다. switchIfEmpty()
는 원본 Mono
나 Flux
가 값을 발행하지 않을 때, 지정된 다른 Mono
나 Flux
를 사용하여 값을 발행하도록 설정합니다.
코드 분석
switchIfEmpty()
메서드
public Mono<Integer> switchIfEmpty() {
return Mono.just(100)
.filter(i -> i > 100) // i > 100 조건을 만족하지 않으므로 값이 없음
.switchIfEmpty(Mono.just(30).map(i -> i * 2)) // 값이 없으면 30 * 2 값을 발행
.log(); // 이벤트 흐름을 기록
}
동작 과정
Mono.just(100)
: 단일 값100
을 발행하는Mono
를 생성합니다.filter(i -> i > 100)
:i > 100
이라는 조건을 적용하여 필터링합니다. 값100
은 조건을 만족하지 않으므로 필터링 후에는 값이 없습니다.switchIfEmpty(Mono.just(30).map(i -> i * 2))
: 원본Mono
가 값이 없으므로, 대체Mono
인Mono.just(30).map(i -> i * 2)
이 실행됩니다. 이로 인해30
이 2배로 변환되어 최종적으로60
이 발행됩니다.log()
: 리액티브 스트림의 모든 이벤트(onSubscribe
,onNext
,onComplete
)가 로그로 출력됩니다.
동작 결과
- 원본 스트림이 값이 없으므로
switchIfEmpty()
로 지정된Mono.just(30)
이 대체되어 실행되고, 그 값은map(i -> i * 2)
로 변환되어60
을 발행합니다.
테스트 코드 분석
@Test
void switchIfEmpty() {
StepVerifier.create(operator2.switchIfEmpty()) // switchIfEmpty() 메소드를 테스트
.expectNext(60) // 60이 발행되었는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.switchIfEmpty())
:switchIfEmpty()
메소드의 리액티브 스트림을 테스트합니다.expectNext(60)
:switchIfEmpty()
에서30 * 2 = 60
이 발행되었는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
20:54:34.434 [Test worker] INFO reactor.Mono.SwitchIfEmpty.1 -- onNext(60)
20:54:34.434 [Test worker] INFO reactor.Mono.SwitchIfEmpty.1 -- onComplete()
결과 요약
- onNext(60): 원본 스트림이 필터링 후 값이 없었기 때문에
switchIfEmpty()
에 의해 대체 스트림에서60
이 발행되었습니다. - onComplete(): 발행된 후 스트림이 정상적으로 완료되었습니다.
switchIfEmpty()
와 defaultIfEmpty()
차이점
defaultIfEmpty()
: 값이 없을 경우 하나의 기본값을 발행합니다.switchIfEmpty()
: 값이 없을 경우 다른 스트림으로 대체되어 실행됩니다. 더 복잡한 대체 로직을 포함할 수 있습니다.
요약
switchIfEmpty()
는 리액티브 스트림이 값이 없을 때 대체 스트림을 실행하여 값을 발행하는 메서드입니다.- 이 코드에서는
Mono.just(100)
에서 값을 필터링한 후 값이 없으므로switchIfEmpty()
를 통해Mono.just(30)
이 실행되고,map(i -> i * 2)
에 의해 최종적으로60
이 발행됩니다. - 테스트는
60
이 발행되고, 스트림이 정상적으로 완료되었는지 검증합니다.
switchIfEmpty()
메서드와 에러 처리
switchIfEmpty()
메서드는 원본 스트림이 비었을 때 대체 스트림을 제공하는 역할을 합니다. 이번 코드에서는 대체 스트림 대신 에러를 발생시키는 예제입니다. 이 메서드를 통해 리액티브 스트림이 값이 없을 때 지정된 에러를 던질 수 있습니다.
코드 분석
public Mono<Integer> switchIfEmpty() {
return Mono.just(100) // 100을 발행하는 Mono
.filter(i -> i > 100) // i가 100보다 큰 경우만 필터링, 100은 조건을 만족하지 않음
.switchIfEmpty(Mono.error(new Exception("Not exists value..."))) // 값이 없을 경우 에러 발생
.log(); // 로그로 이벤트 흐름을 기록
}
동작 설명
Mono.just(100)
: 단일 값100
을 발행하는Mono
를 생성합니다.filter(i -> i > 100)
:i > 100
이라는 조건을 적용하여 필터링합니다. 값100
은 조건을 만족하지 않기 때문에 필터링 후 값이 없게 됩니다.switchIfEmpty(Mono.error(...))
: 원본 스트림이 값이 없으므로, 대체 스트림으로Mono.error(new Exception("Not exists value..."))
가 실행됩니다. 이로 인해 에러가 발생합니다.log()
: 리액티브 스트림의 이벤트 흐름(onError
,onComplete
등)이 로그로 출력됩니다.
동작 결과
- 원본 스트림이 비어 있으므로
switchIfEmpty()
에 의해 지정된 예외Exception("Not exists value...")
가 던져집니다.
테스트 코드 분석
@Test
void switchIfEmpty2() {
StepVerifier.create(operator2.switchIfEmpty()) // switchIfEmpty() 메소드를 테스트
.expectError() // 에러가 발생하는지 확인
.verify(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.switchIfEmpty())
:switchIfEmpty()
메소드의 리액티브 스트림을 테스트합니다.expectError()
: 에러가 발생하는지 확인합니다.verify()
: 스트림이 에러와 함께 정상적으로 종료되었는지 검증합니다.
실행 결과 분석
21:05:13.469 [Test worker] ERROR reactor.Mono.SwitchIfEmpty.1 -- onError(java.lang.Exception: Not exists value...)
결과 요약
- onError(Exception: Not exists value...):
switchIfEmpty()
로 인해 지정된 에러가 발생하고, 로그로 출력됩니다. - 테스트는 에러 발생 여부를 확인하며,
expectError()
를 통해 에러가 발생했음을 검증합니다.
요약
switchIfEmpty()
: 리액티브 스트림이 값이 없을 경우 대체 스트림을 실행합니다. 이번 예제에서는 대체 스트림 대신 에러를 던지도록 설정하였습니다.- 에러 처리: 원본 스트림에서 값이 없으면
Mono.error(new Exception(...))
이 실행되며, 테스트에서는 이 에러가 정상적으로 발생했는지 확인합니다. - 테스트:
StepVerifier
를 사용하여 스트림에서 에러가 발생했음을expectError()
로 확인하고, 스트림이 에러와 함께 종료되었는지 검증합니다.
Flux.merge()
메서드 설명
Flux.merge()
는 여러 개의 Flux
스트림을 병합하여 동시에 발행하는 메서드입니다. 이 메서드는 각각의 Flux
가 비동기적으로 값을 발행할 수 있으며, 발행되는 순서는 각 스트림의 처리 속도에 따라 달라질 수 있습니다. 단, 병합된 스트림은 모든 스트림이 완료될 때까지 실행됩니다.
코드 분석
public Flux<String> fluxMerge() {
return Flux.merge(
Flux.fromIterable(List.of("1", "2", "3")), // 1, 2, 3을 발행하는 Flux
Flux.just("4") // 4를 발행하는 Flux
).log(); // 이벤트 흐름을 기록
}
동작 설명
Flux.fromIterable(List.of("1", "2", "3"))
: 리스트["1", "2", "3"]
의 요소를 순차적으로 발행하는Flux
를 생성합니다.Flux.just("4")
: 단일 값"4"
를 발행하는Flux
를 생성합니다.Flux.merge()
: 위에서 생성한 두 개의Flux
를 병합하여 하나의Flux
로 만듭니다. 두 스트림이 병합되어 순서와 상관없이 동시에 발행됩니다.log()
: 리액티브 스트림의 모든 이벤트(onNext
,onComplete
,onError
등)가 로그로 기록됩니다.
예상 동작
"1"
,"2"
,"3"
은Flux.fromIterable()
에서 발행되고,"4"
는Flux.just()
에서 발행됩니다.- 두 개의
Flux
는 병합되며, 두 스트림이 완료될 때까지 모든 값이 발행됩니다. - 결과적으로
"1"
,"2"
,"3"
,"4"
가 발행됩니다.
테스트 코드 분석
@Test
void fluxMerge() {
StepVerifier.create(operator2.fluxMerge()) // fluxMerge() 메소드를 테스트
.expectNext("1", "2", "3", "4") // "1", "2", "3", "4" 순서대로 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.fluxMerge())
:fluxMerge()
메소드의 리액티브 스트림을 테스트합니다.expectNext("1", "2", "3", "4")
:"1"
,"2"
,"3"
,"4"
이 순차적으로 발행되는지 확인합니다.verifyComplete()
: 모든 값이 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
21:13:08.585 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(1)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(2)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(3)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onNext(4)
21:13:08.586 [Test worker] INFO reactor.Flux.Merge.1 -- onComplete()
결과 요약
- onNext(1), onNext(2), onNext(3), onNext(4):
"1"
,"2"
,"3"
,"4"
가 차례대로 발행되었습니다. - onComplete(): 발행이 완료된 후 스트림이 정상적으로 종료되었습니다.
Flux.merge()
의 특징
- 병합된 스트림의 발행 순서:
Flux.merge()
는 각 스트림을 병렬로 실행하므로, 순서가 보장되지 않을 수 있습니다. 하지만 이 예제에서는 값 발행 순서가 동일하기 때문에"1"
,"2"
,"3"
,"4"
가 순서대로 출력됩니다. - 병렬 실행: 각
Flux
는 병렬로 실행될 수 있으며, 실제로 여러Flux
의 처리 속도에 따라 발행 순서가 달라질 수 있습니다.
요약
Flux.merge()
는 여러Flux
를 병합하여 병렬로 실행하는 메서드입니다.- 이 코드에서는
Flux.fromIterable()
과Flux.just()
를 병합하여"1"
,"2"
,"3"
,"4"
를 발행합니다. - 테스트는 발행된 값들이 순서대로 출력되고 스트림이 정상적으로 완료되었는지 검증합니다.
Mono.mergeWith()
메서드 설명
mergeWith()
메서드는 두 개의 Mono
또는 Flux
를 병합하여 동시에 발행할 수 있도록 해줍니다. 이 메서드는 각 스트림이 병렬로 실행될 수 있으며, 두 개 이상의 Mono
를 병합하여 Flux
로 변환합니다.
Mono
와 Mono
를 병합하면 Flux
가 되는 이유
Mono
는 0 또는 1개의 값을 발행할 수 있는 제한이 있습니다. 만약 두 개 이상의 Mono
를 병합하게 되면, 각 Mono
는 최대 하나의 값을 발행할 수 있으므로 최대 2개의 값 이상이 발행될 수 있습니다. 이러한 상황에서 발행된 값의 수가 Mono
의 발행 제한을 넘기기 때문에 결과적으로 0개 이상의 값을 발행할 수 있는 Flux
로 변환됩니다.
요약:
Mono
는 1개의 값만 발행할 수 있습니다.- 두 개 이상의
Mono
를 병합하면 발행 가능한 값의 수가 1개를 초과할 수 있으므로, 여러 값을 발행할 수 있는Flux
로 자동 변환됩니다.
예를 들어, 다음 코드에서 Mono.just("1")
와 Mono.just("2")
를 병합할 때, 결과는 두 개의 값을 발행해야 하기 때문에 Flux<String>
로 변환됩니다.
Mono<String> mono1 = Mono.just("A");
Mono<String> mono2 = Mono.just("B");
Flux<String> flux = mono1.mergeWith(mono2); // Flux가 반환됨
코드 분석
monoMerge()
메서드
public Flux<String> monoMerge() {
return Mono.just("1")
.mergeWith(Mono.just("2")) // "1"과 "2"를 병합
.mergeWith(Mono.just("3")) // 병합된 결과에 "3"을 추가로 병합
.log(); // 이벤트 흐름을 기록
}
동작 설명
Mono.just("1")
: 단일 값"1"
을 발행하는Mono
를 생성합니다.mergeWith(Mono.just("2"))
:"1"
을 발행하는Mono
와"2"
를 발행하는Mono
를 병합합니다.mergeWith(Mono.just("3"))
: 병합된"1"
과"2"
의 결과에"3"
을 발행하는Mono
를 병합합니다.log()
: 리액티브 스트림의 이벤트 흐름(onNext
,onComplete
,onError
)이 로그로 기록됩니다.
예상 동작
"1"
,"2"
,"3"
이 병합되어Flux<String>
로 반환됩니다.- 각 값이 병렬로 실행될 수 있지만, 이 예제에서는 값들이 순차적으로 발행됩니다.
테스트 코드 분석
@Test
void monoMerge() {
StepVerifier.create(operator2.monoMerge()) // monoMerge() 메소드를 테스트
.expectNext("1", "2", "3") // "1", "2", "3"이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.monoMerge())
:monoMerge()
메소드의 리액티브 스트림을 테스트합니다.expectNext("1", "2", "3")
:"1"
,"2"
,"3"
이 순서대로 발행되는지 확인합니다.verifyComplete()
: 모든 값이 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
> Task :compileJava UP-TO-DATE
> Task :processResources UP-TO-DATE
> Task :classes UP-TO-DATE
> Task :compileTestJava
> Task :processTestResources NO-SOURCE
> Task :testClasses
> Task :test
BUILD SUCCESSFUL in 668ms
4 actionable tasks: 2 executed, 2 up-to-date
결과 요약
expectNext("1", "2", "3")
:"1"
,"2"
,"3"
이 순서대로 발행되었습니다.verifyComplete()
: 값이 발행된 후 스트림이 정상적으로 완료되었습니다.
mergeWith()
와 병렬 처리
- 병렬 처리 가능성:
mergeWith()
는 병합된Mono
나Flux
를 병렬로 실행할 수 있습니다. 그러나 각Mono
가 이미 단일 값을 즉시 발행하고 완료되기 때문에 순서대로 발행된 것처럼 보입니다. - 순서 보장 없음:
mergeWith()
는 병렬로 실행될 때 각Mono
의 발행 순서를 보장하지 않지만, 이 예제에서는 순차적으로 발행됩니다.
요약
mergeWith()
는 여러Mono
를 병합하여 하나의Flux
로 변환하며, 각 스트림을 병렬로 실행할 수 있습니다.Mono
두 개 이상을 병합하면 최대 하나 이상의 값이 발행되기 때문에, 결과는 여러 개의 값을 발행할 수 있는Flux
가 됩니다.- 이 코드에서는
"1"
,"2"
,"3"
을 발행하는Mono
들을 병합하여 결과적으로"1"
,"2"
,"3"
이 발행됩니다. - 테스트는 값들이 순서대로 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.
Flux.zip()
메서드 설명
Flux.zip()
은 두 개 이상의 Flux
를 짝 지어 결합하는 메서드입니다. 각 Flux
에서 동일한 순서로 발행된 값을 짝 지어 새로운 값으로 변환할 수 있습니다. 즉, 각각의 스트림에서 같은 순번의 값을 함께 묶어서 처리할 수 있는 방식입니다.
fluxZip()
메서드
public Flux<String> fluxZip() {
return Flux.zip(Flux.just("a", "b", "c"), Flux.just("d", "e", "f")) // 두 Flux를 결합
.map(i -> i.getT1() + i.getT2()) // 각 짝의 값을 결합하여 새로운 값을 생성
.log(); // 이벤트 흐름을 기록
}
동작 설명
1. Flux.zip()
: Flux.just("a", "b", "c")
와 Flux.just("d", "e", "f")
에서 발행된 값을 순차적으로 짝 지어 결합합니다.
- 첫 번째 Flux
의 "a"
와 두 번째 Flux
의 "d"
가 결합되고, 그 다음으로 "b"
와 "e"
, "c"
와 "f"
가 결합됩니다.
2. map(i -> i.getT1() + i.getT2())
: zip()
으로 결합된 두 값(T1
과 T2
)을 받아서 문자열을 결합합니다.
- 예를 들어, "a"
와 "d"
가 결합되어 "ad"
가 됩니다.
3. log()
: 리액티브 스트림의 이벤트 흐름(onNext
, onComplete
, onError
)이 로그로 기록됩니다.
예상 동작
"a"
와"d"
,"b"
와"e"
,"c"
와"f"
가 각각 짝 지어 결합된"ad"
,"be"
,"cf"
가 순차적으로 발행됩니다.
테스트 코드 분석
@Test
void fluxZip() {
StepVerifier.create(operator2.fluxZip()) // fluxZip() 메소드를 테스트
.expectNext("ad", "be", "cf") // "ad", "be", "cf"가 순서대로 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.fluxZip())
:fluxZip()
메소드의 리액티브 스트림을 테스트합니다.expectNext("ad", "be", "cf")
:"ad"
,"be"
,"cf"
가 순서대로 발행되는지 확인합니다.verifyComplete()
: 모든 값이 발행되고 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
21:32:42.091 [Test worker] INFO reactor.Flux.Map.1 -- onNext(ad)
21:32:42.092 [Test worker] INFO reactor.Flux.Map.1 -- onNext(be)
21:32:42.092 [Test worker] INFO reactor.Flux.Map.1 -- onNext(cf)
21:32:42.092 [Test worker] INFO reactor.Flux.Map.1 -- onComplete()
결과 요약
- onNext(ad), onNext(be), onNext(cf): 두
Flux
에서 각각 결합된"ad"
,"be"
,"cf"
가 순서대로 발행되었습니다. - onComplete(): 발행이 완료된 후 스트림이 정상적으로 종료되었습니다.
Flux.zip()
의 특징
- 동시 발행된 값들을 결합: 두 개의
Flux
에서 같은 순서로 발행된 값들을 결합합니다. - 짝 지어진 순서대로 결합: 각
Flux
의 발행 순서에 맞춰 짝 지어 결합되며, 발행 순서가 유지됩니다. - 결합한 스트림: 두 개 이상의
Flux
에서 동일한 순서로 발행된 값들이 짝을 이루어 새롭게 변환된 값이 발행됩니다.
요약
Flux.zip()
은 두 개 이상의Flux
를 병합하여 같은 순서로 발행된 값들을 짝 지어 결합합니다.- 이 코드에서는
"a"
,"b"
,"c"
와"d"
,"e"
,"f"
가 각각 결합되어"ad"
,"be"
,"cf"
가 발행됩니다. - 테스트는 발행된 값들이 순서대로 출력되고 스트림이 정상적으로 완료되었는지 검증합니다.
Mono.zip()
메서드 설명
Mono.zip()
메서드는 여러 개의 Mono
를 결합하여 각각의 값을 하나의 튜플로 묶는 메서드입니다. 이를 통해 각각의 Mono
에서 발행된 값을 동시에 처리할 수 있습니다. 최대 8개의 Mono
를 결합할 수 있으며, 모든 Mono
가 값을 발행한 후, 해당 값들을 하나의 Mono
로 결합하여 결과를 발행합니다.
monoZip()
메서드
public Mono<Integer> monoZip() {
return Mono.zip(
Mono.just(1), // 첫 번째 Mono에서 값 1을 발행
Mono.just(2), // 두 번째 Mono에서 값 2를 발행
Mono.just(3) // 세 번째 Mono에서 값 3을 발행
)
.map(i -> i.getT1() + i.getT2() + i.getT3()) // 각 Mono에서 받은 값을 더함
.log(); // 이벤트 흐름을 기록
}
동작 설명
1. Mono.zip()
: 세 개의 Mono
(Mono.just(1)
, Mono.just(2)
, Mono.just(3)
)에서 각각 값을 발행한 후, 세 값을 하나의 튜플로 묶습니다.
- 이 튜플은 Tuple3<Integer, Integer, Integer>
형태로 반환되며, 각각의 값은 getT1()
, getT2()
, getT3()
메서드를 통해 접근할 수 있습니다.
2. map(i -> i.getT1() + i.getT2() + i.getT3())
: 결합된 튜플(Tuple3
)을 받아, 각 값을 더한 결과를 반환합니다.
- 예를 들어, 1 + 2 + 3
의 결과로 6
이 발행됩니다.
3. log()
: 리액티브 스트림의 이벤트(onNext
, onComplete
, onError
)가 로그로 기록됩니다.
예상 동작
- 각
Mono
에서 발행된 값인1
,2
,3
이 결합되고, 더한 값6
이 최종적으로 발행됩니다.
테스트 코드 분석
@Test
void monoZip() {
StepVerifier.create(operator2.monoZip()) // monoZip() 메소드를 테스트
.expectNext(6) // 결합된 값(1+2+3=6)이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
StepVerifier.create(operator2.monoZip())
:monoZip()
메소드의 리액티브 스트림을 테스트합니다.expectNext(6)
: 결합된 값6
이 발행되는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
실행 결과 분석
21:39:15.140 [Test worker] INFO reactor.Mono.Map.1 -- onNext(6)
21:39:15.140 [Test worker] INFO reactor.Mono.Map.1 -- onComplete()
결과 요약
- onNext(6):
Mono.zip()
을 통해 결합된 값이6
으로 발행되었습니다. - onComplete(): 값이 발행된 후 스트림이 정상적으로 완료되었습니다.
Mono.zip()
의 특징
- 여러
Mono
의 값을 결합:Mono.zip()
은 여러 개의Mono
에서 값을 받아 하나의 튜플로 결합합니다. 이 예제에서는Tuple3<Integer, Integer, Integer>
이 반환됩니다. - 결합된 값의 처리: 결합된 튜플의 값들은
map()
과 같은 연산자를 사용해 조작할 수 있습니다. - 최대 8개의
Mono
지원:Mono.zip()
은 최대 8개의Mono
를 결합할 수 있으며, 그 이상의 결합이 필요할 경우Flux.zip()
을 사용할 수 있습니다.
요약
Mono.zip()
은 여러 개의Mono
를 결합하여 그 값들을 하나의 튜플로 묶고, 그 결과를 하나의Mono
로 발행합니다.- 이 코드에서는
Mono.just(1)
,Mono.just(2)
,Mono.just(3)
을 결합한 후, 세 값을 더하여 결과적으로6
을 발행합니다. - 테스트는 값
6
이 정상적으로 발행되고, 스트림이 완료되었는지 검증합니다.
집계함수
groupBy()
메서드
public Flux<GroupedFlux<Character, String>> fluxGroupBy() {
return Flux.just("apple", "banana", "apricot", "blueberry")
.groupBy(fruit -> fruit.charAt(0)) // 첫 글자를 기준으로 그룹화
.log(); // 로그 기록
}
설명:
Flux.just("apple", "banana", "apricot", "blueberry")
: 4개의 문자열("apple"
,"banana"
,"apricot"
,"blueberry"
)을 발행하는Flux
를 생성합니다.groupBy(fruit -> fruit.charAt(0))
: 각 문자열을 첫 글자('a'
,'b'
)로 그룹화합니다. 이 그룹은GroupedFlux
로 반환됩니다.
@Test
void fluxGroupBy() {
Flux<Map.Entry<Character, List<String>>> groupedLists = operator3.fluxGroupBy()
.flatMap(group -> group.collectList()
.map(list -> Map.entry(group.key(), list)));
StepVerifier.create(groupedLists)
.expectNextMatches(entry -> entry.getKey() == 'a' && entry.getValue().equals(List.of("apple", "apricot")))
.expectNextMatches(entry -> entry.getKey() == 'b' && entry.getValue().equals(List.of("banana", "blueberry")))
.verifyComplete();
}
설명:
operator3.fluxGroupBy()
:fluxGroupBy()
메서드를 호출하여 과일 이름을 첫 글자 기준으로 그룹화된Flux<GroupedFlux<Character, String>>
을 반환받습니다.flatMap(group -> group.collectList())
: 각 그룹(GroupedFlux
)을List
로 수집하여Map.Entry
로 변환합니다.group.key()
를 통해 그룹의 키(첫 글자)를 가져오고, 각 그룹의 값(과일 이름들)을 리스트로 변환하여 매핑합니다.StepVerifier.create(groupedLists)
:StepVerifier
를 사용하여 결과를 검증합니다.expectNextMatches()
: 첫 번째 그룹은 키가'a'
이고, 값이["apple", "apricot"]
인지 확인합니다.- 두 번째 그룹은 키가
'b'
이고, 값이["banana", "blueberry"]
인지 확인합니다. verifyComplete()
: 모든 그룹이 예상대로 처리되었고 스트림이 정상적으로 완료되었는지 확인합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Flux
스트림의 동작을 추적할 수 있게 합니다:
INFO reactor.Flux.GroupBy.1 -- | onSubscribe([Fuseable] FluxGroupBy.GroupByMain) // 구독 시작
INFO reactor.Flux.GroupBy.1 -- | request(256) // 256개의 요청
INFO reactor.Flux.GroupBy.1 -- | onNext(UnicastGroupedFlux) // 첫 번째 그룹 발행 ('a' 그룹)
INFO reactor.Flux.GroupBy.1 -- | onNext(UnicastGroupedFlux) // 두 번째 그룹 발행 ('b' 그룹)
INFO reactor.Flux.GroupBy.1 -- | request(1) // 각 그룹에 대해 요청
INFO reactor.Flux.GroupBy.1 -- | onComplete() // 그룹화 완료
onSubscribe
:Flux
스트림이 구독될 때 발생합니다.request(256)
: 요청된 데이터의 양을 나타냅니다. 여기서는 최대 256개의 항목을 요청했습니다.onNext(UnicastGroupedFlux)
: 그룹이UnicastGroupedFlux
로 발행되었음을 나타냅니다. 이는 각 그룹('a'
그룹,'b'
그룹)에 해당하는GroupedFlux
객체입니다.onComplete()
: 스트림의 모든 작업이 완료되었음을 나타냅니다.
count()
메서드
public Mono<Long> fluxCount() {
return Flux.range(1, 10).count(); // 1부터 10까지의 값을 발행하고, 개수를 세어 Mono<Long>로 반환
}
설명:
Flux.range(1, 10)
: 1부터 10까지의 숫자를 발행하는Flux
를 생성합니다.count()
:Flux
가 발행한 모든 요소를 세고, 그 개수를Mono<Long>
로 반환합니다. 이 예제에서는 총 10개의 숫자가 발행되므로Mono<Long>
에는10L
이 반환됩니다.
@Test
void fluxCount() {
StepVerifier.create(operator3.fluxCount())
.expectNext(10L) // 10개의 요소가 발행되었는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 검증
}
설명:
operator3.fluxCount()
:fluxCount()
메서드를 호출하여Mono<Long>
을 반환받습니다.StepVerifier.create()
: 반환된Mono<Long>
에 대한 테스트를 생성합니다.expectNext(10L)
:Mono<Long>
에서 발행되는 값이10L
인지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
로그 설명
테스트 실행 시 Mono.count()
메서드에 의해 출력된 로그는 Mono
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Mono.Count.1 -- | onSubscribe([Fuseable] MonoCount.CountSubscriber) // 구독 시작
INFO reactor.Mono.Count.1 -- | request(unbounded) // 요청량이 제한 없이 요청됨
INFO reactor.Mono.Count.1 -- | onNext(10) // 카운트된 값 10이 발행됨
INFO reactor.Mono.Count.1 -- | onComplete() // 스트림 완료
onSubscribe
:Mono
스트림이 구독되었을 때 발생합니다.request(unbounded)
: 데이터 요청이 무제한으로 요청되었음을 의미합니다.onNext(10)
: 10개의 요소가 발행된 후, 그 개수인10
이 반환됩니다.onComplete()
: 스트림이 정상적으로 완료되었음을 나타냅니다.
요약
fluxCount()
메서드는Flux.range(1, 10)
을 사용하여 1부터 10까지의 숫자를 발행하고, 그 개수를 계산하여Mono<Long>
으로 반환합니다.- 테스트에서는
Mono<Long>
에서 발행된 값이 정확히 10L인지 검증하며, 스트림이 정상적으로 완료되었는지 확인합니다.
코드 및 테스트 설명
distinct()
메서드
public Flux<String> fluxDistinct() {
return Flux.fromIterable(List.of("a", "b", "a", "b", "c"))
.distinct() // 중복된 값 제거
.log(); // 로그 기록
}
설명:
Flux.fromIterable(List.of("a", "b", "a", "b", "c"))
:"a"
,"b"
,"a"
,"b"
,"c"
를 발행하는Flux
를 생성합니다.distinct()
: 중복된 값을 제거하고 고유한 값만 발행합니다. 이 경우"a"
,"b"
,"c"
가 남습니다.log()
:Flux
의 동작(onNext
,onComplete
,onError
,onSubscribe
)을 로그로 기록하여 스트림 처리 과정이 출력됩니다.
@Test
void fluxDistinct() {
StepVerifier.create(operator3.fluxDistinct())
.expectNext("a", "b", "c") // 중복을 제거한 결과로 "a", "b", "c"가 발행됨
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
설명:
operator3.fluxDistinct()
:fluxDistinct()
메서드를 호출하여 중복 제거된 문자열들을 발행하는Flux<String>
을 반환받습니다.StepVerifier.create()
: 반환된Flux<String>
에 대한 테스트를 생성합니다.expectNext("a", "b", "c")
: 중복 제거 후"a"
,"b"
,"c"
가 순차적으로 발행되는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Flux
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Flux.DistinctFuseable.1 -- | onSubscribe([Fuseable] FluxDistinct.DistinctFuseableSubscriber) // 구독 시작
INFO reactor.Flux.DistinctFuseable.1 -- | request(unbounded) // 무제한 데이터 요청
INFO reactor.Flux.DistinctFuseable.1 -- | onNext(a) // "a" 발행
INFO reactor.Flux.DistinctFuseable.1 -- | onNext(b) // "b" 발행
INFO reactor.Flux.DistinctFuseable.1 -- | onNext(c) // "c" 발행
INFO reactor.Flux.DistinctFuseable.1 -- | onComplete() // 스트림 완료
onSubscribe
: 스트림이 구독되었을 때 발생합니다.request(unbounded)
: 제한 없이 데이터 요청이 이루어졌음을 나타냅니다.onNext("a")
,onNext("b")
,onNext("c")
: 중복을 제거한 값들이 순차적으로 발행됩니다.onComplete()
: 모든 값이 발행되고 스트림이 정상적으로 완료되었음을 나타냅니다.
요약
fluxDistinct()
메서드는 중복된 값이 포함된Flux
에서 중복을 제거한 후 고유한 값들만 발행합니다.- 테스트에서는 중복 제거 후
"a"
,"b"
,"c"
가 올바르게 발행되고, 스트림이 정상적으로 완료되었는지 확인합니다.
reduce() 메소드
fluxReduce()
메서드
public Mono<Long> fluxReduce() {
return Flux.range(1, 10)
.reduce(0L, (i, j) -> i + j)
.log();
}
설명:
Flux.range(1, 10)
: 1부터 10까지의 숫자를 발행하는Flux
를 생성합니다.reduce(0L, (i, j) -> i + j)
: 초기값0L
에서 시작하여 각 값을 누적하여 더합니다. 이 경우 1부터 10까지의 숫자를 모두 더해 결과값55
를 만듭니다.log()
:Flux
의 동작(onNext
,onComplete
,onError
,onSubscribe
)을 로그로 기록하여 스트림 처리 과정을 출력합니다.
@Test
void fluxReduce() {
StepVerifier.create(operator3.fluxReduce())
.expectNext(55L) // 1부터 10까지의 합이 55임을 검증
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
설명:
operator3.fluxReduce()
:fluxReduce()
메서드를 호출하여 1부터 10까지의 숫자를 더한Mono<Long>
을 반환받습니다.StepVerifier.create()
: 반환된Mono<Long>
에 대한 테스트를 생성합니다.expectNext(55L)
:reduce()
결과로 값55L
이 발행되는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Mono
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Mono.ReduceSeed.1 -- | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber) // 구독 시작
INFO reactor.Mono.ReduceSeed.1 -- | request(unbounded) // 무제한 데이터 요청
INFO reactor.Mono.ReduceSeed.1 -- | onNext(55) // reduce의 결과로 55 발행
INFO reactor.Mono.ReduceSeed.1 -- | onComplete() // 스트림 완료
onSubscribe
: 스트림이 구독되었을 때 발생합니다.request(unbounded)
: 제한 없이 데이터 요청이 이루어졌음을 나타냅니다.onNext(55)
: 누적된 결과 값55
가 발행됩니다.onComplete()
: 모든 값이 처리되고 스트림이 정상적으로 완료되었음을 나타냅니다.
요약
fluxReduce()
메서드는 1부터 10까지의 숫자를 더한 값을 발행하며, 최종적으로Mono<Long>
에 결과값55
를 담습니다.- 테스트에서는 이 값이 올바르게 발행되고, 스트림이 정상적으로 완료되었는지 검증합니다.
BackPressure 연산자
- delaySequence / limitRate
- sample
fluxDelayAndLimit()
메서드
public Flux<Integer> fluxDelayAndLimit() {
return Flux.range(1, 10)
.delaySequence(Duration.ofSeconds(1))
.log()
.limitRate(2);
}
설명:
Flux.range(1, 10)
: 1부터 10까지의 숫자를 발행하는Flux
를 생성합니다.delaySequence(Duration.ofSeconds(1))
: 모든 요소의 발행을 1초 지연시킵니다. 즉, 처음 값을 발행하기 전에 1초 동안 대기한 후 연속적으로 값을 발행합니다.log()
:Flux
의 동작(onNext
,onComplete
,onError
,onSubscribe
)을 로그로 기록하여 스트림 처리 과정이 출력됩니다.limitRate(2)
: 한번에 요청할 값의 수를 2로 제한합니다. 즉, 두 개의 값을 처리한 후 다시 두 개의 값을 요청하는 방식으로 값을 제한적으로 요청합니다.
@Test
void fluxDelayAndLimit() {
StepVerifier.create(operator4.fluxDelayAndLimit())
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 1부터 10까지의 숫자가 순차적으로 발행됨
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 확인
}
설명:
operator4.fluxDelayAndLimit()
:fluxDelayAndLimit()
메서드를 호출하여 1초 지연 후 1부터 10까지의 숫자를 발행하는Flux<Integer>
를 반환받습니다.StepVerifier.create()
: 반환된Flux<Integer>
에 대한 테스트를 생성합니다.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
: 숫자1
부터10
까지 순차적으로 발행되는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Flux
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Flux.DelaySequence.1 -- onSubscribe(SerializedSubscriber) // 구독 시작
INFO reactor.Flux.DelaySequence.1 -- request(2) // 처음 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(1) // 첫 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(2) // 두 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2) // 다음 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(3) // 세 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(4) // 네 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2) // 또 다음 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(5) // 다섯 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(6) // 여섯 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2) // 다시 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(7) // 일곱 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(8) // 여덟 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- request(2) // 또다시 2개의 값 요청
INFO reactor.Flux.DelaySequence.1 -- onNext(9) // 아홉 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onNext(10) // 열 번째 값 발행
INFO reactor.Flux.DelaySequence.1 -- onComplete() // 스트림 완료
onSubscribe
: 스트림이 구독되었을 때 발생합니다.request(2)
: 한번에 두 개의 값을 요청합니다.onNext(x)
: 요청된 값이 순차적으로 발행됩니다.onComplete()
: 모든 값이 처리되고 스트림이 정상적으로 완료되었음을 나타냅니다.
요약
fluxDelayAndLimit()
메서드는 1초 지연 후 1부터 10까지의 숫자를 발행하며, 한번에 두 개의 값을 요청하는 방식으로limitRate(2)
를 사용합니다.- 테스트에서는 숫자
1
부터10
까지 순차적으로 발행되고, 스트림이 정상적으로 완료되었는지 검증합니다.
fluxSample()
메서드
public Flux<Integer> fluxSample() {
return Flux.range(1, 100)
.delayElements(Duration.ofMillis(100))
.sample(Duration.ofMillis(300))
.log();
}
설명:
Flux.range(1, 100)
: 1부터 100까지의 정수를 발행하는Flux
를 생성합니다.delayElements(Duration.ofMillis(100))
: 각 요소를 100밀리초 간격으로 발행합니다. 즉, 1초에 약 10개의 숫자가 발행됩니다.sample(Duration.ofMillis(300))
: 300밀리초마다 가장 최근에 발행된 값을 선택하여 발행합니다. 이는 3개의 값 중 가장 최근 값이 샘플링되어 발행되는 것을 의미합니다.
@Test
void fluxSample() {
StepVerifier.create(operator4.fluxSample())
.expectNextCount(10) // 10개의 샘플링된 값이 발행되었는지 확인
.thenCancel() // 테스트 중간에 스트림을 취소하여 완료를 검증
.verify(); // 검증
}
설명:
operator4.fluxSample()
:fluxSample()
메서드를 호출하여 1부터 100까지의 숫자를 100밀리초 간격으로 발행하고 300밀리초 간격으로 샘플링하는Flux<Integer>
를 반환합니다.StepVerifier.create()
:Flux<Integer>
에 대한 테스트를 생성합니다.expectNextCount(10)
: 샘플링된 값이 10개가 발행되는지 확인합니다.thenCancel()
: 스트림을 강제로 취소합니다. 이를 통해 스트림이 완료되기 전에 중간에 취소하는 시나리오를 테스트할 수 있습니다.verify()
: 테스트를 실행하고, 검증이 완료되었는지 확인합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Flux
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Flux.Sample.1 -- onSubscribe(FluxSample.SampleMainSubscriber) // 구독 시작
INFO reactor.Flux.Sample.1 -- request(unbounded) // 제한 없이 데이터 요청
INFO reactor.Flux.Sample.1 -- onNext(2) // 첫 번째 샘플링된 값(2) 발행
INFO reactor.Flux.Sample.1 -- onNext(5) // 두 번째 샘플링된 값(5) 발행
INFO reactor.Flux.Sample.1 -- onNext(8) // 세 번째 샘플링된 값(8) 발행
INFO reactor.Flux.Sample.1 -- onNext(11) // 네 번째 샘플링된 값(11) 발행
INFO reactor.Flux.Sample.1 -- onNext(14) // 다섯 번째 샘플링된 값(14) 발행
INFO reactor.Flux.Sample.1 -- onNext(17) // 여섯 번째 샘플링된 값(17) 발행
INFO reactor.Flux.Sample.1 -- onNext(20) // 일곱 번째 샘플링된 값(20) 발행
INFO reactor.Flux.Sample.1 -- onNext(23) // 여덟 번째 샘플링된 값(23) 발행
INFO reactor.Flux.Sample.1 -- onNext(26) // 아홉 번째 샘플링된 값(26) 발행
INFO reactor.Flux.Sample.1 -- onNext(29) // 열 번째 샘플링된 값(29) 발행
INFO reactor.Flux.Sample.1 -- cancel() // 스트림 취소
onSubscribe
: 스트림이 구독되었음을 나타냅니다.request(unbounded)
: 제한 없이 값을 요청하고 있다는 로그입니다.onNext(x)
: 샘플링된 값이 발행될 때마다 로그로 출력됩니다.cancel()
:StepVerifier
에 의해 스트림이 취소되었음을 나타냅니다.
요약
fluxSample()
메서드는 1부터 100까지의 숫자를 100밀리초 간격으로 발행하고, 300밀리초마다 그 중 가장 최근에 발행된 값을 선택하여 발행합니다.- 테스트에서는 10개의 샘플링된 값이 발행된 후 스트림을 취소하고, 스트림이 정상적으로 동작했는지를 검증합니다.
- 로그를 통해 샘플링된 값이 정확히 발행되고 스트림이 중간에 취소되었음을 확인할 수 있습니다.
Schedulers
Schedulers
는 Reactor (Reactive Streams) 라이브러리에서 비동기 작업을 실행할 스레드 풀을 관리하는 클래스입니다. 비동기적 또는 병렬 처리 작업을 어느 스레드에서 실행할지를 결정하고, 다양한 스케줄링 전략을 제공합니다.
Schedulers
는 작업을 처리할 때 사용할 스레드를 지정하거나, 병렬 처리, 단일 스레드 처리, 또는 즉시 실행과 같은 방식으로 비동기 실행 환경을 설정하는 데 사용됩니다.
역할:
- 스레드 관리: 특정 작업이 어떤 스레드에서 실행될지를 제어합니다.
- 비동기 처리 지원: 비동기적으로 실행할 수 있도록 스케줄러를 제공하여, 리액티브 스트림에서 고성능 처리를 가능하게 합니다.
- 다양한 실행 환경 제공: 병렬 처리, I/O 작업, 단일 스레드 실행 등 다양한 실행 패턴을 지원합니다.
Schedulers 메소드
Schedulers.immediate()
: 현재 호출하는 스레드에서 즉시 실행합니다. 별도의 스레드 전환 없이 동기적으로 작업을 수행합니다.Schedulers.single()
: 단일 스레드에서 작업을 실행하며, 해당 스레드가 여러 작업을 순차적으로 처리합니다.Schedulers.parallel()
: 고성능 작업을 위해 고정된 크기의 스레드 풀(기본적으로 CPU 코어 수)을 사용하여 병렬로 작업을 실행합니다.Schedulers.boundedElastic()
: I/O 블로킹 작업에 적합하며, 필요에 따라 새로운 스레드를 생성하지만 스레드 수는 제한되어 있습니다.
fluxMapWithSubscribeOn()
메서드
public Flux<Integer> fluxMapWithSubscribeOn() {
return Flux.range(1, 10)
.map(I -> I * 2) // 각 값을 2배로 변환
.subscribeOn(Schedulers.boundedElastic()) // I/O 작업에 적합한 스케줄러 사용
.log();
}
설명:
Flux.range(1, 10)
: 1부터 10까지의 정수를 발행하는Flux
를 생성합니다.map(I -> I * 2)
: 각 발행된 값을 받아 2배로 변환합니다. 즉, 1은 2로, 2는 4로 변환하는 식으로 진행됩니다.subscribeOn(Schedulers.boundedElastic())
:boundedElastic
스케줄러를 사용하여 작업을 비동기적으로 실행합니다. 이 스케줄러는 주로 I/O 작업이나 블로킹 작업에 적합하며, 필요에 따라 스레드를 동적으로 생성하되 제한된 범위 내에서 생성합니다.
@Test
void fluxMapWithSubscribeOn() {
StepVerifier.create(scheduler1.fluxMapWithSubscribeOn())
.expectNextCount(10) // 10개의 변환된 값(2, 4, 6, ..., 20)이 발행되는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 검증
}
설명:
scheduler1.fluxMapWithSubscribeOn()
:fluxMapWithSubscribeOn()
메서드를 호출하여 1부터 10까지의 숫자를 2배로 변환하고,boundedElastic
스케줄러에서 비동기적으로 실행되는Flux<Integer>
를 반환합니다.expectNextCount(10)
: 10개의 변환된 값(2, 4, 6, ..., 20)이 발행되는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 확인합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Flux
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Flux.SubscribeOn.1 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber) // 구독 시작
INFO reactor.Flux.SubscribeOn.1 -- request(unbounded) // 제한 없이 데이터 요청
INFO reactor.Flux.SubscribeOn.1 -- onNext(2) // 변환된 값(2) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(4) // 변환된 값(4) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(6) // 변환된 값(6) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(8) // 변환된 값(8) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(10) // 변환된 값(10) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(12) // 변환된 값(12) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(14) // 변환된 값(14) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(16) // 변환된 값(16) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(18) // 변환된 값(18) 발행
INFO reactor.Flux.SubscribeOn.1 -- onNext(20) // 변환된 값(20) 발행
INFO reactor.Flux.SubscribeOn.1 -- onComplete() // 스트림 완료
onSubscribe
: 스트림이 구독되었음을 나타냅니다.request(unbounded)
: 제한 없이 데이터를 요청하고 있다는 로그입니다.onNext(x)
: 변환된 값이 발행될 때마다 해당 값을 기록합니다.onComplete()
: 모든 값이 정상적으로 처리되고 스트림이 완료되었음을 나타냅니다.
요약
fluxMapWithSubscribeOn()
메서드는 1부터 10까지의 값을 2배로 변환하고,boundedElastic
스케줄러에서 비동기적으로 실행합니다.- 테스트에서는 10개의 변환된 값이 정상적으로 발행되고 스트림이 완료되는지 확인합니다.
코드 및 테스트 설명
fluxMapWithPublishOn()
메서드
public Flux<Integer> fluxMapWithPublishOn() {
return Flux.range(1, 10)
.map(i -> i + 1) // 각 값을 1씩 증가
.publishOn(Schedulers.boundedElastic()) // boundedElastic 스케줄러로 스레드 전환
.log() // 로그 출력
.publishOn(Schedulers.parallel()) // parallel 스케줄러로 스레드 전환
.log() // 로그 출력
.map(i -> i * 2); // 각 값을 2배로 변환
}
설명:
Flux.range(1, 10)
: 1부터 10까지의 숫자를 발행하는Flux
를 생성합니다.map(i -> i + 1)
: 발행된 값을 받아 1씩 증가시킵니다. (예: 1 → 2, 2 → 3 ...)publishOn(Schedulers.boundedElastic())
:boundedElastic
스케줄러에서 해당 작업을 실행하도록 설정합니다. 주로 I/O 작업이나 블로킹 작업을 처리하는 스케줄러입니다.log()
: 스트림의 동작을 로그로 출력합니다.publishOn(Schedulers.parallel())
:parallel
스케줄러에서 병렬로 처리하도록 스레드를 전환합니다. 주로 CPU 집약적인 작업을 위해 사용됩니다.map(i -> i * 2)
: 발행된 값을 받아 2배로 변환합니다.
테스트 메서드
@Test
void fluxMapWithPublishOn() {
StepVerifier.create(scheduler1.fluxMapWithPublishOn())
.expectNextCount(10) // 10개의 값이 발행되었는지 확인
.verifyComplete(); // 스트림이 정상적으로 완료되었는지 검증
}
설명:
StepVerifier.create()
:fluxMapWithPublishOn()
메서드가 반환하는Flux<Integer>
를 테스트합니다.expectNextCount(10)
: 변환된 10개의 값이 정상적으로 발행되었는지 확인합니다.verifyComplete()
: 스트림이 정상적으로 완료되었는지 검증합니다.
로그 설명
테스트 실행 시 log()
메서드에 의해 출력된 로그는 Flux
스트림의 동작을 추적할 수 있습니다:
INFO reactor.Flux.PublishOn.1 -- | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) // 스트림 구독 시작
INFO reactor.Flux.PublishOn.2 -- | request(unbounded) // 제한 없이 데이터 요청
INFO reactor.Flux.PublishOn.1 -- | onNext(2) // boundedElastic 스레드에서 처리
INFO reactor.Flux.PublishOn.2 -- | onNext(2) // parallel 스레드에서 처리
INFO reactor.Flux.PublishOn.1 -- | onNext(3)
INFO reactor.Flux.PublishOn.2 -- | onNext(3)
...
INFO reactor.Flux.PublishOn.1 -- | onComplete() // boundedElastic 스레드 완료
INFO reactor.Flux.PublishOn.2 -- | onComplete() // parallel 스레드 완료
publishOn(Schedulers.boundedElastic())
에 의해boundedElastic
스케줄러로 전환된 후, 값이 처리됩니다.- 이어서
publishOn(Schedulers.parallel())
에 의해parallel
스케줄러에서 다시 값이 처리됩니다. onComplete()
로그가 발생하며 스트림이 정상적으로 완료됩니다.
요약
fluxMapWithPublishOn()
메서드는 1부터 10까지의 값을 1씩 증가시키고, 두 번의publishOn
을 통해 스레드를boundedElastic
과parallel
로 전환한 후, 값을 2배로 변환하여 발행합니다.- 테스트는 10개의 변환된 값이 정상적으로 발행되고 스트림이 완료되는지를 확인합니다.
- 로그를 통해 스레드 전환 및 스트림의 각 동작을 추적할 수 있으며, 모든 값이 두 스케줄러에서 제대로 처리된 후 정상적으로 완료된 것을 알 수 있습니다.
'프로그래밍 언어 > 스프링부트' 카테고리의 다른 글
webflux - CPU Bound vs IO Bound (2) | 2024.09.17 |
---|---|
Redis Pub/Sub과 Spring Boot를 활용한 실시간 알림 시스템 구현 (0) | 2024.09.15 |
Spring Session (0) | 2024.09.15 |
OneToMany 관계 설정 시 필드 타입 설정은 뭘로 하나? (0) | 2024.05.28 |