Spring Boot & JMS ActiveMQ 연동 및 테스트
- 준비물
ActiveMQ 클래식 버전 다운(https://activemq.apache.org/)
Postman
- 실행환경
sts 4(이클립스)
Spring-boot 2.6.1
1.ActiveMQ 다운
다운 후 activemq 를 눌러 실행시킨다.
콘솔에서 빨간줄을 확인하면 해당 주소로 접속이 가능하다고 나온다. 해당 주소로 접속한다.
접속시 아이디, 비번은 admin 이다.
해당 사이트로 접속하게 되고 해당 사이트에서 MQ를 모니터링 할 수 있는 것 같다.
2.의존성 추가
필자는 gradle을 사용중이다
의존성 추가를 해주자.
implementation 'org.springframework.boot:spring-boot-starter-activemq'
compile("com.fasterxml.jackson.core:jackson-databind")
3.코드 작성
3-1. application.properties 작성
61616 개까지 ActivqMQ 가 포트번호를 지원한다
server.port = 9091
activemq.broker.url=tcp://localhost:61616
3-2. SpringActiveMQConfig.java 작성
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
@Configuration
@EnableJms
public class SpringActiveMQConfig {
@Value("${activemq.broker.url}")
private String brokerUrl;
@Bean
public Queue queue() {
return new ActiveMQQueue("Yoon-ju-young-queue");
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerUrl);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(activeMQConnectionFactory());
}
}
@Bean
public Queue queue() {
return new ActiveMQQueue("Yoon-ju-young-queue");
}
여기서 넣어주는 큐의 이름은 추후 통신 테스트 후 사이트 모니터에 등록됨을 확인할수 있다.
3.3 DTO Student.java 작성
import java.io.Serializable;
public class Student implements Serializable {
private static final long serialVersionUID = 1L;
private String studentId;
private String name;
private String rollNumber;
public String getStudentId() {
return studentId;
}
public void setStudentId(String studentId) {
this.studentId = studentId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRollNumber() {
return rollNumber;
}
public void setRollNumber(String rollNumber) {
this.rollNumber = rollNumber;
}
}
3.4 Producer.java 컨트롤러 작성
Activemq Queue에 메시지는 보내는데 사용된다.
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.ObjectMapper;
@RestController
@RequestMapping("/produce")
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue queue;
@PostMapping("/message")
public Student sendMessage(@RequestBody Student student) {
try {
ObjectMapper mapper = new ObjectMapper();
String studentAsJson = mapper.writeValueAsString(student);
jmsTemplate.convertAndSend(queue, studentAsJson);
} catch (Exception e) {
e.printStackTrace();
}
return student;
}
}
3.5 Consumer.java
소비자 클래스는 ActiveMQ 대기열에서 메시지를 수신하는데 사용된다.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@JmsListener(destination = "Yoon-ju-young-queue")
public void consumeMessage(String message) {
logger.info("Message received from activemq queue---"+message);
}
}
@JmsListener(destination = "Yoon-ju-young-queue")
해당 목적지의 메시지를 읽는다는 주석이다.
3.6 Application 클래스 작성
@SpringBootApplication
@EnableJms
@ComponentScan(basePackages = "com.example.*")
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
3.7 실행 확인
실행포트가 9091로 돌아가는지 확인하자
4.Postman을 이용한 ActiveMQ 통신 테스트
JSON 타입의 데이터로 테스트를 진행한다.
{
"studentId": "20",
"name": "rakesh",
"rollNumber": "0126CS01"
}
내가 보낸 데이터가 콘솔에서 소비자에 의해 읽힘을 확인할 수 있다.
현재는 생산자를 컨트롤러로 만들고 소비자를 리스너로 만들어 통신하는 방법이였다.
리스너에서 대기열의 메시지를 확인을 하면 맨 처음에 들어왔던 메시지를 확인하고 대기열에서 빠져 나가게 된다.
5.소비자를 리스너->클래스 형태로 만들어 통신해보자
5.1 NewConsumer.java
기존 리스너 형태의 소비자를 모두 주석처리 하여 콘솔에 메시지가 읽히지 않게 하고 새로운 소비자를 만들어서 생산자와 같이 컨트럴로 형태로 만들자.
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.ObjectMapper;
@RestController
@RequestMapping("/consume")
public class NewConsumer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue queue;
@GetMapping("/message")
public Student consumeMessage() {
Student student = null;
try {
ObjectMapper mapper = new ObjectMapper();
String jsonMessage = (String) jmsTemplate.receiveAndConvert(queue);
student = mapper.readValue(jsonMessage, Student.class);
} catch (Exception e) {
e.printStackTrace();
}
return student;
}
}
5.2 Postman을 이용한 소비자 컨트롤러 테스트
새로운 메시지를 대기열에 추가하고 컨트롤러를 통해서 읽어보자
-Postman을 통한 메시지 대기열 추가 전
-메시지 대기열 추가 후
대기열에 방금 내가 만든 메시지가 추가 되었음을 확인할 수 있다.
기존에 리스너 였으면 대기열에 있는 메시지를 바로바로 읽어서 콘솔에 찍어줬기 때문에 대기열에 항상 0이였지만
현재는 컨트롤러 형태로 만들어서 따로 읽어줘야 한다.
5.2 메시지 읽기 테스트
내가 방금 대기열에 추가한 메시지를 컨트롤러를 통해서 읽어보았다.
대기열에 있는 메시지를 읽었으니 당연히
1에서 0으로 변경됨을 모니터 사이트에서 확인할 수 있다.
내부 코드 파악해보기
1.ConnectionFactory 인터페이스
해당 인터페이스는 다양한 방식으로 커넥션을 만들어주는 방법을 제공하고 있다.
package javax.jms;
public interface ConnectionFactory {
Connection createConnection() throws JMSException;
Connection createConnection(String userName, String password) throws JMSException;
JMSContext createContext();
JMSContext createContext(String userName, String password);
JMSContext createContext(String userName, String password, int sessionMode);
JMSContext createContext(int sessionMode);
}
우리는 현재 ActiveMQConnectionFactory 를 생성하여 setBrokerURL 만 넣어줘 간단히 메시지를 주고 받는 통신 테스트를 했다.
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerUrl);
return activeMQConnectionFactory;
}
ActiveMQConnectionFactory 는 ConnectionFactory를 비롯한 여러개를 상속하고 있다.
ActiveMQConnectionFactory 내부의 필드에는 brokerURL을 비롯한 해당 사진에 나와있는것 말고도 많은 필드들이 존재한다.
우리는 현재 MQ서버가 없기 때문에 가상으로 brokerURL만 넣어서 통신테스트를 했지만
물리적으로 MQ서버를 구성하여 통신을 하고 싶다면 그 서버의 정보에 맞게 필드값을 채워서 통신하면 될꺼같다.
/**
* @return Returns the Connection.
*/
@Override
public Connection createConnection() throws JMSException {
return createActiveMQConnection();
}
/**
* @return Returns the Connection.
*/
@Override
public Connection createConnection(String userName, String password) throws JMSException {
return createActiveMQConnection(userName, password);
}
/**
* @return Returns the QueueConnection.
* @throws JMSException
*/
@Override
public QueueConnection createQueueConnection() throws JMSException {
return createActiveMQConnection().enforceQueueOnlyConnection();
}
/**
* @return Returns the QueueConnection.
*/
@Override
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
}
/**
* @return Returns the TopicConnection.
* @throws JMSException
*/
@Override
public TopicConnection createTopicConnection() throws JMSException {
return createActiveMQConnection();
}
/**
* @return Returns the TopicConnection.
*/
@Override
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
return createActiveMQConnection(userName, password);
}
protected ActiveMQConnection createActiveMQConnection() throws JMSException {
return createActiveMQConnection(userName, password);
}
그리고 ActiveMQConnectionFactory는ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory 을 상속받고 있기 때문에
MQ/TOPIC/QUEUE 의 연결점들이 모두 만들어진다고 생각하자.
또한 JMSStatsImpl 이 생성되있는것을 확인할수 있는데 내부적으로 JMS 메시지를 ActiveMQConnectionFactory 에서 만들어진 커넥션에 넣어주어 하나의 연결점 역할을 하고있다.
JMS가 제공하는 연결을 만들고있다 라고 생각하자
package org.apache.activemq.management;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.IndentPrinter;
public class JMSStatsImpl extends StatsImpl {
private List<ActiveMQConnection> connections = new CopyOnWriteArrayList<ActiveMQConnection>();
public JMSStatsImpl() {
}
public JMSConnectionStatsImpl[] getConnections() {
Object[] connectionArray = connections.toArray();
int size = connectionArray.length;
JMSConnectionStatsImpl[] answer = new JMSConnectionStatsImpl[size];
for (int i = 0; i < size; i++) {
ActiveMQConnection connection = (ActiveMQConnection)connectionArray[i];
answer[i] = connection.getConnectionStats();
}
return answer;
}
public void addConnection(ActiveMQConnection connection) {
connections.add(connection);
}
public void removeConnection(ActiveMQConnection connection) {
connections.remove(connection);
}
public void dump(IndentPrinter out) {
out.printIndent();
out.println("factory {");
out.incrementIndent();
JMSConnectionStatsImpl[] array = getConnections();
for (int i = 0; i < array.length; i++) {
JMSConnectionStatsImpl connectionStat = (JMSConnectionStatsImpl)array[i];
connectionStat.dump(out);
}
out.decrementIndent();
out.printIndent();
out.println("}");
out.flush();
}
/**
* @param enabled the enabled to set
*/
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
JMSConnectionStatsImpl[] stats = getConnections();
int size = stats.length;
for (int i = 0; i < size; i++) {
stats[i].setEnabled(enabled);
}
}
}
server.port = 9091 activemq.broker.url=tcp://localhost:61616 |
우리는 application.properties에 가상서버의 정보를 넣어줬다.
이를 통해서 우리가 메시지를 보내는데 사용하는 JMS 그리고 가상 서버 정보를 통해서 커넥션을 만들어주는지 알수있는 부분이다.
또한 세션정보와 쓰레드 관리까지 하고있다는 것을 코드를 보며 확인하자
현재 ActiveMQConnectionFactory 안에 커넥션을 만들어주는 메서드 중 하나이다.
package javax.jms;
public interface QueueConnection extends Connection {
QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException;
ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException;
}
QueueConnection 을 확인해보면 Queue에 대한 세션들 만들어주는 QueueSession을 확인 할 수가 있고
package javax.jms;
public interface QueueSession extends Session {
@Override
Queue createQueue(String queueName) throws JMSException;
QueueReceiver createReceiver(Queue queue) throws JMSException;
QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException;
QueueSender createSender(Queue queue) throws JMSException;
@Override
QueueBrowser createBrowser(Queue queue) throws JMSException;
@Override
QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException;
@Override
TemporaryQueue createTemporaryQueue() throws JMSException;
}
QueueSession 은 Session을 상속하고 있으며
메시지를 보낼때, 받을때 모두 세션을 가지고 있으며
하나의 Connection은 하나의 Session 으로서 그안에서 rollback,commmit,recover 같은 트랜잭션 처리 메소드를 지원하여 처리가 가능하다.
아래 세션.class의 메서드를 간단히 확인하자. 기나긴 설명 주석들은 지웠있으니 궁금하면 직접 ide를 켜서 확인하자.
package javax.jms;
import java.io.Serializable;
public interface Session extends Runnable, AutoCloseable {
int AUTO_ACKNOWLEDGE = 1;
int CLIENT_ACKNOWLEDGE = 2;
int DUPS_OK_ACKNOWLEDGE = 3;
int SESSION_TRANSACTED = 0;
BytesMessage createBytesMessage() throws JMSException;
MapMessage createMapMessage() throws JMSException;
Message createMessage() throws JMSException;
ObjectMessage createObjectMessage() throws JMSException;
ObjectMessage createObjectMessage(Serializable object) throws JMSException;
StreamMessage createStreamMessage() throws JMSException;
TextMessage createTextMessage() throws JMSException;
TextMessage createTextMessage(String text) throws JMSException;
boolean getTransacted() throws JMSException;
int getAcknowledgeMode() throws JMSException;
void commit() throws JMSException;
void rollback() throws JMSException;
@Override
void close() throws JMSException;
void recover() throws JMSException;
MessageListener getMessageListener() throws JMSException;
void setMessageListener(MessageListener listener) throws JMSException;
@Override
void run();
MessageProducer createProducer(Destination destination) throws JMSException;
MessageConsumer createConsumer(Destination destination) throws JMSException;
MessageConsumer createConsumer(Destination destination, java.lang.String messageSelector) throws JMSException;
MessageConsumer createConsumer(Destination destination, java.lang.String messageSelector, boolean noLocal) throws JMSException;
MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException;
MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, java.lang.String messageSelector) throws JMSException;
Queue createQueue(String queueName) throws JMSException;
Topic createTopic(String topicName) throws JMSException;
TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException;
MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException;
MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException;
MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException;
MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException;
QueueBrowser createBrowser(Queue queue) throws JMSException;
QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException;
TemporaryQueue createTemporaryQueue() throws JMSException;
TemporaryTopic createTemporaryTopic() throws JMSException;
void unsubscribe(String name) throws JMSException;
}
아래의 사진처럼 결국에는 다중 스레드 방식으로 여러 메시지들을 받고 처리할수 있게된다.
ActiveMQConnection 의 내부 필드에는 스레드/스레드 풀 관련 인스턴스 멤버들이 존재함을 확인할 수 있다.
간단히 알 수 있었던 점
-ActiveMQConnectionFactory는 JMS와의 다양한 연결점을 만들어주는 공장이다.
-하나의 메시지 통로는 하나의 세션 스레드를 갖게되며 트랜잭션 처리가 가능하다.
-내부적으로 쓰레드/쓰레드 풀 관리를 지원한다.
* 실제 MQ서버가 있다는 하에 생산자를 구현하였을때의 로직순서
참고
https://www.netsurfingzone.com/spring-boot/spring-boot-jms-activemq-producer-and-consumer-example/
'SPRING > 스프링부트' 카테고리의 다른 글
4. 스프링 부트에서 웹 환경 제외하고 순수 자바 어플리케이션으로 사용하기 (0) | 2022.01.14 |
---|---|
3.스프링 부트 Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured. (0) | 2022.01.10 |
[스프링부트]1. [Error] Querydsl Cannot find symbol 오류 (0) | 2021.11.20 |
Thymeleaf 작동이 안되는 원인 (3) | 2021.11.19 |
어노테이션 정리 (0) | 2021.11.15 |