防止消息丢失:
生产者默认重试次数为2
producer.setRetryTimesWhenSendFailed(3);
消费者默认重试次数为无限次
consumer.setMaxReconsumeTimes(5);
防止重复消息:
1.生产者发送的消息中,带一个业务唯一标识
public void keyProducer() throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("key-producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String key = UUID.randomUUID().toString(); producer.send(new Message("keyTopic", "tagVip1", key, "我是vip1的文章".getBytes())); producer.shutdown(); }
2.消费者中,利用redis的setnx,如果设置成功则表示第一次消费
public void keyConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("keyTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); Long result = jedis.setnx(messageExt.getKeys(), ""); if (result == 1L) { System.out.println("正在消费消息,业务标识:" + messageExt.getKeys()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); }
一、DEMO
先看一个RocketMQ的Demo
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
1、Producer端发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
2、发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
异步发送在使用上其实就多了一个send时候在里面加一个回调函数的实现
如果我们还没发送完毕就producer.shutdown();关闭实例化生产者会出现send失败的情况,因此这里引用了countDownLatch。
public class AsyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; // 根据消息数量实例化倒计时计算器 final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount); for (int i = 0; i < messageCount; i++) { final int index = i; // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收异步返回结果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // 等待5s countDownLatch.await(5, TimeUnit.SECONDS); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
3、单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer { public static void main(String[] args) throws Exception{ // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送单向消息,没有任何返回结果 producer.sendOneway(msg); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
4、消费消息
这里用subscribe进行订阅,这里可以传入多个topic或者tag
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅 ,订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("TopicTest", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
二、如何保证消息不丢失
Q1: 如何保证「消息生产」不丢失?
丢失场景:生产者将发送消息时,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失。
方案:
「消息确认机制」和「失败重试机制」来保证
消息发送成功返回确认消息,那就能确保消息不丢失。如果发送失败了,mq-client就尝试自动重试,避免网络抖动导致发送丢失。
如果超过一定超时时间还是失败,那就抛出异常,由开发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。
Q2: 如何保证「消息存储」不丢失?
丢失场景:
场景1:消息保存到内存中,还没来得及刷盘到磁盘,机器宕机或者重启,导致内存中消息丢失。
方案:
默认情况下,消息在到达 Broker 端后会首先被保存在内存中,并立即向生产者返回确认响应。随后,Broker 会定期批量将一组消息异步刷入磁盘。
场景2:为了提高可用性,Broker通常采用一主多从的部署方式,为了确保消息不丢失,消息需要被复制到从节点。当消息发送到master但是还没同步到slave broker时,master broker磁盘损坏,导致消息数据丢失。或者master宕机,consumer切换到slave消费数据,消息丢失。
方案:
要确保 Broker 端不丢失消息并保证消息的可靠性,我们需要修改消息保存机制为同步刷盘方式,即只有当消息成功存储到磁盘后才返回响应。可以通过flushDiskType = SYNC_FLUSH 参数进行控制。
Q3: 如何保证「消息消费」不丢失?
丢失场景:
因为各种原因消费失败,但是还是提交了消费位点,这条消息从业务角度来说就“丢失”了。
方案:
跟消息生产一样,其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」。
消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。
如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。
三、如何保证消息不被重复消费
解决方案
让消费端自己进行处理,对于重复的消息能够识别,保持幂等性(多次接收到同一消息处理结果是一样的)
利用一张日志表来记录已经处理成功的消息ID,如果这个消息ID已经在日志表中,则不再处理消息;这个地方可以由消息系统或业务实现,如果由消息系统实现会影响到性能,所以最好还是由消费端进行处理,这也是RocketMQ不处理重复消息问题的原因
总的来说就是RocketMQ为了性能考虑不保证消息不重复,需要通过业务端自己实现重复消息的识别、处理