본문으로 바로가기

WebFlux 문법

category SPRING/리액티브 프로그래밍 2025. 8. 8. 10:22
728x90
반응형
SMALL

Mono 사용법

데이터로 시작: just, empty

함수로 시작: fromCallable, defer


1.0 또는 1개의 데이터를 만들때

    @DisplayName("데이터로 시작하는 Mono(just ,empty)")
    //just, empty
    @Test
    public void startMonoFromData(){
        Mono.just(1).subscribe(data -> System.out.println("data = " + data));

        //ex) 사소한 에러가 발생했을 때 로그를 남기고 empty의 Mono를 전파
        Mono.empty().subscribe(data -> System.out.println("data = " + data));
    }

2.0 또는 1개의 함수를 만들때

    @DisplayName("함수로 시작하는 Mono(fromCallable -> 동기적인 객체를 반환할 때, " +
            "defer -> Mono를 반환할 때)")
    @Test
    public void startMonoFromFunction(){
        /**
         * restTemplate, JPA -> 블로킹이 발생하는 라이브러리 Mono로 스레드 분리하여 논블로킹으로 처리 가능
         * */
        Mono<String> monoFromCallable = Mono.fromCallable(() -> {
            return callRestTemplate("안녕");
        }).subscribeOn(Schedulers.boundedElastic());


        /**
         * just는 쓰레드가 "안녕" 이라는 로직까지 바로 읽지만 defer는 구독이 이뤄질때 실제 로직을 읽는다.
         * */
        Mono<String> monoFromdefer = Mono.defer(() -> {
            return callWebClient("안녕");
        });
        monoFromdefer.subscribe();

        Mono<String> monoFromJust = Mono.just("안녕");

    }

    public Mono<String> callWebClient(String request){
        return Mono.just(request + "callRestTemplate 응답");
    }

3.외부 API들의 결과값들을 모두 받아서 조합하는 로직을 Mono의 흐름속에서 관리하고 싶다.

    @Test
    public void testDeferNecessity(){
        //abc를 만드는 로직도 Mono의 흐름 속에서 관리하고 싶다.
        //ex) b에서 블로킹이 발생하면 subscribeOn을 통해서 논블록하게도 동작가능
        Mono.defer(() -> {
            String a = "안녕";
            String b = "하세";
            String c = "요";
            return callWebClient(a+b+c);
        }).subscribeOn(Schedulers.boundedElastic());
    }

4.Mono에서 데이터 방출 개수가 많아져서 Flux 로 바꾸고 싶을때

flatMap을 사용

    @Test
    public void monoToFlux(){
        Mono<Integer> one = Mono.just(1);
        Flux<Integer> integerFlux = one.flatMapMany(data -> {
            return Flux.just(data ,data+1, data+2);
        });
        integerFlux.subscribe(data -> System.out.println("data = " + data));
    }

Flux 사용법

데이터로 시작: just, empty, from-시리즈

함수로 시작: defer(안에서 Flux 객체를 반환해야 함), create(안에서 동기적인 객체(next)를 반환해야 함)

1.데이터로 시작

    @Test
    public void testFluxFromData(){
        Flux.just(1,2,3,4)
                .subscribe(data -> System.out.println("data = " + data));

        //java 이터러블을 상속하는 모든 클래스를 담을 수 있음.
        Flux.fromIterable(List.of(1,2,3,4,5))
                .subscribe(data -> System.out.println("data = " + data));
    }

2.함수로 시작

    @Test
    public void testFluxFromFunction(){
        Flux.defer(() -> {
            return Flux.just(1,2,3,4);
        }).subscribe(data -> System.out.println("data from defer = " + data));

        Flux.create(sink -> {
            sink.next(1);
            sink.next(2);
            sink.next(3);
            sink.complete(); //sink 사용할 때 마지막에 호출
        }).subscribe(data -> System.out.println("data from sink= " + data));
    }

3.sink에서 변수 넘겨주는 방법

@Test
public void testSinkDetail(){
    Flux.<String>create(sink -> {
        AtomicInteger counter = new AtomicInteger(0);
        recursiveFunction(sink);
    })
            .contextWrite(Context.of("counter", new AtomicInteger(0)))
            .subscribe(data -> System.out.println("data = " + data));
}    

