ActiveMQ详细入门教程系列(一)_activemq入门教程-程序员宅基地

技术标签: activemq  java  消息中间件  队列  分布式  

在这里插入图片描述

一、什么是消息中间件

两个系统或两个客户端之间进行消息传送,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

消息中间件,总结起来作用有三个:异步化提升性能、降低耦合度、流量削峰。
在这里插入图片描述

系统A发送消息给中间件后,自己的工作已经完成了,不用再去管系统B什么时候完成操作。而系统B拉去消息后,执行自己的操作也不用告诉系统A执行结果,所以整个的通信过程是异步调用的。

二、消息中间件的应用场景

2.1 异步通信

有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

在这里插入图片描述

2.2 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。

2.3 解耦

降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
在这里插入图片描述

2.4 冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

2.5 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。

2.6 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

2.7 顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

2.8 过载保护

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

2.9 数据流处理

分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

三、常用消息队列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)比较

特性MQ ActiveMQ RabbitMQ RocketMQ Kafka
生产者消费者模式 支持 支持 支持 支持
发布订阅模式 支持 支持 支持 支持
请求回应模式 支持 支持 不支持 不支持
Api完备性
多语言支持 支持 支持 java 支持
单机吞吐量 万级 万级 万级 十万级
消息延迟 微秒级 毫秒级 毫秒级
可用性 高(主从) 高(主从) 非常高(分布式) 非常高(分布式)
消息丢失 理论上不会丢失 理论上不会丢失
文档的完备性
提供快速入门
社区活跃度
商业支持 商业云 商业云

四、消息中间件的角色

Queue: 队列存储,常用与点对点消息模型 ,默认只能由唯一的一个消费者处理。一旦处理消息删除。

Topic: 主题存储,用于订阅/发布消息模型,主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处 理的业务场景中可使用,Queue/Topic都是 Destination 的子接口

ConnectionFactory: 连接工厂,客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory

Connection: JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。

Destination: 消息的目的地,目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。

点对点消息传递域的特点如下:

  • 每个消息只能有一个消费者。
  • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

发布/订阅消息传递域的特点如下:

  • 每个消息可以有多个消费者。
  • 生产者和消费者之间有时间上的相关性。
  • 订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求 。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
    在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

五、JMS的消息格式

JMS消息由以下三部分组成的:

  • 消息头:

    每个消息头字段都有相应的getter和setter方法。

  • 消息属性:

    如果需要除消息头字段以外的值,那么可以使用消息属性。

  • 消息体:

    JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

消息类型:

属性 类型
TextMessage 文本消息
MapMessage k/v
BytesMessage 字节流
StreamMessage java原始的数据流
ObjectMessage 序列化的java对象

六、消息可靠性机制

只有在被确认之后,才认为已经被成功地消费了,消息的成功消费通常包含三个阶段 :客户接收消息、客户处理消息和消息被确认在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

  • Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
  • Session.CLIENT_ACKNOWLEDGE:客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
  • Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。
6.1 优先级

可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

6.2 消息过期

可以设置消息在一定时间后过期,默认是永不过期。

6.3 临时目的地

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

七、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

官网地址:http://activemq.apache.org/

7.1 存储方式

1. KahaDB存储: KahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化

特性:
1、日志形式存储消息;
2、消息索引以 B-Tree 结构存储,可以快速更新;
3、 完全支持 JMS 事务;
4、支持多种恢复机制kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

2. AMQ 方式: 只适用于 5.3 版本之前。 AMQ 也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

3. JDBC存储 : 使用JDBC持久化方式,数据库默认会创建3个表,每个表的作用如下:

activemq_msgs:queue和topic的消息都存在这个表中
activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

4. LevelDB存储 : LevelDB持久化性能高于KahaDB,但是在ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB

5.Memory 消息存储: 顾名思义,基于内存的消息存储,就是消息存储在内存中。persistent=”false”,表示不设置持 久化存储,直接存储到内存中,在broker标签处设置。

7.2 协议

协议官网API:http://activemq.apache.org/configuring-version-5-transports.html

  • Transmission Control Protocol (TCP):

    1. 这是默认的Broker配置,TCP的Client监听端口是61616。
    2. 在网络传输数据前,必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。
    3. TCP连接的URI形式:tcp://hostname:port?key=value&key=value
    4. TCP传输的优点:

      (1)TCP协议传输可靠性高,稳定性强
      (2)高效性:字节流方式传递,效率很高
      (3)有效性、可用性:应用广泛,支持任何平台

  • New I/O API Protocol(NIO)

    1. NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。

    2. 适合使用NIO协议的场景:

      (1)可能有大量的Client去链接到Broker上一般情况下,大量的Client去链接Broker是被操作系统的线程数所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议
      (2)可能对于Broker有一个很迟钝的网络传输NIO比TCP提供更好的性能

    3. NIO连接的URI形式:nio://hostname:port?key=value

    4. Transport Connector配置示例:

<transportConnectors>
  <transportConnector
    name="tcp"
    uri="tcp://localhost:61616?trace=true" />
  <transportConnector
    name="nio"
    uri="nio://localhost:61618?trace=true" />
