本文共 6099 字,大约阅读时间需要 20 分钟。
rocketmq简单使用
1.项目介绍 2.结构组件及工作流程 3.基本使用 4.各种功能 5.可靠性优先 6.吞吐量优先 7.其它rocketmq是由阿里开源的Apache中间件顶级项目.经历过多次阿里双十一生产验证.
项目地址:Producer:消息生产者
Consumer:消息消费者 Broker:消息暂存 NameServer:路由信息存储消息组件:Topic Queue Tag Key
Topic: 消息分类 Queue: 逻辑上消息存储在队列里 Tag: 消息标签 Key: 消息标识工作流程:
首先咱们启动NameServer,Broker.然后可以在broker上创建Topic并指定Queue数量. 生产者连接到NameServer上获取路由信息,发送消息到broker 消费者连接到NmaeServer上获取路由信息,注册监听器,拉取消息,消费3.1同步发送
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
3.2异步发送
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
3.3one-way发送
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
3.4消息消费
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); // Subscribe one more more topics to consume. consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); }}
总结一下:
生产者步骤: 创建生产者->设置NameServer->启动生产者->发送消息->关闭生产者消费者步骤:
创建消费者->设置NameServer->订阅topic->注册监听服务->启动消费者顺序消息
广播消息 延时消息 批量发送 过滤消费(基于Tag,基于属性) 事务消息 请参照:broker集群方面推荐开启同步多写,如果性能足够可开启同步刷盘
生产端可配置同步发送或异步回调发送 顺序消息推荐使用部分顺序,生产端建议配置同步发送,消费端使用顺序监听 重复消费问题:幂等性(消息主键过滤,状态识别等) 消息必须发送:建议使用补偿机制(本地异步,自主查询等)消费端提高并行能力:
批量消费,设置适当的队列数量,使用适当的分配算法等 生产端提高并行能力: 多个生产者,批量发送,异步或者one-way方式发送客户端启动的服务介绍:
客户端API实现:网络客户端服务,发送接收报文,基于netty实现 获取NameServerAddr定时器服务:默认10秒(未配置NameServer启用) 更新Topic路由信息定时服务 发送心跳到所有broker定时服务,还会上传filter类 提交offest定时服务 调整消费线程池定时服务 拉取消息服务 再平衡服务 可参照:MQClientInstance.start方法 发送端队列选择: 实现接口MessageQueueSelector进行自定义 ConsumeMessageHook: 消费消息拦截器 SendMessageHook: 发送消息拦截器 FilterMessageHook: 对pull消息过滤拦截 分配再平衡系统实现: AllocateMessageQueueAveragely AllocateMeageQueueAveragelyByCircle AllocateMessageQueueByConfig AllocateMessageQueueByMachineRoom MessageListener: 消费监听: MessageListenerConcurrently 并发消费监听 MessageListenerOrderly 顺序消费监听 PullMessageService: 拉取消息服务实现 RebalanceService: 消费再平衡服务实现 顺序性消费实现: 顺序消费过程中,会锁定队列,消费时锁定消费者,还会锁定broker上的队列结合再平衡进行broker端队列消费一致性 参照:ConsumeMessageOrderlyService服务 ProcessQueue: 消费队列镜像,pull消息缓冲区,进行本地消费再分配等操作转载地址:http://ebuti.baihongyu.com/