rocketmq 删除topic一个topic可以多个producer吗

rocketmq-producer原理解析 - 博客频道 - CSDN.NET
时光清浅 愿心安的博客
分类:分布式组件
Producer随机与一个NameServer建立长连接,从NameServer获取topic的最新队列情况。Producer会向提供topic服务的master建立长连接,且定时向master发送心跳。
发送消息demo:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest1",
"OrderID188",
("Hello MetaQ").getBytes());
SendResult sendResult = producer.send(msg);
producer.shutdown();
一:start()
在整个应用生命周期内,生产者只需要调用一次start方法来初始化以下工作:
如果没有指定namesrv地址,将会自动寻址
启动定时任务:从namsrv更新topic路由信息、清理已经挂掉的broker、向所有在线的broker master发送心跳…
启动负载均衡的服务:producer根据roundbin方式轮询topic下的所有队列来实现发送方的负载均衡。
二:DefaultMQProducer
send的内部实现原理伪代码如下:
private SendResult sendDefaultImpl(Message msg,......) {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
1:producer初始化的时候会从namesrv获取topic的路由信息更新到本地缓存,所以tryToFindTopicPublishInfo会先从本地缓存取,如果没取到再从namesrv获取最新的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null &&
topicPublishInfo.ok())) {
return topicPublishI
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishI
2.发送消息时,当没有指定队列或队列选择器时,调用selectOneMessageQueue:使用轮询方式,返回一个队列。当指定队列选择器时,通过selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);获取队列。
3.消息发送
CommunicationMode:
SYNC:同步(默认),
ASYNC:异步(callback),
ONEWAY:单向,
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
requestHeader,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
case ONEWAY:
case SYNC:
排名:千里之外
(1)(5)8102人阅读
&对消息队列+storm+hbase的框架很感兴趣,感觉好多事情可以做。。。hbase已经搭了,现在开始消息队列的选择:
开源的消息中间件有好多,RabbitMQ、ActiveMQ、ZeroMQ、kafka,还有淘宝的rocketMq, 前身是metaq,淘宝还有另一款消息中间件notify。
对淘宝开源的东东比较感兴趣,毕竟是经过生产验证。。。经受住了血与火的考验的。。。。
从rocketMq开搞吧。。。:
淘宝的RocketMQ是什么?
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力
Metaq3.0 版本改名,产品名称改为RocketMQ
先学习几个术语:
 Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息。
 Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
 Push Consumer
Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立 刻回调Listener接口方法。
 Pull Consumer
Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制。
 Producer Group
一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。
 Consumer Group
一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。
 Broker
消息中转角色,负责存储消息,转发消息,一般也称为Server。在JMS规范中称为Provider。
 广播消费
一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义。
在CORBA Notification规范中,消费方式都属于广播消费。
在JMS规范中,相当于JMS publish/subscribe model
一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。
在CORBA Notification规范中,无此消费方式。
在JMS规范中,JMS point-to-point model与之类似,但是RocketMQ的集群消费功能大等于PTP模型。因为RocketMQ单个Consumer Group内的消费者类似于PTP,但是一个Topic/Queue可以被多个Consumer Group消费。
 顺序消息
消费消息的顺序要同发送消息的顺序一致,在RocketMQ中,主要指的是局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。
 普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。
如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
 严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。
如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)
目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
 Message Queue
