RocketMQ进击(五)集群消费模式与广播消费模式_rocketmq 集群模式换广播模式-程序员宅基地

技术标签: Java  广播消费模式  集群消费模式  mq的两种消费方式  RocketMQ  

楔子:新一天的旅程,掠过天空海湾,越过低谷高山,跃过深渊浅滩,在天南地北,走两江四岸,与日月星辰,看锦绣山河。

 

1. 两种消费模式

RocketMQ 有两种消费模式:

集群消费模式:CLUSTERING,可以理解为同组公共消费。公共资源我拿了你就没有。即同一 Topic 下,一个 ConsumerGroup 下如果有多个实例(可以是多个进程,或者多个机器),那么这些实例会均摊消费这些消息,但我消费了这条消费你就不会再消费。消费者默认是集群消费方式适用于大部分消息业务。

广播消费模式:BROADCASTING,可以理解为同组各自消费。即同一 Topic 下,同一消息会被多个实例各自都消费一次。所以,广播消费模式中的 ConsumerGroup 概念没有太大的意义。这适用于一些分发消息的场景。

 

1.1. 集群消费模式

1.1.1 生产者

package com.meiwei.service.mq.tcp.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ClusteringMqProducer {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    // Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";

    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_CLUSTERING";

    public static void main(String[] args) throws Exception {
        // 声明并实例化一个 producer 生产者来产生消息
        // 需要一个 producer group 名字作为构造方法的参数
        DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-clustering");

        // 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
        // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        // 循环发送MQ测试消息
        String content = "";
        for (int i = 0; i < 10; i++) {
            // 配置容灾机制,防止当前消息异常时阻断发送流程
            try {
                content = "【MQ测试消息】测试消息 " + i;

                // Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH, "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 发送消息
                SendResult sendResult = producer.send(message);

                // 日志打印
                System.out.printf("Send MQ message success! Topic: %s,Tag: %s, msgId: %s, Message: %s %n",
                        message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
            } catch (Exception e) {
                // 消息发送失败
                System.out.printf("%-10d Exception %s %n", i, e);
                e.printStackTrace();
            }
        }

        // 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
        producer.shutdown();
    }
}

1.1.2 集群消费模式消费

package com.meiwei.service.mq.tcp.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 集群消息模式
 */
public class ClusteringMqConsumer {

    // Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_CLUSTERING";

    public static void main(String[] args) throws Exception {

        // 声明并初始化一个 consumer
        // 需要一个 consumer group 名字作为构造方法的参数
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-clustering");

        // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
        consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);

        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(mq->{
                    System.out.printf("Thread: %s, Host: %s, Key: %s, QueueId: %s, Topic: %s, Tags: %s, Message: %s",
                            Thread.currentThread().getName(),
                            mq.getBornHost(),
                            mq.getKeys(),
                            mq.getQueueId(),
                            mq.getTopic(),
                            mq.getTags(),
                            new String(mq.getBody()));
                    System.out.println();
                });

                // 返回消费状态
                // CONSUME_SUCCESS 消费成功
                // RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 调用 start() 方法启动 consumer
        consumer.start();
        System.out.println("Clustering Consumer Started.");
    }
}

注:将此消费者拷贝一份为 ClusteringMqConsumer2,其它不动,以便测试。

 

1.1.3 测试及结果

启动集群消费者1、集群消费者2 和 消息生产者。

消息生产者(Producer)发送结果:

集群消费模式 消费者1 消费结果:

集群消费模式 消费者2 消费结果:

可以看到两个消费者是在多线程下的分摊消费效果。

 

1.2. 广播消费模式

1.2.1 生产者

package com.meiwei.service.mq.tcp.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class BroadcastingMqProducer {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    // Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";

    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_BROADCASTING";

    public static void main(String[] args) throws Exception {
        // 声明并实例化一个 producer 生产者来产生消息
        // 需要一个 producer group 名字作为构造方法的参数
        DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-broadcasting");

        // 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
        // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        // 循环发送MQ测试消息
        String content = "";
        for (int i = 0; i < 10; i++) {
            // 配置容灾机制,防止当前消息异常时阻断发送流程
            try {
                content = "【MQ测试消息】测试消息 " + i;

                // Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH, "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 发送消息
                SendResult sendResult = producer.send(message);

                // 日志打印
                System.out.printf("Send MQ message success! Topic: %s,Tag: %s, msgId: %s, Message: %s %n",
                        message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
            } catch (Exception e) {
                // 消息发送失败
                System.out.printf("%-10d Exception %s %n", i, e);
                e.printStackTrace();
            }
        }

        // 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
        producer.shutdown();
    }
}

1.2.2. 广播消费模式消费

package com.meiwei.service.mq.tcp.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 广播消费模式(Broadcasting)
 * 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
 */
public class BroadcastingMqConsumer {

    // Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_BROADCASTING";
    private static final String MQ_CONFIG_TAG_PUSH_OTHER = "PID_MEIWEI_SMS_OTHER";

    public static void main(String[] args) throws Exception {

        // 声明并初始化一个 consumer
        // 需要一个 consumer group 名字作为构造方法的参数
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-broadcasting");

        // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 设置广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
        consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH + " || " + MQ_CONFIG_TAG_PUSH_OTHER);

        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(mq->{
                    System.out.printf("Thread: %s, Host: %s, Key: %s, QueueId: %s, Topic: %s, Tags: %s, Message: %s",
                            Thread.currentThread().getName(),
                            mq.getBornHost(),
                            mq.getKeys(),
                            mq.getQueueId(),
                            mq.getTopic(),
                            mq.getTags(),
                            new String(mq.getBody()));
                    System.out.println();
                });

                // 返回消费状态
                // CONSUME_SUCCESS 消费成功
                // RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 调用 start() 方法启动 consumer
        consumer.start();
        System.out.println("Broadcasting Consumer Started.");
    }
}

