技术标签: activemq 学习 java-activemq 消息队列
本文为Activemq简单入门文章
JMS即Java Message Service,是JavaEE的消息服务接口。JMS主要有两个版本:1.1和2.0。2.0和1.1相比,主要是简化了收发消息的代码。
JMS为Java程序提供了一种通用方法, 用于创建、发送、接收和读取企业消息系统中的消息。
JMS是一组接口定义,如果我们要使用JMS,还需要选择一个具体的JMS产品。常用的JMS服务器有开源的ActiveMQ,商业服务器如WebLogic、WebSphere等也内置了JMS支持。
使用消息服务,而不是直接调用对方的API,它的好处是:
JMS提供了两种消息模型:
PTP(点对点消息模型)
Pub/Sub(发布订阅消息模式)
Queue是一种一对一的通道
Topic则是一种一对多通道。
注意:
- 如果一个Topic的消息全部都持久化了,并且只有一个Consumer,那么它和Queue其实是一样的。实际上,很多消息服务器内部都只有Topic类型的消息架构,Queue可以通过Topic“模拟”出来。
- 无论是Queue还是Topic,对Producer没有什么要求。多个Producer也可以写入同一个Queue或者Topic,此时消息服务器内部会自动排序确保消息总是有序的。
上面是消息服务的基本模型。当某个到达消息服务器时,Producer和Consumer通常是通过TCP连接消息服务器,在编写JMS程序时,又会遇到ConnectionFactory、Connection、Session等概念,其实这和JDBC连接是类似的:
在JMS 1.1中,发送消息的代码示例如下:
try {
Connection connection = null;
try {
// 创建连接:
connection = connectionFactory.createConnection();
// 创建会话:
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建一个Producer并关联到某个Queue:
MessageProducer messageProducer = session.createProducer(queue);
// 创建一个文本消息:
TextMessage textMessage = session.createTextMessage(text);
// 发送消息:
messageProducer.send(textMessage);
} finally {
// 关闭连接:
if (connection != null) {
connection.close();
}
}
} catch (JMSException ex) {
// 处理JMS异常
}
JMS 2.0改进了一些API接口,发送消息变得更简单:
try (JMSContext context = connectionFactory.createContext()) {
context.createProducer().send(queue, text);
}
注意:
- JMSContext实现了AutoCloseable接口,可以使用try(resource)语法,代码更简单。
JMS的消息类型支持以下几种:
消息类型 | 适用场景 | 通信机制 | 消息传递模型 |
---|---|---|---|
TextMessage | 传递文本消息 String XML JSON | 同步或异步 | 点对点或发布/订阅 |
MapMessage | 传递带有多个属性的消息 | 同步或异步 | 点对点或发布/订阅 |
BytesMessage | 传递二进制数据 | 同步或异步 | 点对点或发布/订阅 |
StreamMessage | 传递流式数据,适用于需要分块读取数据的场景 | 同步或异步 | 点对点或发布/订阅 |
ObjectMessage | 传递Java对象 | 同步或异步 | 点对点或发布/订阅 |
Message | 用于接收任意类型的消息,适用于需要动态解析消息类型的场景 | 同步或异步 | 点对点或发布/订阅 |
JMS支持的消息头字段(Message header fields)有:
Activemq简介:
插件:
- Camel 插件:将 ActiveMQ 与 Apache Camel 集成,以支持各种数据转换和消息路由模式。
- LevelDB 存储插件:使用 LevelDB 作为消息存储的替代方案。
- MQTT 插件:支持使用 MQTT 协议进行消息传递。
- STOMP 插件:支持使用 STOMP 协议进行消息传递。
- Virtual Topics 插件:提供虚拟主题,以简化发布订阅模式的实现。
- WebSocket 插件:支持使用 WebSocket 协议进行消息传递。
- JMX 监控插件:提供 JMX 监控功能,以监视 ActiveMQ 运行时的性能和状态信息。
ActiveMQ Classic或者ActiveMQ Artemis的关系:
所以,我们这里直接选择ActiveMQ Artemis。从官网下载最新的2.x版本,解压后设置环境变量ARTEMIS_HOME
,指向Artemis根目录,例如C:\Apps\artemis
,然后,把ARTEMIS_HOME/bin
加入PATH
环境变量:
%ARTEMIS_HOME%\bin到Path
路径;$ARTEMIS_HOME/bin到PATH
路径。Artemis有个很好的设计,就是它把程序和数据完全分离了。我们解压后的ARTEMIS_HOME
目录是程序目录,要启动一个Artemis服务,还需要创建一个数据目录。我们把数据目录直接设定在项目spring-integration-jms
的jms-data
目录下。执行命令artemis create jms-data
:
在创建过程中,会要求输入连接用户和口令,这里我们设定admin和password,以及是否允许匿名访问(这里选择N)。
此数据目录jms-data
不仅包含消息数据、日志,还自动创建了两个启动服务的命令bin/artemis
和bin/artemis-service
,前者在前台启动运行,按Ctrl+C
结束,后者会一直在后台运行。
我们把目录切换到jms-data/bin
,直接运行artemis run
即可启动Artemis服务:
启动成功后,Artemis提示可以通过URL: http://localhost:8161/console访问管理后台。注意不要关闭命令行窗口
。
如果Artemis启动时显示警告:AMQ222212: Disk Full! … Clients will report blocked.这是因为磁盘空间不够,可以在etc/broker.xml配置中找到
<max-disk-usage>
并改为99。
上面简单介绍了JMS的基本概念,本节结合Activemq来具体说明JMS的使用。
JMS中的核心概念:
注意:
- Non-JMS client 可能使用其他协议或方式与消息中间件进行通信,例如: 直接使用底层的消息中间件提供的原生 API、使用自定义的消息格式或协议进行通信等。
- 这就好像你不使用java提供的JBDC统一驱动接口进行调用,而直接调用各个第三方厂商提供的驱动实现类一样
- 需要注意的是,使用 Non-JMS client 进行消息通信可能会导致与特定消息中间件的耦合性增加。因为它们直接依赖于消息中间件提供的接口和协议,所以在切换或迁移到其他消息中间件时可能需要进行修改和适配
- Non-JMS client 的存在也提供了一种灵活性和自由度,可以根据特定需求选择更适合的通信方式和协议。它们可能适用于特定的应用场景或需要与非 JMS 兼容的系统进行集成的情况。
类比: mysql-connector-java
注意:
- 在JMS(Java Message Service)中,Administered Objects(管理对象)是由JMS提供者(如消息中间件)管理和提供的一些资源,包括队列(Queue)、主题(Topic)、连接工厂(ConnectionFactory)等。这些对象提供了与消息传递相关的基础设施,并允许应用程序与消息中间件进行交互。
- 通过使用 Administered Objects,应用程序可以更方便地与消息中间件进行交互,而无需了解底层的通信协议和细节。应用程序可以通过配置或通过编程的方式访问和使用这些对象,以满足不同的消息传递需求
引入相关依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.1.4</version>
</dependency>
低版本的Artemis的Client依赖包中,没有下面这个问题,无需引入。
高版本的Artemis的Client依赖包中,虽然依赖了jakarta.jms:jakarta.jms-api
,但是由于artemis-jakarta-client包依赖的jakarta.jms-api版本比较老,内部包名还是旧版本的javax.jms开头:
因此需要手动引入高版本jakarta.jms-api,以maven依赖的就近查找原则,覆盖旧版本:
上面已经介绍过了JMS中支持的消息类型,下面我们来看一下如何创建并发送不同类型的消息:
public Message createMessage(Session session) throws JMSException {
Message m = session.createMessage();
m.setStringProperty("exception", "java.lang.NosuchMethodException");
return m;
}
public Message createMessage(Session session) throws JMSException {
TextMessage tm = session.createTextMessage();
tm.setText("{ \"name\": \"John\", \"age\": 30}");
return tm;
}
public Message createMessage(Session session) throws JMSException {
MapMessage mm = session.createMapMessage();
mm.setInt("id", 2017);
mm.setString("name", "zhangsan");
mm.setString("password", "123456");
return mm;
}
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
// 设置消息内容
byte[] payload = "Hello, JMS!".getBytes();
message.writeBytes(payload);
// 发送消息
producer.send(message);
public Message createMessage(Session session) throws JMSException {
ObjectMessage om = session.createObjectMessage();
User user = new User();
user.setID(2020001);
user.setName("zhangsan");
om.setObject(user);
return om;
}
生产者:
import jakarta.jms.*;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public class Producer {
public static void main(String[] args) {
try {
// 创建连接工厂和连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616","root","123456");
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建会话和队列
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
// 创建生产者并发送多个消息
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 5; i++) {
// 创建消息并设置消息内容
StreamMessage message = session.createStreamMessage();
message.writeInt(i);
message.writeString("Message " + i);
// 发送消息
producer.send(message);
System.out.println("Sent message: " + message);
}
// 关闭连接和会话
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者:
import jakarta.jms.*;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) {
try {
// 创建连接工厂和连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616","root","123456");
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建会话和队列
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
// 创建消费者并接收消息
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
Message message = consumer.receive();
if (message instanceof StreamMessage) {
StreamMessage streamMessage = (StreamMessage) message;
int intValue = streamMessage.readInt();
String stringValue = streamMessage.readString();
System.out.println("Received message: " + intValue + ", " + stringValue);
}else {
break;
}
}
// 关闭连接和会话
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
JMS的跨平台实现与JDBC类似,核心思路是如何定位到第三方厂商提供的服务实现类。
就像JDBC中,我们需要通过JDK SPI机制获取Drive接口实现类一样,JMS同样需要采用一种服务发现机制,获取第三方提供的:
被管理的对象一般被管理员放在JNDI名字空间中,通常在JMS客户端应用的文档中说明它所需要的JMS被管理对象,以及应以何种JNDI名字来提供这些JMS被管理对象。
JMS 1.1中各个接口之间的关系如下图所示:
JMS 2.0中改进了一些API接口,此时各个接口之间的关系如下所示:
在JMS(Java消息服务)的简化API中,单个JMSContext
对象包含了经典API中由两个独立对象提供的行为,即Connection
和Session
。尽管该规范中提到了JMSContext
具有底层的“连接”和“会话”,但简化API并不使用Connection
和Session
接口。
在简化API中,JMSContext
作为一个统一的入口点,用于创建JMS对象、发送和接收消息以及管理事务。它以更简洁和方便的方式封装了连接和会话的行为。
通过使用JMSContext
,可以执行诸如创建生产者或消费者、发送和接收消息、提交或回滚事务等操作。JMSContext
在后台管理底层的连接和会话,提供了一个简化和更直观的编程模型。
简化API的目标是使JMS的使用更加简单和直观,减少开发人员直接使用底层连接和会话对象的需求。相反,他们可以依赖于JMSContext
来处理这些细节,集中精力进行核心消息操作。
总之,JMSContext
是JMS简化API中的主要对象,它整合了连接和会话的功能,并提供了更简洁、易用的编程模型,使JMS的使用更加便捷。
下面列举一个demo示例:
生产者:
package org.example;
import javax.jms.*;
public class JMSProducer {
public static void main(String[] args) {
// 设置 ActiveMQ 的连接信息
String brokerUrl = "tcp://localhost:61616";
String username = "root";
String password = "123456";
String queueName = "jms/queue/mail";
// 创建连接工厂
ConnectionFactory connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(brokerUrl);
try (JMSContext context = connectionFactory.createContext(username, password)) {
// 创建目标队列
Destination destination = context.createQueue(queueName);
// 创建消息生产者
javax.jms.JMSProducer producer = context.createProducer();
// 创建文本消息
TextMessage message = context.createTextMessage();
message.setText("Hello, JMS!");
// 发送消息
producer.send(destination, message);
System.out.println("Message sent successfully.");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者:
package org.example;
import javax.jms.*;
public class JMSConsumer {
public static void main(String[] args) {
// 设置 ActiveMQ 的连接信息
String brokerUrl = "tcp://localhost:61616";
String username = "root";
String password = "123456";
String queueName = "jms/queue/mail";
// 创建连接工厂
ConnectionFactory connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(brokerUrl);
try (JMSContext context = connectionFactory.createContext(username, password)) {
// 创建目标队列
Destination destination = context.createQueue(queueName);
// 创建消息消费者
javax.jms.JMSConsumer consumer = context.createConsumer(destination);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
} else {
System.out.println("Received message of unsupported type.");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
目标1:JMS提供一种标准的、平台无关的方法,使分布式应用程序之间可以可靠地交换消息。
目标2:实现松耦合的消息传递机制,使得分布式应用程序能够通过消息的方式进行通信,而不需要知道对方的具体实现。
At least once 模式确保消息至少被传递一次,但是可能会被传递多次。
以下是使用 ActiveMQ 实现 “at least once” 语义的消息消费者代码示例:
package org.example;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSConsumer {
public static void main(String[] args) {
String brokerUrl = "tcp://localhost:61616";
String username = "your-username";
String password = "your-password";
String queueName = "jms/queue/mail";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
try (Connection connection = connectionFactory.createConnection(username, password)) {
connection.start();
//CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received message: " + text);
// 执行具体的业务逻辑
// 手动确认消息已被处理
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 暂停主线程,等待消息到达和处理
Thread.sleep(30000);
session.close();
connection.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
At most once模式确保消息最多被传递一次。
以下是一个使用 ActiveMQ 的消息消费者代码示例,展示了 “at most once” 的行为:
package org.example;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSConsumer {
public static void main(String[] args) {
String brokerUrl = "tcp://localhost:61616";
String username = "your-username";
String password = "your-password";
String queueName = "jms/queue/mail";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
try (Connection connection = connectionFactory.createConnection(username, password)) {
connection.start();
//AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received message: " + text);
// 执行具体的业务逻辑
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 暂停主线程,等待消息到达和处理
Thread.sleep(30000);
session.close();
connection.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
Exactly once模式确保消息只被传递一次。
以下是使用 ActiveMQ 实现精确一次发送语义的消息生产者和消费者代码示例:
生产者:
public class JMSProducer {
public static void main(String[] args) {
String brokerUrl = "tcp://localhost:61616";
String username = "your-username";
String password = "your-password";
String queueName = "jms/queue/mail";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
try (Connection connection = connectionFactory.createConnection(username, password)) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(destination);
// 创建并发送消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
producer.send(message);
// 提交事务
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者:
public class JMSConsumer {
public static void main(String[] args) {
String brokerUrl = "tcp://localhost:61616";
String username = "your-username";
String password = "your-password";
String queueName = "jms/queue/mail";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
try (Connection connection = connectionFactory.createConnection(username, password)) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received message: " + text);
// 执行具体的业务逻辑
// 手动提交事务
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 暂停主线程,等待消息到达和处理
Thread.sleep(30000);
session.close();
connection.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
在这个代码示例中,我们创建了一个消息生产者和一个消息消费者。消息生产者使用 Session.SESSION_TRANSACTED 模式创建了会话,并在发送消息后提交了事务。消息消费者也使用 Session.SESSION_TRANSACTED 模式创建了会话,并在处理消息后手动提交了事务。
通过使用事务机制,可以确保消息在发送和接收过程中的可靠性。如果消息发送或处理过程中发生异常,事务会回滚,消息不会被确认,从而实现了消息的精确一次发送语义(Exactly Once)。
ActiveMQ的事务机制的底层原理涉及到消息的持久化和日志的记录。
当使用事务提交时,ActiveMQ会将事务中的消息写入持久化存储,通常是磁盘上的数据库或文件系统。这样可以确保在发生故障或断电等情况下,消息的持久性得到保证,不会丢失。
同时,ActiveMQ会将事务的操作记录在事务日志中。事务日志记录了所有发送、接收和确认消息的操作,以及事务的状态信息。这样可以在系统恢复时,根据事务日志的内容来恢复之前未完成的事务,并确保事务的一致性。
事务提交的过程可以简述为以下几个步骤:
- 在事务提交时,将事务中的消息写入持久化存储。
- 将事务的操作记录写入事务日志。
- 标记事务为已提交。
- 释放事务相关资源。
在事务回滚的情况下,会根据事务日志中的记录进行回滚操作,包括将持久化存储中的消息删除或标记为未发送状态,并将事务标记为已回滚。
通过持久化存储和事务日志的机制,ActiveMQ能够提供消息的可靠性传递和事务的原子性,确保消息在发送和接收过程中的可靠性和一致性。这样可以保证消息在分布式系统中的可靠性传递,并提供消息的精确一次发送语义(Exactly
Once)。
DUPS_OK_ACKNOWLEDGE(重复确认模式)
适用于消息重复消费不会造成严重问题的场景,
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
需要注意的是,由于DUPS_OK_ACKNOWLEDGE
模式下允许消息的重复消费,因此不能将该模式应用于对数据准确性要求极高的场景,例如金融交易等。
AUTO_ACKNOWLEDGE
: 自动确认,这就意味着消息的确认时机将由consumer择机确认."择机确认"似乎充满了不确定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,否则将有可能导致消息的丢失,或者消息的重复接受.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运作的呢?
因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;如果你没有使用try-catch,就有可能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。
CLIENT_ACKNOWLEDGE
: 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。
1. message.acknowledge(),
2. ActiveMQMessageConsumer.acknowledege(),
3. ActiveMQSession.acknowledge();
DUPS_OK_ACKNOWLEDGE
: "消息可重复"确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。
SESSION_TRANSACTED
: 当session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,因为在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然可以决定DELIVERED_ACK_TYPE的发送时机。
因为Session非线程安全,那么当前session下所有的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,已避免rollback()或者commit()方法被多个consumer调用而造成的消息混乱。
当consumer接受到消息之后,首先检测TransactionContext是否已经开启,如果没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,如果大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,如果是同步(receive),那么即返回message。上述过程,和其他确认模式没有任何特殊的地方。
当开发者决定事务可以提交时,必须调用session.commit()方法,commit方法将会导致当前session的事务中所有消息立即被确认;事务的确认过程中,首先把本地的deliveredMessage队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,如果broker端事务操作成功,那么将会把本地deliveredMessage队列清空,新的事务开始;如果broker端事务操作失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所做的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。
当session.commit方法异常时,对于开发者而言通常是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),当然你可以在事务开始之后的任何时机调用rollback(),rollback意味着当前事务的结束,事务中所有的消息都将被重发。需要注意,无论是inner-rollback还是调用session.rollback()而导致消息重发,都会导致message.redeliveryCounter计数器增加,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。
INDIVIDUAL_ACKNOWLEDGE
: 单条消息确认,这种确认模式,我们很少使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎一样,当消息消费成功之后,需要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将导致整个session中所有消息被确认(批量确认)。
这里以In Jvm协议进行讲解:
ActiveMQ在内部支持以下的In-JVM协议:
VM Transport Protocol(VM传输协议):这是ActiveMQ默认的In-JVM协议,它允许在同一个Java虚拟机(JVM)内的不同线程之间进行消息的传递。使用VM传输协议,消息可以在应用程序内部的不同组件之间快速传递,而无需通过网络进行通信。
VM Transport Bridge(VM传输桥):这是ActiveMQ提供的用于连接不同Java虚拟机(JVM)之间的In-JVM通信的协议。它允许在不同的JVM实例之间通过内存进行消息传递,提供了一种简单而高效的方式来实现进程间通信。
这些In-JVM协议适用于在同一个Java虚拟机内的不同线程或不同进程之间进行高性能的消息传递。它们可以避免使用网络通信带来的延迟和开销,提供了更快的消息传递速度和更低的资源消耗。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setObjectMessageSerializationDefered(true);
Activemq还支持AMQP协议,MQTT协议等,这里不多列举了。
这里简单介绍一下,具体配置和使用说明,大家参考官方文档或自行查询资料学习。
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message);
ActiveMQ 是一个流行的开源消息中间件,它支持多种消息选择器的方式。消息选择器允许您从消息队列中选择特定的消息,以便只有满足某些条件的消息会被消费者接收。
在 ActiveMQ 中,消息选择器使用 SQL-92 类似的语法来定义选择条件。您可以在创建消费者时使用消息选择器,通过在消息选择器表达式中指定条件来选择感兴趣的消息。
下面是一些常见的消息选择器示例:
String selector = "color = 'red'";
MessageConsumer consumer = session.createConsumer(destination, selector);
上述代码中,消息选择器指定了一个属性 color
的值为 'red'
的消息将被选择。
String selector = "JMSPriority > 5";
MessageConsumer consumer = session.createConsumer(destination, selector);
上述代码中,消息选择器指定了一个 JMS 头部属性 JMSPriority
的值大于 5 的消息将被选择。
String selector = "text LIKE '%important%'";
MessageConsumer consumer = session.createConsumer(destination, selector);
上述代码中,消息选择器指定了一个消息体内容包含 'important'
关键字的消息将被选择。
请注意,消息选择器只能应用于支持消息选择器功能的消息中间件,而且消费者必须使用带有消息选择器的
createConsumer
方法来创建。另外,使用过多的消息选择器可能会对系统性能产生负面影响,因此在使用时需要权衡选择条件的复杂性和性能需求。
这里只给出一个简单的Demo,关于消息选择器的更多知识,可以参考官方文档,或者自行查找资料学习:
生产者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MessageProducerExample {
public static void main(String[] args) throws JMSException {
// 连接到 ActiveMQ 代理
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标队列
Destination destination = session.createQueue("myQueue");
// 创建生产者
MessageProducer producer = session.createProducer(destination);
// 创建消息
TextMessage message1 = session.createTextMessage();
message1.setText("Message 1");
message1.setStringProperty("color", "red");
TextMessage message2 = session.createTextMessage();
message2.setText("Message 2");
message2.setStringProperty("color", "blue");
// 发送消息
producer.send(message1);
producer.send(message2);
// 关闭连接
connection.close();
}
}
消费者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MessageConsumerExample {
public static void main(String[] args) throws JMSException {
// 连接到 ActiveMQ 代理
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标队列
Destination destination = session.createQueue("myQueue");
// 创建消费者并设置消息选择器
MessageConsumer consumer = session.createConsumer(destination, "color = 'red'");
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭连接
connection.close();
}
}
上述示例中,生产者向名为 “myQueue” 的队列发送了两条消息,每条消息都带有一个名为 “color” 的属性。然后消费者通过设置消息选择器 “color = ‘red’”,只接收具有红色属性的消息。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
AppConfig
中,通过@EnableJms
让Spring
自动扫描JMS
相关的Bean
,并加载JMS
配置文件jms.properties
:@EnableJms
@PropertySource({
"classpath:/jms.properties"})
@SpringBootApplication
public class AppConfig {
public static void main(String[] args) {
SpringApplication.run(AppConfig.class, args);
}
}
@Bean
ConnectionFactory createJMSConnectionFactory(
@Value("${jms.uri:tcp://localhost:61616}") String uri,
@Value("${jms.username:admin}") String username,
@Value("${jms.password:password}") String password) {
return new ActiveMQJMSConnectionFactory(uri, username, password);
}
因为我们使用的消息服务器是ActiveMQ Artemis,所以ConnectionFactory的实现类就是消息服务器提供的ActiveMQJMSConnectionFactory,它需要的参数均由配置文件读取后传入,并设置了默认值。
JmsTemplate
,它是Spring提供的一个工具类,和JdbcTemplate类似,可以简化发送消息的代码: @Bean
JmsTemplate createJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
@Bean("jmsListenerContainerFactory")
DefaultJmsListenerContainerFactory createJmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Component
public class MessagingService {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private JmsTemplate jmsTemplate;
public void sendMailMessage(MailMessage msg) throws Exception {
final String text = objectMapper.writeValueAsString(msg);
jmsTemplate.send("jms/queue/mail", new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
public static class MailMessage {
private String header;
private String body;
}
}
JMS的消息类型支持以下几种:
最常用的是发送基于JSON的文本消息,上述代码通过JmsTemplate创建一个TextMessage并发送到名称为jms/queue/mail
的Queue。
注意:Artemis消息服务器默认配置下会自动创建Queue,因此不必手动创建一个名为jms/queue/mail的Queue,但不是所有的消息服务器都会自动创建Queue,生产环境的消息服务器通常会关闭自动创建功能,需要手动创建Queue。
再注意到MailMessage是我们自己定义的一个JavaBean,真正的JMS消息是创建的TextMessage,它的内容是JSON。
当用户注册成功后,我们就调用MessagingService.sendMailMessage()
发送一条JMS消息,此代码十分简单,这里不再贴出。
下面我们要详细讨论的是如何处理消息,即编写Consumer。从理论上讲,可以创建另一个Java进程来处理消息,但对于我们这个简单的Web程序来说没有必要,直接在同一个Web应用中接收并处理消息即可。
处理消息的核心代码是编写一个Bean,并在处理方法上标注@JmsListener
:
@Component
public class MailMessageListener {
final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired ObjectMapper objectMapper;
@Autowired MailService mailService;
@JmsListener(destination = "jms/queue/mail", concurrency = "10")
public void onMailMessageReceived(Message message) throws Exception {
logger.info("received message: " + message);
if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText();
MailMessage mm = objectMapper.readValue(text, MailMessage.class);
mailService.sendRegistrationMail(mm);
} else {
logger.error("unable to process non-text message!");
}
}
}
注意到@JmsListener
指定了Queue的名称,因此,凡是发到此Queue的消息都会被这个onMailMessageReceived()
方法处理,方法参数是JMS的Message
接口,我们通过强制转型为TextMessage
并提取JSON,反序列化后获得自定义的JavaBean,也就获得了发送邮件所需的所有信息。
下面问题来了:Spring处理JMS消息的流程是什么?
如果我们直接调用JMS的API来处理消息,那么编写的代码大致如下:
// 创建JMS连接:
Connection connection = connectionFactory.createConnection();
// 创建会话:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Consumer:
MessageConsumer consumer = session.createConsumer(queue);
// 为Consumer指定一个消息处理器:
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// 在此处理消息...
}
});
// 启动接收消息的循环:
connection.start();
我们自己编写的MailMessageListener.onMailMessageReceived()
相当于消息处理器:
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
mailMessageListener.onMailMessageReceived(message);
}
});
所以,Spring根据AppConfig
的注解@EnableJms
自动扫描带有@JmsListener
的Bean方法,并为其创建一个MessageListener
把它包装起来。
注意到前面我们还创建了一个JmsListenerContainerFactory
的Bean,它的作用就是为每个MessageListener
创建MessageConsumer
并启动消息接收循环。
再注意到@JmsListener
还有一个concurrency
参数,10表示可以最多同时并发处理10个消息,5-10
表示并发处理的线程可以在5~10之间调整。
因此,Spring在通过MessageListener
接收到消息后,并不是直接调用mailMessageListener.onMailMessageReceived()
,而是用线程池调用,因此,要时刻牢记,onMailMessageReceived()
方法可能被多线程并发执行,一定要保证线程安全。
我们总结一下Spring接收消息的步骤:
通过JmsListenerContainerFactory
配合@EnableJms
扫描所有@JmsListener
方法,自动创建MessageConsumer
、MessageListener
以及线程池,启动消息循环接收处理消息,最终由我们自己编写的@JmsListener
方法处理消息,可能会由多线程同时并发处理。
要验证消息发送和处理,我们注册一个新用户,可以看到如下日志输出:
2020-06-02 08:04:27 INFO c.i.learnjava.web.UserController - user registered: [email protected]
2020-06-02 08:04:27 INFO c.i.l.service.MailMessageListener - received message: ActiveMQMessage[ID:9fc5...]:PERSISTENT/ClientMessageImpl[messageID=983, durable=true, address=jms/queue/mail, ...]]
2020-06-02 08:04:27 INFO c.i.learnjava.service.MailService - [send mail] sending registration mail to [email protected]...
2020-06-02 08:04:30 INFO c.i.learnjava.service.MailService - [send mail] registration mail was sent to [email protected].
可见,消息被成功发送到Artemis,然后在很短的时间内被接收处理了。
使用消息服务对发送Email进行改造的好处是,发送Email的能力通常是有限的,通过JMS消息服务,如果短时间内需要给大量用户发送Email,可以先把消息堆积在JMS服务器上慢慢发送,对于批量发送邮件、短信等尤其有用。
ActiveMQ支持基于队列和主题两种模式,即Queue和Topic。
1.基于队列(Queue)的消息系统:在基于队列的消息系统中,生产者将消息发送到队列中,而消费者则从队列中获取消息。队列是一种点对点的通信模型,每个消息只能被一个消费者处理。当有多个消费者连接到同一个队列时,队列将进行负载均衡,以确保每个消费者都能获得相同数量的消息。
2.基于主题(Topic)的消息系统:在基于主题的消息系统中,生产者将消息发送到主题中,而消费者则从主题中订阅消息。主题是一种发布/订阅的通信模型,多个消费者可以同时订阅同一个主题,并且每个消费者都可以接收到相同的消息。当生产者向主题发送一条消息时,所有订阅该主题的消费者都将收到这条消息。
需要注意的是,在使用ActiveMQ时,可以同时支持基于队列和基于主题的消息系统。例如,在订阅主题时可以使用持久订阅来实现基于队列的语义和可靠性,或者使用选择器(Selector)来实现类似于过滤器的功能,以便更精细地控制消息的传递和处理。
总之,在设计和实现基于ActiveMQ的队列和主题的消息系统时,需要根据应用需求和场景选择合适的通信模型,同时还需要考虑消息的路由、传递、确认和处理等方面的问题,以确保消息的可靠传递和高效处理。
文章浏览阅读4.6k次。10月11日,联想集团在全球总部未来中心举行了主题为“解密X空间”的新品发布会,正式发布了LEGIONY9000X笔记本电脑,并公布了“联想个人云存储核心测试用户招募”计划。高性能标压轻薄本 LEGION Y9000X赋能内容创造者“你正在用的笔记本是游戏本还是轻薄本?”通过这样的一个问题,联想中国区消费业务笔记本产品规划总监林林,拉开了“解密X空间”的序幕,带来重磅新品——高性能标..._y9000x写代码够用吗
文章浏览阅读3.4w次,点赞2次,收藏29次。前一阵研究强化学习,发现中文的资料非常少,实例就更少。于是翻译一篇q学习算法的教程,供需要的人学习。原文链接:http://mnemstudio.org/path-finding-q-learning-tutorial.htm正文:Q学习算法是一种用来解决马尔可夫决策过程中最优化问题的方法。Q学习算法最大的特点是它具有选择瞬时奖励和延迟奖励的能力。在每一步中,agent通过观察状态_强化学习教程
文章浏览阅读81次。后端:Java+SpringBoot前端:Vue数据库:MySQL开发软件:Eclipse、MyEclipse、IDEA都可以运行。_基于vue+springboot的校园二手商品交易网站论文
文章浏览阅读231次。对于每个前端从业者来说,除了F5键之外,用的最多的另外一个键就是F12了。今天,大神(@小鱼二)推荐我一个网站,才知道chrome还有各种骚姿势。网站是:umaar.com/dev-tip...
文章浏览阅读2k次。【jeecg-boot】jeecg-boot的一些功能扩展:_jeecg-boot
文章浏览阅读2.7k次。首先确保你的电脑有安装git环境,本人使用的是windows下的git环境。双击桌面图标 的Git Bash 打开窗口修改配置git config --global user.namegit config --global user.email如:git config --global user.name "muzidigbig"git config --glo..._gitlab 18: transfer closed with outstanding read data remaining
文章浏览阅读164次。第一章 未恋先失<?xml:namespace prefix = o ns = "urn:schemas-microsoft-com:office:office" />初中时代的我,还是一个单纯的女孩,对于爱情,以为是眼泪制造出来的。我的圈子并不大,只有几个要好的女生,彭老二,周薇,秋毛。彭老二是个大嘴,校园里发生了什么事情她总是最先知道,通过她的大嘴,什么八卦新闻都逃..._操小帅
文章浏览阅读4.4k次,点赞2次,收藏23次。小波变换3级分解Mallat图:将带噪语音作为输入信号进行逐级DWT小波分解,并将分解出的低频成分cA3cA_3cA3与强制置0后的高频成分cD3cD_3cD3,cD2cD_2cD2,cD1cD_1cD1进行小波重构。Demo:clc,clear[x,Fs]= audioread('MUsic_Test.wav');snr = 20; %设定信噪比,单位dbnoise = randn(size(x)); % 用randn函数产生高斯白噪声Nx = length(x_db4小波
文章浏览阅读8.3k次,点赞5次,收藏34次。首先需要安装 snmp ,使用下面的命令进行安装安装完毕之后,使用下面的命令查看是否安装成功当命令行显示如图即为安装成功。_snmp工具
文章浏览阅读6.4k次,点赞5次,收藏40次。练习打字的官网:http://dazi.kukuw.com/关于打字的详细介绍:一个过来人的打字指法纠正之路_怎么敲键盘
文章浏览阅读9.6k次,点赞3次,收藏68次。一,网络安全体系结构网络安全体系结构是对网络信息安全基本问题的应对措施的集合,通常由保护,检测,响应和恢复等手段构成。1,网络信息安全的基本问题研究信息安全的困难在于:边界模糊数据安全与平台安全相交叉;存储安全与传输安全相制约;网络安全,应用安全与系统安全共存;集中的安全模式与分权制约安全模式相互竞争等。评估困难安全结构非常复杂,网络层,系统层,应用层的安全设备,安全协议和安全程序构成一个有机的整体,加上安全机制与人的互动性,网络的动态运行带来的易变性,使得评价网络安全性成为极_网络安全解决方案
文章浏览阅读1.2k次,点赞22次,收藏29次。QGIS在Windows下的编译——QGIS3.28.15 + Qt5.15.3 +CMake3.28.0 + VS2022 ---64位版本_qgis windows编译