在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
也可以认为Message Queue是一个长度无限的数组,offset就是下标。
好吧。。开始动手:
从下载rocketMq
2.开发测试环境搭建
1.& &安装&启动
解压RocketMQ包,进到bin目录
//启动mqnamesrv
D:\devtest\message\rockmq\alibaba-rocketmq\bin&start/b mqnamesrv.exe &d:\devtest
\log\mqnamesrv.log
查看:mqnamesrv.log
The Name Server boot success.
//启动mqbroker
D:\devtest\message\rockmq\alibaba-rocketmq\bin&start/b mqbroker.exe -n &192.168.
119.1:9876& &d:/devtest/log/mqbroker.log
看日志中信息:mqbroker.log
The broker[papawa, 192.168.119.1:10911] boot success.
用jps看看当前java进程
jps(Java Virtual Machine Process Status Tool)是JDK提供的一个显示当前所有java进程pid的命令,简单实用,非常适合在linux/unix平台上简单察看当前java进程的一些简单情况。
D:\devtest\message\rockmq\alibaba-rocketmq\bin&jps -v
15640 &-Djava.ext.dirs=D:\devtest\message\rockmq\alibaba-rocketmq\bin/../lib -Dr
ocketmq.home.dir=D:\devtest\message\rockmq\alibaba-rocketmq\bin/.. -XX:MaxNewSiz
e=512M -XX:MaxPermSize=128M -XX:NewSize=256M -XX:PermSize=128M -Xms512m -Xmx1g e
18664 PULSEI~1.JAR -Xmx768m -XX:MaxPermSize=320m -XX:ReservedCodeCacheSize=64m -
Dosgi.nls.warnings=ignore
18576 &-Djava.ext.dirs=D:\devtest\message\rockmq\alibaba-rocketmq\bin/../lib -Dr
ocketmq.home.dir=D:\devtest\message\rockmq\alibaba-rocketmq\bin/.. -XX:MaxNewSiz
e=512M -XX:MaxPermSize=128M -XX:NewSize=256M -XX:PermSize=128M -Xms512m -Xmx1g e
14652 Jps -Denv.class.path=.;D:\develop\jdk/lib/rt.D:\develop\jdk/lib/tools.
-Dapplication.home=D:\develop\jdk -Xms8m
启动起来后:
3、运行:rocketmq带的例子
RocketMQ-3.0.9\rocketmq-example\src\main\java\com\alibaba\rocketmq\example\quickstart
producer.java
package com.alibaba.rocketmq.example.
import com.alibaba.rocketmq.client.exception.MQClientE
import com.alibaba.rocketmq.client.producer.DefaultMQP
import com.alibaba.rocketmq.client.producer.SendR
import com.mon.message.M
public class Producer {
& & public static void main(String[] args) throws MQClientException, InterruptedException {
& & & & DefaultMQProducer producer = new DefaultMQProducer(&testProductGroup&);
& & & &producer.start();
& & & & for (int i = 0; i & 20; i++) {
& & & & & & try {
& & & & & & & & Message msg = new Message(&TopicTest&,// topic
& & & & & & & & & & &TagA&,// tag
& & & & & & & & & & (&Hello RocketMQ & + i).getBytes()// body
& & & & & & & & & & & & );
& & & & & & & & SendResult sendResult = producer.send(msg);
& & & & & & & & System.out.println(sendResult);
& & & & & & }
& & & & & & catch (Exception e) {
& & & & & & & & e.printStackTrace();
& & & & & & & & Thread.sleep(1000);
& & & & & & }
& & & & producer.shutdown();
跑起来报错
com.alibaba.rocketmq.client.exception.MQClientException:&No&name&server&address,&please&set&it.
--需要在淘宝的例子中增加以下两行
& &&& & producer.setNamesrvAddr(&192.168.119.1:9876&);
& & & & producer.setInstanceName(&Producer&);
再次运行:
6:22:21.681&[NettyClientWorkerThread_1]&DEBUG&io.netty.util.ResourceLeakDetector&-&-Dio.netty.leakDetectionLevel:&simple
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9F1920,&messageQueue=MessageQueue&[topic=TopicTest,&brokerName=papawa,&queueId=0],&queueOffset=250]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9F19A8,&messageQueue=MessageQueue&[topic=TopicTest,&brokerName=papawa,&queueId=1],&queueOffset=250]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9F1A30,&messageQueue=MessageQueue&[topic=TopicTest,&brokerName=papawa,&queueId=2],&queueOffset=249]
在样例代码中增加发送的消息主题
public&class&Producer&{
&&&&public&static&void&main(String[]&args)&throws&MQClientException,&InterruptedException&{
&&&&&&&&DefaultMQProducer&producer&=&new&DefaultMQProducer(&testProductGroup&);
&&&&&&&&producer.setNamesrvAddr(&127.0.0.1:9876&);
&&&&&&&&producer.setInstanceName(&Producer1&);
&&&&&&&&producer.start();
&&&&&&&&for&(int&i&=&0;&i&&&2;&i++)&{
&&&&&&&&&&&&try&{
&&&&&&&&&&&&&&&&{&&Message&msg1&=&new&Message(&TopicTest1&,//&topic
&&&&&&&&&&&&&&&&&&&&&TagA&,//&tag
&&&&&&&&&&&&&&&&&&&&(&Hello&RocketMQ-1&&&+&i).getBytes()//&body
&&&&&&&&&&&&&&&&&&&&&&&&);
&&&&&&&&&&&&&&&&SendResult&sendResult1&=&producer.send(msg1);
&&&&&&&&&&&&&&&&System.out.println(sendResult1);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&&&&&Message&msg2&=&new&Message(
&TopicTest2&,//&topic&
&TagB&,//&tag&
&OrderID0034&,//&key&&&
(&Hello&RocketMq-2&&+&i).getBytes());//&body
&&&&&&&&&&&&&&&&&&&&SendResult&sendResult2&=&producer.send(msg2);
&&&&&&&&&&&&&&&&&&&&System.out.println(sendResult2);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&&&&&Message&msg3&=&new&Message(
&&&&&TopicTest3&,//&topic
&&&&&TagC&,//&tag&
&&&&&OrderID061&,//&key&&&&&&&&&&&&&&&&&&&&
&&&&(&Hello&RocketMq-3&&+&i).getBytes());//&body
&&&&&&&&&&&&&&&&&&&&SendResult&sendResult3&=&producer.send(msg3);
&&&&&&&&&&&&&&&&&&&&System.out.println(sendResult3);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&}
&&&&&&&&&&&&catch&(Exception&e)&{
&&&&&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&&&&&Thread.sleep(1000);
&&&&&&&&&&&&}
&&&&&&&&//&应用退出时,要调用shutdown来清理资源,关闭网络连接,从rocketMq服务器上注销自己
&&&&&&&&producer.shutdown();
运行看到消息发送情况:
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9FBCFE,&messageQueue=MessageQueue&[topic=TopicTest1,&brokerName=bfsc-papawa,&queueId=0],&queueOffset=26]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9FBD89,&messageQueue=MessageQueue&[topic=TopicTest2,&brokerName=bfsc-papawa,&queueId=0],&queueOffset=31]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9FBE25,&messageQueue=MessageQueue&[topic=TopicTest3,&brokerName=bfsc-papawa,&queueId=0],&queueOffset=31]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9FBEC0,&messageQueue=MessageQueue&[topic=TopicTest1,&brokerName=bfsc-papawa,&queueId=1],&queueOffset=18]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9FBF4B,&messageQueue=MessageQueue&[topic=TopicTest2,&brokerName=bfsc-papawa,&queueId=1],&queueOffset=23]
SendResult&[sendStatus=SEND_OK,&msgId=C0AA9FBFE7,&messageQueue=MessageQueue&[topic=TopicTest3,&brokerName=bfsc-papawa,&queueId=1],&queueOffset=23]
消息说明:
字段名 默认值 说明
Topic:null:必填,线下环境不需要申请,线上环境需要申请后才能使用
Body:null:必填,二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。
Tags:null:选填,类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念
Keys:null:选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。
Flag:0:选填,完全由应用来设置,RocketMQ不做干预
DelayTimeLevel:0:选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费
WaitStoreMsgOK:TRUE:选填,表示消息是否在服务器落盘后才返回应答。
Message数据结构各个字段都可以通过get、set方式访问,例如访问topic
msg.getTopic();
msg.setTopic(&TopicTest&);
其他字段访问方式类似。
运行Consumer.java收消息:
Consumer.java
在Producer端,使用com.mon.message.Message这个数据结构,由于Broker会为Message增加数据结构,所以消息到达Consumer后,会在Message基础之上增加多个字段,Consumer看到的是com.mon.message.MessageExt这个数据结构,MessageExt继承于Message,
public&class&Consumer&{
&&&&public&static&void&main(String[]&args)&throws&InterruptedException,&MQClientException&{
&&&&&&&&DefaultMQPushConsumer&consumer&=&new&DefaultMQPushConsumer(&consumeGroupName&);
&&&&&&&&consumer.setNamesrvAddr(&192.168.119.1:9876&);
&&&&&&&&consumer.setInstanceName(&Consumer&);
&&&&&&&&/**
&&&&&&&&&*&订阅指定topic下tags分别等于TagA或TagC或TagD
&&&&&&&&&*/
&&&&&&&&consumer.subscribe(&TopicTest1&,&&&);
&&&&&&&&/**
&&&&&&&&&*&订阅指定topic下所有消息&br&
&&&&&&&&&*&注意:一个consumer对象可以订阅多个topic
&&&&&&&&&*/
&&&&&&&&consumer.subscribe(&TopicTest2&,&*&);
&&&&&&&&consumer.registerMessageListener(new&MessageListenerConcurrently()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&ConsumeConcurrentlyStatus&consumeMessage(List&MessageExt&&msgs,
&&&&&&&&&&&&&&&&&&&&ConsumeConcurrentlyContext&context)&{
&&&&&&&&&&&&& & System.out.println(Thread.currentThread().getName()&+&&&Receive&New&Messages1:&&&+&msgs);
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&& & System.out.println(Thread.currentThread().getName()
&&&&&&&&&&&&&&&&& & & & & +&&Receive&New&Messages-size:&&&+&msgs.size());
&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&MessageExt&msg&=&msgs.get(0);
&&&&&&&&&&&&&&&&if&(msg.getTopic().equals(&TopicTest1&))&{
&&&&&&&&&&&&&&&&&&&&//&执行TopicTest1的消费逻辑
&&&&&&&&&&&&&&&&&&&&if&(msg.getTags()&!=&null&&&&msg.getTags().equals(&TagA&))&{
&&&&&&&&&&&&&&&&&&&&&&&&//&执行TagA的消费
&&&&&&&&&&&&&&&&&&&&&&&&System.out.println(&topic1-&+new&String(msg.getBody()));
&&&&&&&&&&&&&&&&&&&&}&else&if&(msg.getTags()&!=&null
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&msg.getTags().equals(&TagC&))&{
&&&&&&&&&&&&&&&&&&&&&&&&//&执行TagC的消费
&&&&&&&&&&&&&&&&&&&&&&&&System.out.println(&topic1-&+new&String(msg.getBody()));
&&&&&&&&&&&&&&&&&&&&}&else&if&(msg.getTags()&!=&null
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&msg.getTags().equals(&TagD&))&{
&&&&&&&&&&&&&&&&&&&&&&&&//&执行TagD的消费
&&&&&&&&&&&&&&&&&&&&&&&&System.out.println(&topic1-&+new&String(msg.getBody()));
&&&&&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&}&else&if&(msg.getTopic().equals(&TopicTest2&))&{
&&&&&&&&&&&&&&&&&&&&System.out.println(&topic2-&+new&String(msg.getBody()));
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&return&ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
&&&&&&&&&&&&}
&&&&&&&&});
&&&&&&&&consumer.start();
&&&&&&&&System.out.println(&Consumer&Started.&);
运行结果:
Consumer&Started.
ConsumeMessageThread-consumeGroupName-2&Receive&New&Messages1:&[MessageExt&[queueId=1,&storeSize=139,&queueOffset=17,&sysFlag=0,&bornTimestamp=6,&bornHost=/192.168.119.1:52514,&storeTimestamp=7,&storeHost=/192.168.119.1:10911,&msgId=C0AA9FBB3C,&commitLogOffset=179004,&bodyCRC=,&reconsumeTimes=0,&preparedTransactionOffset=0,&toString()=Message&[topic=TopicTest1,&flag=0,&properties={TAGS=TagA,&WAIT=true,&MAX_OFFSET=18,&MIN_OFFSET=0},&body=18]]]
ConsumeMessageThread-consumeGroupName-2&Receive&New&Messages-size:&1
topic1-Hello&RocketMQ-1&1
ConsumeMessageThread-consumeGroupName-1&Receive&New&Messages1:&[MessageExt&[queueId=0,&storeSize=139,&queueOffset=25,&sysFlag=0,&bornTimestamp=8,&bornHost=/192.168.119.1:52514,&storeTimestamp=7,&storeHost=/192.168.119.1:10911,&msgId=C0AA9FB97A,&commitLogOffset=178554,&bodyCRC=,&reconsumeTimes=0,&preparedTransactionOffset=0,&toString()=Message&[topic=TopicTest1,&flag=0,&properties={TAGS=TagA,&WAIT=true,&MAX_OFFSET=26,&MIN_OFFSET=0},&body=18]]]
ConsumeMessageThread-consumeGroupName-1&Receive&New&Messages-size:&1
topic1-Hello&RocketMQ-1&0
ConsumeMessageThread-consumeGroupName-3&Receive&New&Messages1:&[MessageExt&[queueId=1,&storeSize=156,&queueOffset=22,&sysFlag=0,&bornTimestamp=5,&bornHost=/192.168.119.1:52514,&storeTimestamp=8,&storeHost=/192.168.119.1:10911,&msgId=C0AA9FBBC7,&commitLogOffset=179143,&bodyCRC=,&reconsumeTimes=0,&preparedTransactionOffset=0,&toString()=Message&[topic=TopicTest2,&flag=0,&properties={TAGS=TagB,&KEYS=OrderID0034,&WAIT=true,&MAX_OFFSET=23,&MIN_OFFSET=0},&body=18]]]
ConsumeMessageThread-consumeGroupName-3&Receive&New&Messages-size:&1
topic2-Hello&RocketMq-2&1
ConsumeMessageThread-consumeGroupName-4&Receive&New&Messages1:&[MessageExt&[queueId=0,&storeSize=156,&queueOffset=30,&sysFlag=0,&bornTimestamp=7,&bornHost=/192.168.119.1:52514,&storeTimestamp=1,&storeHost=/192.168.119.1:10911,&msgId=C0AA9FBA05,&commitLogOffset=178693,&bodyCRC=,&reconsumeTimes=0,&preparedTransactionOffset=0,&toString()=Message&[topic=TopicTest2,&flag=0,&properties={TAGS=TagB,&KEYS=OrderID0034,&WAIT=true,&MAX_OFFSET=31,&MIN_OFFSET=0},&body=18]]]
ConsumeMessageThread-consumeGroupName-4&Receive&New&Messages-size:&1
topic2-Hello&RocketMq-2&0
成功完成消息发送与接收。。。。接下来就是弄个集群测试rocketmq,看看啥情况。。。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:16369次
排名:千里之外
(2)(1)(3)(2)(1)源代码版本是3.2.6。在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。
push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
文字描述可能不是很清楚,前面的文章都是push方式的,所以这里只上pull方式的,贴代码:
&properties&
&project.build.sourceEncoding&UTF-8&/project.build.sourceEncoding&
&logback.version&1.0.13&/logback.version&
&rocketmq.version&3.2.6&/rocketmq.version&
&/properties&
&dependencies&
&dependency&
&groupId&ch.qos.logback&/groupId&
&artifactId&logback-classic&/artifactId&
&version&1.0.13&/version&
&/dependency&
&dependency&
&groupId&ch.qos.logback&/groupId&
&artifactId&logback-core&/artifactId&
&version&1.0.13&/version&
&/dependency&
&dependency&
&groupId&com.alibaba.rocketmq&/groupId&
&artifactId&rocketmq-client&/artifactId&
&version&${rocketmq.version}&/version&
&/dependency&
&dependency&
&groupId&junit&/groupId&
&artifactId&junit&/artifactId&
&version&4.10&/version&
&scope&test&/scope&
&/dependency&
&/dependencies&
Producer:
package com.zoo.
import com.alibaba.rocketmq.client.exception.MQClientE
import com.alibaba.rocketmq.client.producer.DefaultMQP
import com.alibaba.rocketmq.client.producer.SendR
import com.mon.message.M
* Producer,发送消息
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(&please_rename_unique_group_name&);
producer.setNamesrvAddr(&192.168.0.104:9876&);
producer.start();
for (int i = 0; i & 5; i++) {
Message msg = new Message(&TopicTest&,// topic
&TagA&,// tag
(&Hello RocketMQ & + i).getBytes()// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
Thread.sleep(6000);
catch (Exception e) {
e.printStackTrace();
Thread.sleep(3000);
producer.shutdown();
package com.zoo.quickstart.
import java.util.HashM
import java.util.M
import java.util.S
import com.alibaba.rocketmq.client.consumer.DefaultMQPullC
import com.alibaba.rocketmq.client.consumer.PullR
import com.alibaba.rocketmq.client.exception.MQClientE
import com.mon.message.MessageQ
* PullConsumer,订阅消息
public class PullConsumer {
private static final Map&MessageQueue, Long& offseTable = new HashMap&MessageQueue, Long&();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(&please_rename_unique_group_name_5&);
consumer.setNamesrvAddr(&192.168.0.104:9876&);
consumer.start();
Set&MessageQueue& mqs = consumer.fetchSubscribeMessageQueues(&TopicTest&);
for (MessageQueue mq : mqs) {
System.out.println(&Consume from the queue: & + mq);
SINGLE_MQ: while (true) {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
case NO_MATCHED_MSG:
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
catch (Exception e) {
e.printStackTrace();
consumer.shutdown();
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
还有一种定时的Consumer:
package com.zoo.quickstart.
import com.alibaba.rocketmq.client.consumer.MQPullC
import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleS
import com.alibaba.rocketmq.client.consumer.PullR
import com.alibaba.rocketmq.client.consumer.PullTaskC
import com.alibaba.rocketmq.client.consumer.PullTaskC
import com.alibaba.rocketmq.client.exception.MQClientE
import com.mon.message.MessageQ
import com.mon.protocol.heartbeat.MessageM
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(&GroupName1&);
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(&192.168.0.104:9876&);
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback(&TopicTest&, new PullTaskCallback() {
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
// 获取从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset & 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, &*&, offset, 32);
System.out.println(offset + &\t& + mq + &\t& + pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
case NO_MATCHED_MSG:
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
// 存储Offset,客户端每隔5s会定时刷新到Broker
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置再过100ms后重新拉取
context.setPullNextDelayTimeMillis(100);
catch (Exception e) {
e.printStackTrace();
scheduleService.start();
此条目发表在分类目录,贴了, 标签。将加入收藏夹。

我要回帖

更多关于 rocketmq topic tag 的文章

 

随机推荐