博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq精读
阅读量:4144 次
发布时间:2019-05-25

本文共 6099 字,大约阅读时间需要 20 分钟。

rocketmq简单使用

1.项目介绍
2.结构组件及工作流程
3.基本使用
4.各种功能
5.可靠性优先
6.吞吐量优先
7.其它

1.项目介绍

rocketmq是由阿里开源的Apache中间件顶级项目.经历过多次阿里双十一生产验证.

项目地址:

2.结构组件

Producer:消息生产者

Consumer:消息消费者
Broker:消息暂存
NameServer:路由信息存储

消息组件:Topic Queue Tag Key

Topic: 消息分类
Queue: 逻辑上消息存储在队列里
Tag: 消息标签
Key: 消息标识

工作流程:

首先咱们启动NameServer,Broker.然后可以在broker上创建Topic并指定Queue数量.
生产者连接到NameServer上获取路由信息,发送消息到broker
消费者连接到NmaeServer上获取路由信息,注册监听器,拉取消息,消费

3.基本使用

3.1同步发送

public class SyncProducer {    public static void main(String[] args) throws Exception {        //Instantiate with a producer group name.        DefaultMQProducer producer = new            DefaultMQProducer("please_rename_unique_group_name");        // Specify name server addresses.        producer.setNamesrvAddr("localhost:9876");        //Launch the instance.        producer.start();        for (int i = 0; i < 100; i++) {            //Create a message instance, specifying topic, tag and message body.            Message msg = new Message("TopicTest" /* Topic */,                "TagA" /* Tag */,                ("Hello RocketMQ " +                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */            );            //Call send message to deliver message to one of brokers.            SendResult sendResult = producer.send(msg);            System.out.printf("%s%n", sendResult);        }        //Shut down once the producer instance is not longer in use.        producer.shutdown();    }}

3.2异步发送

public class AsyncProducer {    public static void main(String[] args) throws Exception {        //Instantiate with a producer group name.        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        // Specify name server addresses.        producer.setNamesrvAddr("localhost:9876");        //Launch the instance.        producer.start();        producer.setRetryTimesWhenSendAsyncFailed(0);        for (int i = 0; i < 100; i++) {                final int index = i;                //Create a message instance, specifying topic, tag and message body.                Message msg = new Message("TopicTest",                    "TagA",                    "OrderID188",                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));                producer.send(msg, new SendCallback() {                    @Override                    public void onSuccess(SendResult sendResult) {                        System.out.printf("%-10d OK %s %n", index,                            sendResult.getMsgId());                    }                    @Override                    public void onException(Throwable e) {                        System.out.printf("%-10d Exception %s %n", index, e);                        e.printStackTrace();                    }                });        }        //Shut down once the producer instance is not longer in use.        producer.shutdown();    }}

3.3one-way发送

public class OnewayProducer {    public static void main(String[] args) throws Exception{        //Instantiate with a producer group name.        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        // Specify name server addresses.        producer.setNamesrvAddr("localhost:9876");        //Launch the instance.        producer.start();        for (int i = 0; i < 100; i++) {            //Create a message instance, specifying topic, tag and message body.            Message msg = new Message("TopicTest" /* Topic */,                "TagA" /* Tag */,                ("Hello RocketMQ " +                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */            );            //Call send message to deliver message to one of brokers.            producer.sendOneway(msg);        }        //Shut down once the producer instance is not longer in use.        producer.shutdown();    }}

3.4消息消费

public class Consumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        // Instantiate with specified consumer group name.        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");                 // Specify name server addresses.        consumer.setNamesrvAddr("localhost:9876");                // Subscribe one more more topics to consume.        consumer.subscribe("TopicTest", "*");        // Register callback to execute on arrival of messages fetched from brokers.        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); }}

总结一下:

生产者步骤:
创建生产者->设置NameServer->启动生产者->发送消息->关闭生产者

消费者步骤:

创建消费者->设置NameServer->订阅topic->注册监听服务->启动消费者

4.支持功能

顺序消息

广播消息
延时消息
批量发送
过滤消费(基于Tag,基于属性)
事务消息
请参照:

5.可靠性优先

broker集群方面推荐开启同步多写,如果性能足够可开启同步刷盘

生产端可配置同步发送或异步回调发送
顺序消息推荐使用部分顺序,生产端建议配置同步发送,消费端使用顺序监听
重复消费问题:幂等性(消息主键过滤,状态识别等)
消息必须发送:建议使用补偿机制(本地异步,自主查询等)

6.吞吐量优先

消费端提高并行能力:

批量消费,设置适当的队列数量,使用适当的分配算法等
生产端提高并行能力:
多个生产者,批量发送,异步或者one-way方式发送

7.其它

客户端启动的服务介绍:

客户端API实现:网络客户端服务,发送接收报文,基于netty实现
获取NameServerAddr定时器服务:默认10秒(未配置NameServer启用)
更新Topic路由信息定时服务
发送心跳到所有broker定时服务,还会上传filter类
提交offest定时服务
调整消费线程池定时服务
拉取消息服务
再平衡服务
可参照:MQClientInstance.start方法
发送端队列选择:
实现接口MessageQueueSelector进行自定义
ConsumeMessageHook:
消费消息拦截器
SendMessageHook:
发送消息拦截器
FilterMessageHook:
对pull消息过滤拦截
分配再平衡系统实现:
AllocateMessageQueueAveragely
AllocateMeageQueueAveragelyByCircle
AllocateMessageQueueByConfig
AllocateMessageQueueByMachineRoom
MessageListener:
消费监听:
MessageListenerConcurrently 并发消费监听
MessageListenerOrderly 顺序消费监听
PullMessageService:
拉取消息服务实现
RebalanceService:
消费再平衡服务实现
顺序性消费实现:
顺序消费过程中,会锁定队列,消费时锁定消费者,还会锁定broker上的队列结合再平衡进行broker端队列消费一致性
参照:ConsumeMessageOrderlyService服务
ProcessQueue:
消费队列镜像,pull消息缓冲区,进行本地消费再分配等操作

转载地址:http://ebuti.baihongyu.com/

你可能感兴趣的文章
(转载)正确理解cookie和session机制原理
查看>>
jQuery ajax - ajax() 方法
查看>>
将有序数组转换为平衡二叉搜索树
查看>>
最长递增子序列
查看>>
从一列数中筛除尽可能少的数,使得从左往右看这些数是从小到大再从大到小...
查看>>
判断一个整数是否是回文数
查看>>
经典shell面试题整理
查看>>
腾讯的一道面试题—不用除法求数字乘积
查看>>
素数算法
查看>>
java多线程环境单例模式实现详解
查看>>
将一个数插入到有序的数列中,插入后的数列仍然有序
查看>>
在有序的数列中查找某数,若该数在此数列中,则输出它所在的位置,否则输出no found
查看>>
万年历
查看>>
作为码农你希望面试官当场指出你错误么?有面试官这样遭到投诉!
查看>>
好多程序员都认为写ppt是很虚的技能,可事实真的是这样么?
查看>>
如果按照代码行数发薪水会怎样?码农:我能刷到公司破产!
查看>>
程序员失误造成服务停用3小时,只得到半月辞退补偿,发帖喊冤
查看>>
码农:很多人称我“技术”,感觉这是不尊重!纠正无果后果断辞职
查看>>
php程序员看过来,这老外是在吐糟你吗?看看你中了几点!
查看>>
为什么说程序员是“培训班出来的”就是鄙视呢?
查看>>