注:

  1. 将此消费者拷贝一份为 BroadcastingMqConsumer2,其它不动。
  2. 再拷贝一份为 BroadcastingMqConsumer3,修改其 ConsumerGroup 参数为 meiwei-consumer-broadcasting3。

这样得到 Consumer 和 Consumer2 同组,又与 Consumer3 不同组的消费者实例,以便测试。

 

1.2.3. 测试及结果

分别启动广播消费者1、广播消费者2 和 消息生产者。

消息生产者(Producer)发送结果:

广播消费模式 消息者1(BroadcastingMqConsumer)消费结果:

广播消费模式 消息者2(BroadcastingMqConsumer2)消费结果:

可以看到两个消息消费者都收到了同样的消息。

再启动广播消费者3(BroadcastingMqConsumer3),再次执行消息生产者,广播消费者3同样消费了所有消息。

 

2. 源码分析

MessageModel:

package org.apache.rocketmq.common.protocol.heartbeat;

public enum MessageModel {
    BROADCASTING("BROADCASTING"),
    CLUSTERING("CLUSTERING");

    private String modeCN;

    private MessageModel(String modeCN) {
        this.modeCN = modeCN;
    }

    public String getModeCN() {
        return this.modeCN;
    }
}

DefaultMQPushConsumer:

    public DefaultMQPushConsumer(String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.messageModel = MessageModel.CLUSTERING;
        this.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        this.consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L);
        this.subscription = new HashMap();
        this.consumeThreadMin = 20;
        this.consumeThreadMax = 64;
        this.adjustThreadPoolNumsThreshold = 100000L;
        this.consumeConcurrentlyMaxSpan = 2000;
        this.pullThresholdForQueue = 1000;
        this.pullThresholdSizeForQueue = 100;
        this.pullThresholdForTopic = -1;
        this.pullThresholdSizeForTopic = -1;
        this.pullInterval = 0L;
        this.consumeMessageBatchMaxSize = 1;
        this.pullBatchSize = 32;
        this.postSubscriptionWhenPull = false;
        this.unitMode = false;
        this.maxReconsumeTimes = -1;
        this.suspendCurrentQueueTimeMillis = 1000L;
        this.consumeTimeout = 15L;
        this.consumerGroup = consumerGroup;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }

默认的 DefaultMQPushConsumer 内部定义了很多默认值,比如默认为集群消费方式,线程最小默认20,最大默认64,批量下拉消息默认32,默认一次消费一条消息等等。


参考资料:
RocketMQ 官网:http://rocketmq.apache.org/docs/motivation/
阿里云消息队列 MQ:https://help.aliyun.com/document_detail/29532.html
阿里巴巴中间件团队:http://jm.taobao.org/2016/11/29/apache-rocketmq-incubation/


RocketMQ进击物语:
RocketMQ进击(零)RocketMQ这个大水池子
RocketMQ进击(一)Windows环境下安装部署Apache RocketMQ
RocketMQ进击(二)一个默认生产者,两种消费方式,三类普通消息详解分析
RocketMQ进击(三)顺序消息与高速公路收费站
RocketMQ进击(四)定时消息(延时队列)
RocketMQ进击(五)集群消费模式与广播消费模式
RocketMQ进击(六)磕一磕RocketMQ的事务消息
RocketMQ进击(七)盘一盘RocketMQ的重试机制
RocketMQ进击(八)RocketMQ的日志收集Logappender
RocketMQ异常:RocketMQ顺序消息收不到或者只能收到一部分消息
RocketMQ异常:Unrecognized VM option 'MetaspaceSize=128m'

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/itanping/article/details/101070265

智能推荐

模型智能体开发之metagpt-多智能体实践

参照单智能体的测试case。

学习笔记-数据结构-线性表(2024-04-27)