public void recursiveFunction(FluxSink<String> sink){
    AtomicInteger counter = sink.contextView().get("counter");
    if (counter.incrementAndGet() < 10){
        sink.next("sink count" + counter);
        recursiveFunction(sink);
    } else {
        sink.complete();
    }
}

4.Flux를 Mono 리스트로 변환하는 방법

collectList를 사용한다.

    @Test
    void testFluxCollectList(){
        Mono<List<Integer>> listMono =  Flux.<Integer>just(1,2,3,4,5)
                .map(data -> data*2)
                .filter(data -> data%4 == 0)
                .collectList();

        listMono.subscribe(data -> System.out.println("data = " + data));
    }



블로킹을 피하는 방법

여기서 블로킹을 피할 수 있는 방법은?

void fluxBlockingTest(){
    Flux<Integer> intFlux = Flux.create(sink -> {
        for (int i=1; i<=9; i++){
            try {
                Thread.sleep(1000);
            } catch(Exception e){
            }
            sink.next(i);
        }

        sink.complete();
    });
    intFlux.subscribe(data -> System.out.println("webFlux 구독 중: " + data));
}

아래와 같이 스케쥴러를 사용하여 추가 스레드를 구성한다.(tomcat과 같은 방식이 된다. -> 단점)

void fluxBlockingTest(){
    Flux<Integer> intFlux = Flux.<Integer>create(sink -> {
        for (int i=1; i<=9; i++){
            try {
                Thread.sleep(1000);
            } catch(Exception e){
            }
            sink.next(i);
        }

        sink.complete();
    }).subscribeOn(Schedulers.boundedElastic());

    intFlux.subscribe(data -> System.out.println("webFlux 구독 중: " + data));
}

그러면 스케쥴러는 언제 사용?

  • 어쩔 수 없는 블로킹이 발생하는 요소
  • 마땅한 라이브러리가 없는 I/O 작업
  • 병렬처리 / 이벤트 루프가 할 일이 너무 많을 때

기존의 tomcat 에서 사용되는 blocking api를 non-blocking api로 마이그레이션 하는 방법은?

@GetMapping("/list/legacy")
public List<Integer> produceOneToNineLegacy(){
    List<Integer> sink = new ArrayList<>();
        for (int i=1; i<=9; i++){
            try {
                Thread.sleep(500);
            } catch (Exception e) {
            }
            sink.add(i);
        }
        return sink;
}

1.fromCallable 적용

    @GetMapping("/list/legacy")
    public Mono<List<Integer>> produceOneToNineLegacyV1(){
        return Mono.fromCallable(() -> {
            List<Integer> sink = new ArrayList<>();
            for (int i=1; i<=9; i++){
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                }
                sink.add(i);
            }
            return sink;
        }).subscribeOn(Schedulers.boundedElastic());
    }

2.defer 적용

    @GetMapping("/list/legacy")
    public Mono<List<Integer>> produceOneToNineLegacyV1(){
        return Mono.defer(() -> {
            List<Integer> sink = new ArrayList<>();
            for (int i=1; i<=9; i++){
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                }
                sink.add(i);
            }
            return Mono.just(sink);
        }).subscribeOn(Schedulers.boundedElastic());
    }

defer와 just의 차이점은?

        Mono<String> monoFromDefer = Mono.defer(() -> {
            return Mono.just("안녕");
        });

        Mono<String> monoFromJust = Mono.just("안녕");

just는 쓰레드가 "안녕" 이라는 로직까지 바로 읽지만 defer는 구독이 이뤄질때 실제 로직을 읽는다.

        Mono<String> monoFromDefer = Mono.defer(() -> {
            return Mono.just("안녕");
        });
        monoFromDefer.subscribe(); //defer 실제 구독



FlatMap을 언제사용하지?

Mono<Mono<T>> -> Mono<T>
Mono<Flux<T>> -> Flux<T>
Flux<Mono<T>> -> Flux<T>

위와 같이 비동기가 겹쳐진 구조를 비동기 1개로 평탄화 시켜주는것이 FlatMap이다.

1.처음엔 Mono로 만들었지만 값이 여러개 추가되어서 Flux로 변환해야 할때