</transportConnectors>
  • User Datagram Protocol(UDP)
    1:UDP和TCP的区别
    (1)TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复制和丢失的。UDP,另一方面,它是不会保证数据包的传递的
    (2)TCP也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样确保了在发送和接收之间能够可靠的传递。相反,UDP仅仅是一个链接协议,所以它没有可靠性之说
    2:从上面可以得出:TCP是被用在稳定可靠的场景中使用的;UDP通常用在快速数据传递和不怕数据丢失的场景中,还有ActiveMQ通过防火墙时,只能用UDP
    3:UDP连接的URI形式:udp://hostname:port?key=value
    4:Transport Connector配置示例:
<transportConnectors>
    <transportConnector
        name="udp"
        uri="udp://localhost:61618?trace=true" />
</transportConnectors>
  • Secure Sockets Layer Protocol (SSL)
    1:连接的URI形式:ssl://hostname:port?key=value
    2:Transport Connector配置示例:
<transportConnectors>
    <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
</transportConnectors>

八、案例(Hello World)

这里以windows为案例演示

下载地址:http://activemq.apache.org/components/classic/download/

8.1 安装启动

解压后直接执行
bin/win64/activemq.bat

在这里插入图片描述

8.2 web控制台

http://localhost:8161/
账号密码:admin/admin

在这里插入图片描述
在这里插入图片描述

8.3 web控制台

修改 ActiveMQ 配置文件 activemq/conf/jetty.xml
jettyport节点: 配置文件修改完毕,保存并重新启动 ActiveMQ 服务

 <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="127.0.0.1"/>
        <property name="port" value="8161"/>
    </bean>
8.4 开发

1. jar引入:

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-activemq</artifactId>
   </dependency>

2. Sender :

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * @program: activemq_01
 * @ClassName Sender
 * @description: 消息发送
 * @author: muxiaonong
 * @create: 2020-10-02 13:01
 * @Version 1.0
 **/
public class Sender {
    

    public static void main(String[] args) throws Exception{
    
        // 1. 获取连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );

        // 2. 获取一个向activeMq的连接
        Connection connection = factory.createConnection();
        // 3. 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 4.找目的地,获取destination,消费端,也会从这个目的地取消息
        Queue queue = session.createQueue("user");

        // 5.1 消息创建者
        MessageProducer producer = session.createProducer(queue);

        // consumer --> 消费者
        // producer --> 创建者
        // 5.2. 创建消息
        for (int i = 0; i < 100; i++) {
    
            TextMessage textMessage = session.createTextMessage("hi:"+i);
            // 5.3 向目的地写入消息
            producer.send(textMessage);
            Thread.sleep(1000);
        }

        // 6.关闭连接
        connection.close();

        System.out.println("结束。。。。。");

    }
}

3. Receiver :


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


/**
 * @program: activemq_01
 * @ClassName Receiver
 * @description: 消息接收
 * @author: muxiaonong
 * @create: 2020-10-02 13:01
 * @Version 1.0
 **/
public class Receiver {
    

    public static void main(String[] args) throws Exception{
    
        // 1. 获取连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );

        // 2. 获取一个向activeMq的连接
        Connection connection = factory.createConnection();
        connection.start();

        // 3. 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 4.找目的地,获取destination,消费端,也会从这个目的地取消息
        Destination queue = session.createQueue("user");

        // 5 获取消息
        MessageConsumer consumer = session.createConsumer(queue);

        while(true){
    
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println("message:"+message.getText());
        }

    }
}

测试结果:

message:hi:38
message:hi:39
message:hi:40
message:hi:41
message:hi:42
message:hi:43
message:hi:44
message:hi:45

web后台显示有一个消费者处于连接状态,且已消费了68个message,而该条队列已没有message待消费了
在这里插入图片描述

九、总结

今天的MQ入门教程系列就这里了,感兴趣的小伙伴可以试试,遇到了什么问题,或者有疑问的,都可以在下方留言,小农看见了会第一时间回复大家,MQ作为一个消息中间件,不管是面试还是工作中都会经常用到,所以是很有必要去了解和学习的一个技术点,今天的分享就到这里了,谢谢各位小伙伴的观看,我们下篇文章见,大家加油!

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_14996421/article/details/108987860

智能推荐

Oracle EBS Forms查看trace file_ebs form trace-程序员宅基地

文章浏览阅读7.5k次。Introduction:Some times we need to diagnose the issue or error coming in forms. For such situation we need to get more information about the issue we are facing in forms. One of the best way to get su_ebs form trace

SpringBoot集成Spring Security(3)——异常处理_disabledexception-程序员宅基地

文章浏览阅读2.5w次,点赞25次,收藏92次。Step1 常见异常Step2 源码分析Step3 处理异常不知道你有没有注意到,当我们登陆失败时候,spring security帮我们跳转到了/login?error,奇怪的是不管是控制台还是网页上都没有打印错误信息。这是因为首先/login?error是spring security默认的失败url,其次如果你不手动处理这个异常,这个异常是不会被处理..._disabledexception