扫描顺序表L的前半部分元素,对于元素L.data[i](0

matlab保存矩阵为txt,matlab保存矩阵成txt-程序员宅基地

文章浏览阅读2k次。将 A 定义成: A= 而 A(:,3)=[]; 将删除 A 的第三列 ,得 1 2 3 4 0 A= 1 2 4 0 10 8 6 4 2 10 8 4 2 1 2 0 1 0 1 2 1 0 2 4 1 0 4 2 4 0 4 §3 命令生成使用 MATLAB 命令生成矩阵一般......matlab 的各种数据读取(txt,dat,mat 等格式) ,文件打开 关闭 MATLAB 提供了多种..._matlab将矩阵保存txt

php中实现精确设置session过期时间的方法_php 把$_session某个值过期-程序员宅基地

文章浏览阅读6k次。大多数据情况下我们对于session过期时间使用的是默认设置的时间,而对于一些有特殊要求的情况下我们可以设置一下session过期时间。对此,可以在PHP中,设置php.ini,找到session.gc_maxlifetime = 1440 #(PHP5默认24分钟)这里你可以随便设置一下过期时间.但是有人说设置以后,好象不起作用!其实不是不起作用,而是因为系统默认:_php 把$_session某个值过期

idm线程怎么设置 idm线程数怎么上不去 idm免安装

IDM(Internet Download Manager)是一款流行的下载管理软件,IDM采用高级的多线程下载技术,可以将下载文件分成多个部分同时下载,从而提高下载速度,它因高效的下载速度和丰富的功能而受到用户的喜爱。接下来,我们学习idm线程怎么设置,idm线程数怎么上不去的内容。

[附源码]Python计算机毕业设计毕业生就业信息管理系统Django(程序+LW)_毕业生就业统计系统python代码-程序员宅基地

文章浏览阅读291次。岗位信息管理,在岗位信息管理页面中可查看职位名称、工作环境、岗位类型、薪资范围、工作性质、工作地点、经验要求、最低学历、企业邮箱、企业账号、企业名称、负责人、联系方式等内容,并根据需要进行查看评论、修改或删除等操作,如图5-7所示。岗位信息,在岗位信息页面中可通过填写职位名称、工作环境、岗位类型、工作性质、经验要求、企业邮箱、企业名称、联系方式、职位描述、薪资范围、工作地点、最低学历、企业账号、负责人等内容进行提交操作,如图5-13所示。或者 Mac OS;该项目含有源码、文档、程序。_毕业生就业统计系统python代码

随便推点

简单的网页制作期末作业——HTML+CSS+JavaScript小礼品购物商城网站_用html,js,css可以写个什么小项目-程序员宅基地

文章浏览阅读347次,点赞7次,收藏5次。网页作品编辑作品下载后可使用任意HTML编辑软件(如:`DW、HBuilder、NotePAD 、Vscode 、Sublime 、Webstorm、 Notepad++ 等任意HTML软件编辑修改网页)网页作品技术:Div+CSS、鼠标滑过特效、Table、导航栏效果、banner、表单、二级三级页面等,视频、 音频元素 、Flash,同时设计了logo(源文件),基本期末作业所需的知识点全覆盖。_用html,js,css可以写个什么小项目

使用TensorFlow训练神经网络进行价格预测-程序员宅基地

文章浏览阅读1.3k次。Using Deep Neural Networks for regression problems might seem like overkill (and quite often is), but for some cases where you have a significant amount of high dimensional data they can outperform an..._tensorflow 定价项目代码

用模态的方式打开自定义JDialog,并获取返回值_jdialog获取输入的值-程序员宅基地

文章浏览阅读1.7k次。打开模态对话框,获取用户输入的内容_jdialog获取输入的值

vue绑定数据,渲染页面时会出现变量闪烁_vue的变量一闪-程序员宅基地

文章浏览阅读439次。在使用vue绑定数据的时候,渲染页面时会出现变量闪烁,例如<div class="#app"> <p>{{value.name}}</p></div>在加载的时候会看到{{value.name}}原因:由于JavaScript去操作DOM,都会等待DOM加载完成(DOM ready)。对于vuejs、angularjs这些会在DOM ready完会才回去解析html view Template,所以对于Chrome这类快速的浏_vue的变量一闪

IGMP协议实验及配置_igmp实验-程序员宅基地

文章浏览阅读178次。简单的IGMP协议实验配置及结果验证_igmp实验

CentOS安装MariaDB数据库详细过程_mariadb安装时mysql -uroot -p报未找到命令-程序员宅基地

文章浏览阅读1.8k次。一、添加 MariaDB yum 仓库vi /etc/yum.repos.d/MariaDB.repo在该文件中添加以下内容保存:[mariadb]name = MariaDBbaseurl = http://yum.mariadb.org/10.2/centos7-amd64gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDBgpgcheck=1二..._mariadb安装时mysql -uroot -p报未找到命令