@Test
public void monoToFlux(){
    Mono<Integer> one = Mono.just(1);
    Mono<Flux<Integer>> integerFlux = one.map(data -> {
        return Flux.just(data, data + 1, data + 2);
    });

    integerFlux.subscribe(data -> System.out.println("data = " + data));
}

비동기 객체 2개가 겹처진(Mono<Flux>) 기형적인 형태가 된다.

언제 만들어질지 모르는 객체안에 언제 만들어질지 모르는 객체가 있는 것이다 ㄷㄷ;;

비동기+비동기 = 비동기 이기 때문에 평탄화(FlatMap) 시켜서 하나의 비동기 객체로 만들 수 있다.

FlatMap 사용

@Test
public void monoToFlux(){
    Mono<Integer> one = Mono.just(1);

    Flux<Integer> integerFlux = one.flatMapMany(data -> {
        return Flux.just(data, data+1, data+2);
    });
    integerFlux.subscribe(data -> System.out.println("data = " + data));
}

2.외부 api 호출시

//외부 api 호출 로직
public Mono<String> callWebClient(String request, long delay){
    return Mono.defer(() -> {
        try {
            Thread.sleep(delay);
            return Mono.just(request+ " -> 딜레이: " + delay);

        } catch (Exception e) {
            return Mono.empty();
        }
    }).subscribeOn(Schedulers.boundedElastic());//블로킹시 스케줄러 스레드 가동
}
@Test
public void testWebClientFlatMap(){
    Flux<Mono<String>> just = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
            callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
            callWebClient("3단계 - 최종 응답", 500));

    Flux<Mono<String>> create = Flux.<Mono<String>>create( sink -> {
        sink.next(callWebClient("1단계 - 문제 이해하기", 1500));
        sink.next(callWebClient("2단계 - 문제 단계별로 풀어가기", 1000));
        sink.next(callWebClient("3단계 - 최종 응답", 500));
        sink.complete();
    });
}

외부 api를 Flux에서 호출할땐 just, create 2가지 방법을 사용할 수 있다.

이럴때 중첩된 비동기 구조 Flux<Mono>가 생긴다. 이럴때도 FlatMap을 사용하자.

FlatMap 사용

@Test
public void testWebClientFlatMap(){
    Flux<String> just = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
            callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
            callWebClient("3단계 - 최종 응답", 500))
            .flatMap(monoData -> {
                return monoData;
            });

     flatMapSequential.subscribe(data -> System.out.println("flatMapSequential data = " + data));

    Flux<String> create = Flux.<Mono<String>>create( sink -> {
        sink.next(callWebClient("1단계 - 문제 이해하기", 1500));
        sink.next(callWebClient("2단계 - 문제 단계별로 풀어가기", 1000));
        sink.next(callWebClient("3단계 - 최종 응답", 500));
        sink.complete();
    }).flatMap(monoData -> {
        return monoData;
    });

    try {
        Thread.sleep(10000);
    }catch (Exception e){
    }
}

결과값

flatMap data = 3단계 - 최종 응답 -> 딜레이: 500
flatMap data = 2단계 - 문제 단계별로 풀어가기 -> 딜레이: 1000
flatMap data = 1단계 - 문제 이해하기 -> 딜레이: 1500

FlatMap은 입력 순서와 상관 없이 먼저 처리되는 순서대로 데이터를 방출한다.

입력 순서대로 출력 순서를 보장하고 싶다면? flatMapSequential 사용하자

        Flux<String> flatMapSequential = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
                        callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
                        callWebClient("3단계 - 최종 응답", 500))
                .flatMapSequential(monoData -> {
                    return monoData;
                });

        flatMapSequential.subscribe(data -> System.out.println("flatMapSequential data = " + data));

결과값

flatMap data = 1단계 - 문제 이해하기 -> 딜레이: 1500
flatMap data = 2단계 - 문제 단계별로 풀어가기 -> 딜레이: 1000
flatMap data = 3단계 - 최종 응답 -> 딜레이: 500

해당 코드의 flatMap에서 데이터에 대한 아무 가공도 안하고 있다면 merge를 사용해서 코드를 더 깔끔하게 만들자.