webview 加载h5页面,播放视频+全屏,实现简单封装_webview 加载的时候 可以直接播放视频吗-程序员宅基地

文章浏览阅读1.8w次,点赞4次,收藏7次。前段时间项目中用到了h5。从目前的市场来看,原生和h5的结合受到很多公司的欢迎,刚好最近微信也推出了它自己的“小程序”,这在Android程序员之间也掀起了波澜,引起大家讨论。个人觉得Google提供的webview有很多的坑,我这次就踩了不少,比如在某些版本的系统上某个方法不会执行,或者执行的顺序不一样,有的方法会多执行一次,说白了就是兼容性做的很差,究其主要原因,是Android4_webview 加载的时候 可以直接播放视频吗

ComfyUI 一键整合包- AIStarter启动器专属_comfyui-aki-v1.1.7z-程序员宅基地

文章浏览阅读532次,点赞9次,收藏7次。AIStarter使用教程及注意事项 -AIStarter tutorials and notes on how to use it。_comfyui-aki-v1.1.7z

【linu相关】Ubuntu 文件系统相关命令_bantu系统命令-程序员宅基地

文章浏览阅读8k次,点赞4次,收藏35次。linux系统(以Ubuntu系统为例)的文件系统Linux下“/”就是根目录,所有的目录都是有根目录衍生出来的。/是一切目录的起点,如大树的主干。其它的所有目录都是基于树干的枝条或者枝叶。在ubuntu中硬件设备如光驱、软驱、usb设备都将挂载到这颗繁茂的枝干之下,作为文件来管理。_bantu系统命令

有效解决linux后台运行python脚本_sh 脚本后台执行还在输出-程序员宅基地

文章浏览阅读4.4k次。1.sh test.sh &将sh test.sh任务放到后台 ,即使关闭xshell退出当前session依然继续运行,但标准输出和标准错误信息会丢失(缺少的日志的输出)将sh test.sh任务放到后台 ,关闭xshell,对应的任务也跟着停止。2. nohup sh test.sh将sh test.sh任务放到后台,关闭标准输入,终端不再能够接收任何输入(标准输入),重定向标..._sh 脚本后台执行还在输出

随便推点

Mysql 日志分析工具介绍_mysql日志分析工具-程序员宅基地

文章浏览阅读1.1w次。1. 工具简介pt-query-digest是用于分析mysql慢查询的一个工具,它可以分析binlog、General log、slowlog,也可以通过SHOWPROCESSLIST或者通过tcpdump抓取的MySQL协议数据来进行分析。可以把分析结果输出到文件中,分析过程是先对查询语句的条件进行参数化,然后对参数化以后的查询进行分组统计,统计出各查询的执行时间、次数、占比等,可以借助分_mysql日志分析工具

javaweb基于SSH开发校园社团管理系统源码 课程设计 大作业 毕业设计_tp6校园社团管理系统-程序员宅基地

文章浏览阅读106次。开发校园社团管理系统(大作业/毕业设计)+Jdk+Tomcat+MYSQL数据库。开发环境: Windows操作系统。_tp6校园社团管理系统

网络安全学习--用户和用户组_组或用户名-程序员宅基地

文章浏览阅读1.1k次。文章基于Windows2003服务器系统版本Windows:win200,win2003,win2008r2,win2012Linux:Redhat,CentOS用户每个用户登录系统后,拥有不同的操作权限每个账户有自己唯一的SID(安全标识符)用户SID:S-1-5-21-42342423434-1433343434-500系统SID:S-1-5-21-42342423434-1433343434用户ID:500windows系统管理员administrator的UID是500普_组或用户名

Java注解 编译_Java注解之编译时注解-程序员宅基地

文章浏览阅读427次。新建两个moduleannotation用来定义注解compiler用来编写处理注解的代码这两个module都要选择Java Library 那为什么要拆分两个module呢,因为编译期注解的处理代码是只在代码编译的时候使用的,所以这些代码要和主module分开拆成compiler,但是compiler又依赖于注解,主module也要使用注解。所以就将注解的定义也拆分出来。这样做的好处是可以..._bw.append

mini2440 uart 裸机c-程序员宅基地

文章浏览阅读66次。转载于:http://blog.csdn.net/yx_l128125/article/details/7703653Uart工作原理:数据通信方式为:并行通信与串行通信两种:§并行通信:利用多条数据线将数据的各位同时传送。它的特点是:传输速度快,是用于短距离通信;§串行通信:利用一条数据线将数据一位位地顺序传送。特点是通信线路简单,利用简单的线缆就实现通信,低成本..._mini2440 nand uart 裸机

解决“libboost_thread.so.xxx.xxx.xxx: cannot open shared object file: No such file or directory”问题-程序员宅基地

文章浏览阅读2.8k次。使用boost库的时候,编译没问题,但是运行的时候报“libboost_thread.so.xxx.xxx.xxx: cannot open shared object file: No such file or directory”错误,解决方法如下: 执行:sudo ldconfig /usr/local/boost_xxx_xxx_xxx/stage/lib..._libboost_thread.so

推荐文章

热门文章

相关标签