본문으로 바로가기
728x90
반응형
SMALL

이슈: RabbitMQ 에서 PUB 만 햇는데 RabbitMQ 채널이 51개 생성되었음

PUB 코드

package com.example.rabbitmqperformance.service;

import com.example.rabbitmqperformance.dto.BenchmarkMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class PerformanceProducer implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;

    public PerformanceProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws InterruptedException {
        int total = 91900000;
        String data = "X".repeat(1024); // 2KB
        int threadPoolSize = 50;

        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);

        AtomicInteger sentCount = new AtomicInteger(0);
        long start = System.currentTimeMillis();

        // TPS 출력용 스레드
        Thread tpsPrinter = new Thread(() -> {
            int prevCount = 0;
            while (!executor.isTerminated()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    break;
                }
                int currentCount = sentCount.get();
                int tps = currentCount - prevCount;
                prevCount = currentCount;
                System.out.printf("Current TPS: %d messages/sec\n", tps);
            }
        });
        tpsPrinter.start();

        for (int i = 0; i < total; i++) {
            executor.submit(() -> {
                BenchmarkMessage msg = new BenchmarkMessage(data, System.currentTimeMillis());
                rabbitTemplate.convertAndSend("test", msg); // exchange와 routing key는 설정에 맞게 변경
                sentCount.incrementAndGet();
            });
        }

        executor.shutdown();
        boolean finished = executor.awaitTermination(10, TimeUnit.MINUTES);

        tpsPrinter.interrupt();
        tpsPrinter.join();

        long elapsed = System.currentTimeMillis() - start;
        System.out.printf("Sent %d messages in %dms%n", total, elapsed);
    }
}

Grafana 모니터링 사진

connetrion 개수


channel 개수


connection은 1개지만 51개의 채널이 확인된다. 결국 한곳에서 51개의 쓰레드로 MQ에 무언가 하고 있다라고 말할 수 있다.

왜그럴까?

MQ 는 쓰레드 별로 채널을 공유하지 않고 개별로 사용한다. 때문에 해당 코드에서 50개의 쓰레드를 생성했고, 소비자는 1개로 운영중이기 때문에 합해서 51개의 채널이 생성된것임이 확인된다.

int threadPoolSize = 50;

ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
...
...
for (int i = 0; i < total; i++) {
    executor.submit(() -> {
        BenchmarkMessage msg = new BenchmarkMessage(data, System.currentTimeMillis());
        rabbitTemplate.convertAndSend("test", msg); // exchange와 routing key는 설정에 맞게 변경
        sentCount.incrementAndGet();
    });
}

RabbitMQ 도큐먼트 참고사항

  • Channel스레드-안전하지 않다
  • 여러 스레드가 하나의 채널을 동시에 사용하는 것은 금지되어야 하며, 예상치 못한 예외나 메시지 손실이 생깁니다.
  • 하나의 채널하나의 스레드만 사용해야 한다



다시 소스코드를 수정해서 쓰레드를 1로 변경해보자

int threadPoolSize = 1; //이전에는 50이였음

ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);

채널과 커넥션 둘다 1로 설정되었다.

728x90
반응형
LIST