Flux<String> just = Flux.just(callWebClient("1단계 - 문제 이해하기", 1500),
        callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
        callWebClient("3단계 - 최종 응답", 500))
        .flatMap(monoData -> {
            return monoData;
        });

merge사용

Flux<String> merge = Flux.merge(callWebClient("1단계 - 문제 이해하기", 1500),
                callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
                callWebClient("3단계 - 최종 응답", 500));

merge.subscribe(data -> {
    System.out.println("merge data = " + data);
});

merge의 방출 순서도 입력순서대로 맞추고 싶다면 mergeSequential을 사용하자

Flux<String> merge = Flux.mergeSequential(callWebClient("1단계 - 문제 이해하기", 1500),
                callWebClient("2단계 - 문제 단계별로 풀어가기", 1000),
                callWebClient("3단계 - 최종 응답", 500));

이런 방법들은 입력을 병렬로 처리한 뒤 나중에 순서를 맟춰서 방출되지만,

concat, concatMap은 내부에 로직 호출 순서대로 실행하고 끝날때까지 쓰레드가 대기하고 다음 호출을 또 실행하는 식으로 진행되므로 비효율적이다.

==> concat, concatMap은 사용하지 말자.



Signal과 에러처리

데이터가 방출 될 때 --> onNext

스트림이 완료 됐을 때 --> onComplete

스트림에서 에러가 발생 했을 때 --> onError

@Test
public void testBasicSignal(){
    Flux.just(1,2,3,4)
            .doOnNext(publishedData -> System.out.println("publishedData = " + publishedData))
            .doOnComplete(() -> System.out.println("스트림이 끝났습니다"))
            .doOnError(ex -> {
                System.out.println("ex 발생: " + ex);
            })
            .subscribe(data -> System.out.println("data = " + data));
}

리액티브 스트림 안에서 발생하는 예외는 밖으로 던져지지 않는다.

때문에 스트림 안에서 적절히 try catch를 사용하거나, onError 류의 오퍼레이터를 사용하여 처리해야 한다.

예외처리 2가지 방법

    @Test
    public void testFluxMonoError(){
        Flux.just(1,2,3,4)
                .map(data -> {
                    if (data == 3){
                        throw new RuntimeException();
                    }
                    return data*2;
                })
                .subscribe(data -> System.out.println("data = " + data));
    }

    @Test
    public void testFluxMonoDotError(){
        Flux.just(1,2,3,4)
                .map(data -> {
                    if (data == 3){
                       return Mono.error(new RuntimeException());
                    }
                    return Mono.just(data);
                })
                .subscribe(data -> System.out.println("data = " + data));
    }



Scheduler에 대해서

BoundedElastic

  • 생성 방법
    • 유저가 스레드를 요청 할 때 마다 스레드를 탄력적으로 생성해서 할당.
    • 이미 만들어둔 스레드가 있으면 해당 스레드를 할당
    • 생성 가능한 스레드 제한이 있으며, 스레드를 할당 받지 못한 작업들은 큐에서 대기한다.
  • 생명 주기
    • 한 번 만들어지면 일정 시간 동안 스레드를 삭제하지 않고 유지하며 , 일정 시간동안 사용되지 않으면 삭제된다.
  • 용도
    • 블로킹 작업에 보통 쓰임

Parallel

  • 생성 방법
    • 유저가 최초 한번 호출하면 물리 스레드와 같은 양의 스레드를 생성해두고 삭제하지 않는다.
  • 생명 주기
    • 한번 만들어지면 계속 유지한다.
  • 용도
    • cpu 작업을 병렬로 처리할 때

pub/sub 용 스레드를 별도로 할당 할 수 있다.

@Test
public void testBasic(){
    Mono.<Integer>just(2)
            .map(data -> {
                System.out.println("map Thread name: " + Thread.currentThread().getName());
                return data*2;
            })
            .publishOn(Schedulers.parallel())
            .filter(data -> {
                System.out.println("filter Thread name: " + Thread.currentThread().getName());
                return data%4==0;
            })
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(data -> System.out.println(Thread.currentThread().getName()+" data = " + data));
}
map Thread name: boundedElastic-1
filter Thread name: parallel-1
parallel-1 data = 4
728x90
반응형
LIST