PullInterval: 配置消费端拉取MQ消息的间隔时间 。间隔时间是按照上次消费完成之后(比如rocketMQ收到Ack回复消息之后) 。PullInterval=20s,比如上次rocketMq服务收到Ack消息是12:15:15,则 12:15:35再去拉消息 。
PullBatchSize: 消费端每个队列一次拉取多少个消息,若该消费端分赔了N个监控队列 , 每次拉取M个,那么消费端每次去rocketMq拉取的消息为N * M 。消费端每次pull到消息总数=PullBatchSize * 监听队列数,如 PullBatchSize = 2 , 监听队列=5,则 消息总数量 = 2 * 5 = 10 。
ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量 。
- ThreadMin:消费端拉取到消息后分配消费的线程数
- ThreadMax:最大消费线程,如果默认队列满了,则启用新的线程
RocketMq 逻辑消费队列数量的配置rocketMq 可以配置消费队列,如 queue Read1 , queue Read2,配置数量决定每次pull到的消息总数 。Rocket MQ 提供了读写队列数量的配置 。
消费端节点部署数量多节点消费端线程数量要比单节点消费线程数量多 , 理论上消费速度大于单节点,分治思维 。
1.5 消息的过滤在过滤消息的时候,标签模式简单而是用,可以筛选出你需要的数据 。如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");consumer.subscribe("testTopic", MessageSelector.byTag("Tag1|| Tag2 || Tag3").bySql("sex = 'male' AND name = 'brand'));
这种情况下,消息中带有 Tag1 、Tag2、Tag3 标签就会被过滤出来,但是单个消限制息只能有一个标签 , 这就远远满足不了各种复杂的并交集场景的需要了 。这时候Rocket MQ可以在消息中设置一些属性,再使用SQL表达式筛选属性来过滤出需要的数据 。如下
------------| message||----------|sex = male AND name = 'brand' , Gotten| name = 'brand' || sex = 'male'|| age = 21|------------------------| message||----------|sex = male AND name = 'brand', Gotten , Missed| name = 'Anny'|| sex = 'female'|| age = 20 |------------
1.8 提高Consumer的处理能力 :看情况
- 提高消费并行度在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度 。通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数 。注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息 。此外 , 通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax) 。
- 以批量方式进行消费某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间 。可以通过批量方式消费来提高消费的吞吐量 。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N , 在消息多的时候每次收到的是个长度为N的消息链表 。
- 检测延时情况,跳过非重要消息Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积 , 短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度 。
2 消息消费的模式2.1 基本信息消费消费者的基本实现 , 连接 NameServer的地址,指定Topic和Tag,读取到需要消费的数据,然后轮询并处理 。
public class SimpleConsumerApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer , 并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");// 3.指定Topic和Tag 信息 。* 代表所有consumer.subscribe("testTopic", "*");// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo , 具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}
2.2 顺序消费相比与基本消费,多了一个ConsumeFromWhere的设置 。代表消费者从哪个位置开始消费,枚举如下:
推荐阅读
-
-
-
-
-
-
-
2023武汉中考志愿填报规则详解 2021年武汉中考志愿填报时间
-
关于深圳市万科教育发展基金会简述 深圳市万科教育发展基金会
-
没有足够的可用内存来运行此程序 没有足够的可用内存来运行程序
-
-
-
-
-
-
绝地求生刺激战场军团名字怎么改?军团名字修改攻略[多图]
-
-
-
OPPO Reno6简单评测 oppo reno6有nfc吗
-
-