Mono 사용법
데이터로 시작: just, empty
함수로 시작: fromCallable, defer
1.0 또는 1개의 데이터를 만들때
@DisplayName("데이터로 시작하는 Mono(just ,empty)")
//just, empty
@Test
public void startMonoFromData(){
Mono.just(1).subscribe(data -> System.out.println("data = " + data));
//ex) 사소한 에러가 발생했을 때 로그를 남기고 empty의 Mono를 전파
Mono.empty().subscribe(data -> System.out.println("data = " + data));
}
2.0 또는 1개의 함수를 만들때
@DisplayName("함수로 시작하는 Mono(fromCallable -> 동기적인 객체를 반환할 때, " +
"defer -> Mono를 반환할 때)")
@Test
public void startMonoFromFunction(){
/**
* restTemplate, JPA -> 블로킹이 발생하는 라이브러리 Mono로 스레드 분리하여 논블로킹으로 처리 가능
* */
Mono<String> monoFromCallable = Mono.fromCallable(() -> {
return callRestTemplate("안녕");
}).subscribeOn(Schedulers.boundedElastic());
/**
* just는 쓰레드가 "안녕" 이라는 로직까지 바로 읽지만 defer는 구독이 이뤄질때 실제 로직을 읽는다.
* */
Mono<String> monoFromdefer = Mono.defer(() -> {
return callWebClient("안녕");
});
monoFromdefer.subscribe();
Mono<String> monoFromJust = Mono.just("안녕");
}
public Mono<String> callWebClient(String request){
return Mono.just(request + "callRestTemplate 응답");
}
3.외부 API들의 결과값들을 모두 받아서 조합하는 로직을 Mono의 흐름속에서 관리하고 싶다.
@Test
public void testDeferNecessity(){
//abc를 만드는 로직도 Mono의 흐름 속에서 관리하고 싶다.
//ex) b에서 블로킹이 발생하면 subscribeOn을 통해서 논블록하게도 동작가능
Mono.defer(() -> {
String a = "안녕";
String b = "하세";
String c = "요";
return callWebClient(a+b+c);
}).subscribeOn(Schedulers.boundedElastic());
}
4.Mono에서 데이터 방출 개수가 많아져서 Flux 로 바꾸고 싶을때
flatMap을 사용
@Test
public void monoToFlux(){
Mono<Integer> one = Mono.just(1);
Flux<Integer> integerFlux = one.flatMapMany(data -> {
return Flux.just(data ,data+1, data+2);
});
integerFlux.subscribe(data -> System.out.println("data = " + data));
}
Flux 사용법
데이터로 시작: just, empty, from-시리즈
함수로 시작: defer(안에서 Flux 객체를 반환해야 함), create(안에서 동기적인 객체(next)를 반환해야 함)
1.데이터로 시작
@Test
public void testFluxFromData(){
Flux.just(1,2,3,4)
.subscribe(data -> System.out.println("data = " + data));
//java 이터러블을 상속하는 모든 클래스를 담을 수 있음.
Flux.fromIterable(List.of(1,2,3,4,5))
.subscribe(data -> System.out.println("data = " + data));
}
2.함수로 시작
@Test
public void testFluxFromFunction(){
Flux.defer(() -> {
return Flux.just(1,2,3,4);
}).subscribe(data -> System.out.println("data from defer = " + data));
Flux.create(sink -> {
sink.next(1);
sink.next(2);
sink.next(3);
sink.complete(); //sink 사용할 때 마지막에 호출
}).subscribe(data -> System.out.println("data from sink= " + data));
}
3.sink에서 변수 넘겨주는 방법
@Test
public void testSinkDetail(){
Flux.<String>create(sink -> {
AtomicInteger counter = new AtomicInteger(0);
recursiveFunction(sink);
})
.contextWrite(Context.of("counter", new AtomicInteger(0)))
.subscribe(data -> System.out.println("data = " + data));
}
public void recursiveFunction(FluxSink<String> sink){
AtomicInteger counter = sink.contextView().get("counter");
if (counter.incrementAndGet() < 10){
sink.next("sink count" + counter);
recursiveFunction(sink);
} else {
sink.complete();
}
}
4.Flux를 Mono 리스트로 변환하는 방법
collectList를 사용한다.
@Test
void testFluxCollectList(){
Mono<List<Integer>> listMono = Flux.<Integer>just(1,2,3,4,5)
.map(data -> data*2)
.filter(data -> data%4 == 0)
.collectList();
listMono.subscribe(data -> System.out.println("data = " + data));
}
블로킹을 피하는 방법
여기서 블로킹을 피할 수 있는 방법은?
void fluxBlockingTest(){
Flux<Integer> intFlux = Flux.create(sink -> {
for (int i=1; i<=9; i++){
try {
Thread.sleep(1000);
} catch(Exception e){
}
sink.next(i);
}
sink.complete();
});
intFlux.subscribe(data -> System.out.println("webFlux 구독 중: " + data));
}
아래와 같이 스케쥴러를 사용하여 추가 스레드를 구성한다.(tomcat과 같은 방식이 된다. -> 단점)
void fluxBlockingTest(){
Flux<Integer> intFlux = Flux.<Integer>create(sink -> {
for (int i=1; i<=9; i++){
try {
Thread.sleep(1000);
} catch(Exception e){
}
sink.next(i);
}
sink.complete();
}).subscribeOn(Schedulers.boundedElastic());
intFlux.subscribe(data -> System.out.println("webFlux 구독 중: " + data));
}
그러면 스케쥴러는 언제 사용?
- 어쩔 수 없는 블로킹이 발생하는 요소
- 마땅한 라이브러리가 없는 I/O 작업
- 병렬처리 / 이벤트 루프가 할 일이 너무 많을 때
기존의 tomcat 에서 사용되는 blocking api를 non-blocking api로 마이그레이션 하는 방법은?
@GetMapping("/list/legacy")
public List<Integer> produceOneToNineLegacy(){
List<Integer> sink = new ArrayList<>();
for (int i=1; i<=9; i++){
try {
Thread.sleep(500);
} catch (Exception e) {
}
sink.add(i);
}
return sink;
}
1.fromCallable 적용
@GetMapping("/list/legacy")
public Mono<List<Integer>> produceOneToNineLegacyV1(){
return Mono.fromCallable(() -> {
List<Integer> sink = new ArrayList<>();
for (int i=1; i<=9; i++){
try {
Thread.sleep(500);
} catch (Exception e) {
}
sink.add(i);
}
return sink;
}).subscribeOn(Schedulers.boundedElastic());
}
2.defer 적용
@GetMapping("/list/legacy")
public Mono<List<Integer>> produceOneToNineLegacyV1(){
return Mono.defer(() -> {
List<Integer> sink = new ArrayList<>();
for (int i=1; i<=9; i++){
try {
Thread.sleep(500);
} catch (Exception e) {
}
sink.add(i);
}
return Mono.just(sink);
}).subscribeOn(Schedulers.boundedElastic());
}
defer와 just의 차이점은?
Mono<String> monoFromDefer = Mono.defer(() -> {
return Mono.just("안녕");
});
Mono<String> monoFromJust = Mono.just("안녕");
just는 쓰레드가 "안녕" 이라는 로직까지 바로 읽지만 defer는 구독이 이뤄질때 실제 로직을 읽는다.
Mono<String> monoFromDefer = Mono.defer(() -> {
return Mono.just("안녕");
});
monoFromDefer.subscribe(); //defer 실제 구독
FlatMap을 언제사용하지?
Mono<Mono<T>> -> Mono<T>
Mono<Flux<T>> -> Flux<T>
Flux<Mono<T>> -> Flux<T>
위와 같이 비동기가 겹쳐진 구조를 비동기 1개로 평탄화 시켜주는것이 FlatMap이다.
1.처음엔 Mono로 만들었지만 값이 여러개 추가되어서 Flux로 변환해야 할때
@Test
public void monoToFlux(){
Mono<Integer> one = Mono.just(1);
Mono<Flux<Integer>> integerFlux = one.map(data -> {
return Flux.just(data, data + 1, data + 2);
});
integerFlux.subscribe(data -> System.out.println("data = " + data));
}
비동기 객체 2개가 겹처진(Mono<Flux>) 기형적인 형태가 된다.
언제 만들어질지 모르는 객체안에 언제 만들어질지 모르는 객체가 있는 것이다 ㄷㄷ;;
비동기+비동기 = 비동기 이기 때문에 평탄화(FlatMap) 시켜서 하나의 비동기 객체로 만들 수 있다.
FlatMap 사용
@Test
public void monoToFlux(){
Mono<Integer> one = Mono.just(1);
Flux<Integer> integerFlux = one.flatMapMany(data -> {
return Flux.just(data, data+1, data+2);
});
integerFlux.subscribe(data -> System.out.println("data = " + data));
}
2.외부 api 호출시
//외부 api 호출 로직
public Mono<String> callWebClient(String request, long delay){
return Mono.defer(() -> {
try {
Thread.sleep(delay);
return Mono.just(request+ " -> 딜레이: " + delay);
} catch (Exception e) {
return Mono.empty();
}
}).subscribeOn(Schedulers.boundedElastic());//블로킹시 스케줄러 스레드 가동
}
@Test
public void testWebClientFlatMap(){
Flux<Mono<String>> just = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
callWebClient("3단계 - 최종 응답", 500));
Flux<Mono<String>> create = Flux.<Mono<String>>create( sink -> {
sink.next(callWebClient("1단계 - 문제 이해하기", 1500));
sink.next(callWebClient("2단계 - 문제 단계별로 풀어가기", 1000));
sink.next(callWebClient("3단계 - 최종 응답", 500));
sink.complete();
});
}
외부 api를 Flux에서 호출할땐 just, create 2가지 방법을 사용할 수 있다.
이럴때 중첩된 비동기 구조 Flux<Mono>가 생긴다. 이럴때도 FlatMap을 사용하자.
FlatMap 사용
@Test
public void testWebClientFlatMap(){
Flux<String> just = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
callWebClient("3단계 - 최종 응답", 500))
.flatMap(monoData -> {
return monoData;
});
flatMapSequential.subscribe(data -> System.out.println("flatMapSequential data = " + data));
Flux<String> create = Flux.<Mono<String>>create( sink -> {
sink.next(callWebClient("1단계 - 문제 이해하기", 1500));
sink.next(callWebClient("2단계 - 문제 단계별로 풀어가기", 1000));
sink.next(callWebClient("3단계 - 최종 응답", 500));
sink.complete();
}).flatMap(monoData -> {
return monoData;
});
try {
Thread.sleep(10000);
}catch (Exception e){
}
}
결과값
flatMap data = 3단계 - 최종 응답 -> 딜레이: 500
flatMap data = 2단계 - 문제 단계별로 풀어가기 -> 딜레이: 1000
flatMap data = 1단계 - 문제 이해하기 -> 딜레이: 1500
FlatMap은 입력 순서와 상관 없이 먼저 처리되는 순서대로 데이터를 방출한다.
입력 순서대로 출력 순서를 보장하고 싶다면? flatMapSequential 사용하자
Flux<String> flatMapSequential = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
callWebClient("3단계 - 최종 응답", 500))
.flatMapSequential(monoData -> {
return monoData;
});
flatMapSequential.subscribe(data -> System.out.println("flatMapSequential data = " + data));
결과값
flatMap data = 1단계 - 문제 이해하기 -> 딜레이: 1500
flatMap data = 2단계 - 문제 단계별로 풀어가기 -> 딜레이: 1000
flatMap data = 3단계 - 최종 응답 -> 딜레이: 500
해당 코드의 flatMap에서 데이터에 대한 아무 가공도 안하고 있다면 merge를 사용해서 코드를 더 깔끔하게 만들자.
Flux<String> just = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
callWebClient("3단계 - 최종 응답", 500))
.flatMap(monoData -> {
return monoData;
});
merge사용
Flux<String> merge = Flux.merge(callWebClient("1단계 - 문제 이해하기", 1500),
callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
callWebClient("3단계 - 최종 응답", 500));
merge.subscribe(data -> {
System.out.println("merge data = " + data);
});
merge의 방출 순서도 입력순서대로 맞추고 싶다면 mergeSequential을 사용하자
Flux<String> merge = Flux.mergeSequential(callWebClient("1단계 - 문제 이해하기", 1500),
callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
callWebClient("3단계 - 최종 응답", 500));
이런 방법들은 입력을 병렬로 처리한 뒤 나중에 순서를 맟춰서 방출되지만,
concat, concatMap은 내부에 로직 호출 순서대로 실행하고 끝날때까지 쓰레드가 대기하고 다음 호출을 또 실행하는 식으로 진행되므로 비효율적이다.
==> concat, concatMap은 사용하지 말자.
Signal과 에러처리
데이터가 방출 될 때 --> onNext
스트림이 완료 됐을 때 --> onComplete
스트림에서 에러가 발생 했을 때 --> onError
@Test
public void testBasicSignal(){
Flux.just(1,2,3,4)
.doOnNext(publishedData -> System.out.println("publishedData = " + publishedData))
.doOnComplete(() -> System.out.println("스트림이 끝났습니다"))
.doOnError(ex -> {
System.out.println("ex 발생: " + ex);
})
.subscribe(data -> System.out.println("data = " + data));
}
리액티브 스트림 안에서 발생하는 예외는 밖으로 던져지지 않는다.
때문에 스트림 안에서 적절히 try catch를 사용하거나, onError 류의 오퍼레이터를 사용하여 처리해야 한다.
예외처리 2가지 방법
@Test
public void testFluxMonoError(){
Flux.just(1,2,3,4)
.map(data -> {
if (data == 3){
throw new RuntimeException();
}
return data*2;
})
.subscribe(data -> System.out.println("data = " + data));
}
@Test
public void testFluxMonoDotError(){
Flux.just(1,2,3,4)
.map(data -> {
if (data == 3){
return Mono.error(new RuntimeException());
}
return Mono.just(data);
})
.subscribe(data -> System.out.println("data = " + data));
}
Scheduler에 대해서
BoundedElastic
- 생성 방법
- 유저가 스레드를 요청 할 때 마다 스레드를 탄력적으로 생성해서 할당.
- 이미 만들어둔 스레드가 있으면 해당 스레드를 할당
- 생성 가능한 스레드 제한이 있으며, 스레드를 할당 받지 못한 작업들은 큐에서 대기한다.
- 생명 주기
- 한 번 만들어지면 일정 시간 동안 스레드를 삭제하지 않고 유지하며 , 일정 시간동안 사용되지 않으면 삭제된다.
- 용도
- 블로킹 작업에 보통 쓰임
Parallel
- 생성 방법
- 유저가 최초 한번 호출하면 물리 스레드와 같은 양의 스레드를 생성해두고 삭제하지 않는다.
- 생명 주기
- 한번 만들어지면 계속 유지한다.
- 용도
- cpu 작업을 병렬로 처리할 때
pub/sub 용 스레드를 별도로 할당 할 수 있다.
@Test
public void testBasic(){
Mono.<Integer>just(2)
.map(data -> {
System.out.println("map Thread name: " + Thread.currentThread().getName());
return data*2;
})
.publishOn(Schedulers.parallel())
.filter(data -> {
System.out.println("filter Thread name: " + Thread.currentThread().getName());
return data%4==0;
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe(data -> System.out.println(Thread.currentThread().getName()+" data = " + data));
}
map Thread name: boundedElastic-1
filter Thread name: parallel-1
parallel-1 data = 4
'SPRING > 리액티브 프로그래밍' 카테고리의 다른 글
| 리액티브 프로그래밍 배경지식 알고가기 (13) | 2025.07.31 |
|---|---|
| 리액티브 프로그래밍 개념정리(2) (0) | 2024.05.10 |
| 리액티브 프로그래밍 개념정리(1) (0) | 2024.05.09 |