본문으로 바로가기

리액티브 프로그래밍 개념정리(2)

category SPRING/리액티브 프로그래밍 2024. 5. 10. 15:20
728x90
반응형
SMALL

Backpressure란?

emit 되는 데이터 처리 속도를 통제,조절 하는 것

 

Backpressure 전략 5가지

IGNORE, ERROR, DROP, LATEST, BUFFER

IGNORE 전략

  • Backpressure를 적용하지 않는 전략, 이 전략을 그대로 사용할 경우 Downstream에서의 Backpressure 요청이 무시되기 때문에 illegalStateException이 발생한다.

 

ERROR 전략

  • Downstream 의 데이터 처리 속도가 느려 Upstream의 emit 속도를 따라가지 못할 경우 illegalStateException을 발생시킴, Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기
    public static void error전략() throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureError() //error 전략 사용
                .doOnNext(data -> System.out.println("# doOnNext: " + data))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {}
                        System.out.println("# onNext: " + data);
                    },
                    error -> System.out.println("# Error " + error));

                Thread.sleep(2000L);
    }

 

DROP 전략

  • Publisher 가 Downstream으로 전달할 데이터가 버퍼에 가득 할 경우, 버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit된 데이터부터 Drop킨다. Drop된 데이터는 폐기된다.
 public static void drop전략() throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureDrop(dropped -> System.out.println("# dropped: " + dropped))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e){}
                    System.out.println("# onNext: " + data);
                },
                erorr -> System.out.println("# onError: " + erorr));

        Thread.sleep(2000L);
    }

 

LATEST 전략

  • Publisher 가 Downstream으로 전달할 데이터가 버퍼에 가득 할 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에 emit된 데이터부터 버퍼에 채우는 전략
  • Drop은 버퍼가 가득 찰 경우 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 Drop하면서 폐기하지만, LATEST는 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨두고 나머지 데이터를 폐기한다.
    public static void latest전략() throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureLatest()
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {}
                    System.out.println("# onNext: " + data);
                },
                    error -> System.out.println("# onError: " + error));

        Thread.sleep(2000L);
    }

 

BUFFER 전략

  • 버퍼의 데이터를 폐기하지 않고 버퍼링을 하는 전략, 버퍼가 가득차면 버퍼 내의 데이터를 폐기하는 전략, 버퍼가 가득차면 에러를 발생시키는 전략을 지원

그중 버퍼가 가득차면 버퍼 내의 데이터를 폐기하는 전략은 2가지로 나뉜다.

 

BUFFER_DROP_LATEST 전략

  • Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 할 경우, 가장 최근에 버퍼 안에 채워진 데이터를 DROP하여 폐기하고, 확보한 공간에 emit된 데이터를 채우는 전략
   public static void buffer전략중_DROP_LATEST전략() throws InterruptedException {
        Flux
                .interval(Duration.ofMillis(300L))
                .doOnNext(data -> System.out.println("# emitted by original Flux: " + data))
                .onBackpressureBuffer(2, // 버퍼 최대 용량 설정
                        dropped -> System.out.println("** Overflow & Dropped: " + dropped),
                        BufferOverflowStrategy.DROP_LATEST)
                .doOnNext(data -> System.out.println("# emitted by Buffer: " + data))
                .publishOn(Schedulers.parallel(),false,1)
                .subscribe(data -> {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {}
                    System.out.println("# onNext: " + data);
                },
                    error -> System.out.println("# onError: " + error));

        Thread.sleep(3000L);
    }

 

BUFFER_DROP_OLDEST 전략

  • Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 할 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기 후, 확보된 공간에 emit된 데이터를 채우는 전략
    public static void buffer전략중_DROP_OLDEST전략() throws InterruptedException {
        Flux
                .interval(Duration.ofMillis(300L))
                .doOnNext(data -> System.out.println("# emitted by original Flux: " + data))
                .onBackpressureBuffer(2, // 버퍼 최대 용량 설정
                        dropped -> System.out.println("** Overflow & Dropped: " + dropped),
                        BufferOverflowStrategy.DROP_OLDEST)
                .doOnNext(data -> System.out.println("# emitted by Buffer: " + data))
                .publishOn(Schedulers.parallel(),false,1)
                .subscribe(data -> {
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e) {}
                            System.out.println("# onNext: " + data);
                        },
                        error -> System.out.println("# onError: " + error));

        Thread.sleep(3000L);
    }

 

Sinks란?

  • Publisher와 Subscriber의 기능을 모두 지니고 있는 구조, Signal을 프로그래밍 방식으로 푸시할 수 있는 구조, Flux/Mono의 의미 체계를 가진것
  • 전통적으로 Signal을 전송하는 generate(), create() 오퍼레이터는 싱글스레드 기반이나, Sinks는 멀티스레드 방식으로 스레드 안정성을 보장해 예기치 않은 동작을 방지해준다.

전통적인 create 오퍼레이터 사용코드

public static void create_오퍼레이터사용() throws InterruptedException {
        int tasks = 6;
        Flux.create((FluxSink<String> sink) -> {
            IntStream
                    .range(1, tasks)
                    .forEach(n -> sink.next(doTasks(n)));
        })
        .subscribeOn(Schedulers.boundedElastic())
            .doOnNext(n -> System.out.println("# create():: "+Thread.currentThread().getName() + ":::" +n))
        .publishOn(Schedulers.parallel())
            .map(result -> result + " success!")
            .doOnNext(n -> System.out.println("# map():: " +Thread.currentThread().getName() + ":::" +n))
        .publishOn(Schedulers.parallel())
        .subscribe(data -> System.out.println("# onNext:: " + Thread.currentThread().getName() + ":::" +data));

        Thread.sleep(500L);
    }

    public static String doTasks(int taskNumber){
        return "task " +taskNumber + " result";
    }

Sinks 사용 코드

public static void sinks사용() throws InterruptedException {
        int tasks = 6;

        Sinks.Many<String> unicastSink = Sinks.many().unicast()
                                            .onBackpressureBuffer();
        Flux<String> fluxView = unicastSink.asFlux();

        IntStream.range(1, tasks)
                .forEach(n -> {
                    try{
                        new Thread(() -> {
                            unicastSink.emitNext(doTasks(n),
                                                Sinks.EmitFailureHandler.FAIL_FAST);
                            System.out.println(Thread.currentThread().getName()+" # emitted: " + n); //Sinks 방식 사용으로 멀티스레드로 동작한다.
                        }).start();
                        Thread.sleep(100L);
                    }catch (InterruptedException e){
                        System.out.println(e);
                    }
                });
        fluxView.publishOn(Schedulers.parallel())
                .map(result -> result + " success!")
                .doOnNext(n -> System.out.println(Thread.currentThread().getName()+" map(): " +n))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> System.out.println(Thread.currentThread().getName()+" onNext: " + data));

        Thread.sleep(200L);
    }
728x90
반응형
LIST

'SPRING > 리액티브 프로그래밍' 카테고리의 다른 글

리액티브 프로그래밍 개념정리(1)  (0) 2024.05.09