Spring Boot 技术探索

Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can "just run".

29、Spring Boot之ActiveMQ的使用

平台环境:

名称

版本号

Mac OS X

10.15.1

JDK

1.8.0_201

Apache Maven

3.6.0

IntelliJ IDEA

2019.2.4 (Ultimate Edition)

Spring Boot

2.2.1.RELEASE

 

  什么是JMS?

  JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

 

  什么是ActiveMQ?

  ActiveMQ是JMS的一个实现。Spring Boot提供了ActiveMQ组件spring-boot-starter-activemq,用来支持 ActiveMQ在Spring Boot体系内使用。

 

pom.xml中引入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>


    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

 

application.properties

# 基于内存的 ActiveMQ
spring.activemq.in-memory=true
# 不适应连接池
spring.activemq.pool.enabled=false
# true为广播模式,false为队列模式(默认)
spring.jms.pub-sub-domain=false


# 独立安装的 ActiveMQ
#spring.activemq.broker-url=tcp://192.168.0.1:61616
#spring.activemq.user=admin
#spring.activemq.password=admin


# 在考虑结束之前等待的时间
#spring.activemq.close-timeout=15s
# 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0
#默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
#spring.jms.pub-sub-domain=true
#账号
# spring.activemq.user=admin
# 密码
# spring.activemq.password=admin
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔列表(当不信任所有包时)
#spring.activemq.packages.trusted=
# 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时创建连接。可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms
# 连接空闲超时
#spring.activemq.pool.idle-timeout=30s
# 连接池最大连接数
#spring.activemq.pool.max-connections=1
# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试重新连接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

 

新建MqConfig.java

package com.example.demo.config;


import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import javax.jms.Queue;
import javax.jms.Topic;


@Configuration
public class MqConfig
{
    @Bean
    public Queue queue()
    {
        // 定义一个Queue,实例化为ActiveMQQueue类型
        return new ActiveMQQueue("com.queue");
    }


    @Bean
    public Topic topic()
    {
        // 定义一个Topic,实例化为ActiveMQTopic类型
        return new ActiveMQTopic("com.topic");
    }
}

 

 

新建Producer.java类,用于发送消息。

package com.example.demo.producer;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;


import javax.jms.Queue;
import javax.jms.Topic;


@Component
public class Producer
{
    @Autowired
    // JmsMessagingTemplate是Spring提供发送消息的工具类
    private JmsMessagingTemplate jmsMessagingTemplate;


    @Autowired
    private Queue queue;


    @Autowired
    private Topic topic;


    public void sendQueue(String msg)
    {
        System.out.println("Send queue msg :" + msg);
        this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
    }


    public void sendTopic(String msg)
    {
        System.out.println("Send topic msg :" + msg);
        this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
    }
}

 

新建Consumer.java类,用于接收消息。

package com.example.demo.consumer;


import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer
{
    // @JmsListener用于定义一个JMS监听器,监听destination定义的名称的消息
    @JmsListener(destination = "com.queue")
    public void receiveQueue(String text)
    {
        System.out.println("Consumer queue msg : " + text);
    }


    @JmsListener(destination = "com.topic")
    public void receiveTopic(String text)
    {
        System.out.println("Consumer topic msg : " + text);
    }
}

 

新建Consumer2.java类,用于接收消息。

package com.example.demo.consumer;


import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer2
{
    @JmsListener(destination = "com.queue")
    public void receiveQueue(String text)
    {
        System.out.println("Consumer2 queue msg : " + text);
    }


    @JmsListener(destination = "com.topic")
    public void receiveTopic(String text)
    {
        System.out.println("Consumer2 topic msg : " + text);
    }
}

 

接下来开始测试:

新建ActiveMqTests.java

package com.example.demo;


import com.example.demo.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;


import static org.assertj.core.api.Assertions.assertThat;


@ExtendWith(OutputCaptureExtension.class)
@SpringBootTest
class ActiveMqTests
{
    @Autowired
    private Producer producer;


    @Test
    void sendQueueMessage(CapturedOutput output) throws InterruptedException
    {
        // 测试一对一发送消息(队列模式)
        this.producer.sendQueue("Test queue message");
        Thread.sleep(1000L);
        assertThat(output).contains("Test queue");
    }


    @Test
    void sendQueueMessage2()
    {
        // 测试一对一批量发送消息(队列模式)
        for (int i = 0; i < 100; i++)
        {
            this.producer.sendQueue("Test queue message" + i);
        }
    }


    @Test
    void sendTopicMessage()
    {
        // 测试群发送消息(广播模式)
        this.producer.sendTopic("Test Topic message");
    }
}

sendQueueMessage测试结果,可能是以下中的任意一个。因为当有多个消费者监听一个队列时,消费者会自动负载均衡的接收消息,且每个消息只会有一个消费者所接收。

Send queue msg :Test queue message
Consumer queue msg : Test queue message
Send queue msg :Test queue message
Consumer2 queue msg : Test queue message

 

sendQueueMessage2测试结果,Consumer与Consumer2随机接收,但基本上是各占50%。

Send queue msg :Test queue message0
Send queue msg :Test queue message1
Send queue msg :Test queue message2
Consumer2 queue msg : Test queue message0
Consumer queue msg : Test queue message1
Send queue msg :Test queue message3
Consumer2 queue msg : Test queue message2
Send queue msg :Test queue message4
Consumer queue msg : Test queue message3
Send queue msg :Test queue message5
Consumer2 queue msg : Test queue message4

 

测试sendTopicMessage之前要把配置文件中的此配置项改为true

# true为广播模式,false为队列模式(默认)
spring.jms.pub-sub-domain=true

测试结果

Send topic msg :Test Topic message
Consumer2 topic msg : Test Topic message
Consumer topic msg : Test Topic message

 


 

同时支持队列和广播

  通过上面的例子可以看到,Spring Boot中ActiveMQ发送消息默认只能是队列或者广播中的一种。如果想同时支持队列和广播,则需要通过@JmsListener标签的containerFactory属性来实现。

 

  复制上面的工程项目,重命名为springBootDemo29B。

  新建ActiveMQConfig.java类,创建两个JmsListenerContainerFactory Bean,用于@JmsListener标签的containerFactory属性。其中,最重要的是factory.setPubSubDomain的值,true为广播模式,false为队列模式。

package com.example.demo.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;


import javax.jms.ConnectionFactory;


@Configuration
@EnableJms
public class ActiveMQConfig
{
    @Bean("queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false); // 队列模式
        return factory;
    }


    @Bean("topicListenerFactory")
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true); // 广播模式
        return factory;
    }
}

 

修改Consumer.java类中所有@JmsListener标签的containerFactory属性。

同理修改Consumer2.java类。

package com.example.demo.consumer;


import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer
{
    @JmsListener(destination = "com.queue", containerFactory = "queueListenerFactory")
    public void receiveQueue(String text)
    {
        System.out.println("Consumer queue msg : " + text);
    }


    @JmsListener(destination = "com.topic", containerFactory = "topicListenerFactory")
    public void receiveTopic(String text)
    {
        System.out.println("Consumer topic msg : " + text);
    }
}

 

测试类ActiveMqTests保持不变。

经过测试,可以同时支持队列或者广播模式了。

 

 

Bootstrap Thumbnail Second
MySQL

MySQL is the world's most popular open source database.

GO

Bootstrap Thumbnail Third
算法基础

本书介绍了什么是计算机算法,如何描述它们,以及如何来评估它们。

GO