妙博客

主机评测 香港服务器 洛杉矶VPS测评

MQ怎么保证消息不丢失和重复消费

防止消息丢失:

生产者默认重试次数为2

producer.setRetryTimesWhenSendFailed(3);


消费者默认重试次数为无限次

consumer.setMaxReconsumeTimes(5);


防止重复消息:

1.生产者发送的消息中,带一个业务唯一标识

public void keyProducer() throws Exception{
    DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    String key = UUID.randomUUID().toString();
    producer.send(new Message("keyTopic", "tagVip1", key, "我是vip1的文章".getBytes()));
    producer.shutdown();
}


2.消费者中,利用redis的setnx,如果设置成功则表示第一次消费

public void keyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("keyTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            Long result = jedis.setnx(messageExt.getKeys(), "");
            if (result == 1L) {
                System.out.println("正在消费消息,业务标识:" + messageExt.getKeys());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
}





一、DEMO

先看一个RocketMQ的Demo

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>


1、Producer端发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。


public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
            "TagA" /* Tag */,
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}


2、发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

异步发送在使用上其实就多了一个send时候在里面加一个回调函数的实现

如果我们还没发送完毕就producer.shutdown();关闭实例化生产者会出现send失败的情况,因此这里引用了countDownLatch。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
    
    int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
    final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        for (int i = 0; i < messageCount; i++) {
                final int index = i;
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                      System.out.printf("%-10d Exception %s %n", index, e);
                      e.printStackTrace();
                    }
                });
        }
    // 等待5s
    countDownLatch.await(5, TimeUnit.SECONDS);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}


3、单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}


4、消费消息

这里用subscribe进行订阅,这里可以传入多个topic或者tag

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅  ,订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}


二、如何保证消息不丢失

Q1: 如何保证「消息生产」不丢失?

丢失场景:生产者将发送消息时,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失。

方案

「消息确认机制」和「失败重试机制」来保证

消息发送成功返回确认消息,那就能确保消息不丢失。如果发送失败了,mq-client就尝试自动重试,避免网络抖动导致发送丢失。

如果超过一定超时时间还是失败,那就抛出异常,由开发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。


Q2: 如何保证「消息存储」不丢失?

丢失场景:

场景1:消息保存到内存中,还没来得及刷盘到磁盘,机器宕机或者重启,导致内存中消息丢失。

方案

默认情况下,消息在到达 Broker 端后会首先被保存在内存中,并立即向生产者返回确认响应。随后,Broker 会定期批量将一组消息异步刷入磁盘。


场景2:为了提高可用性,Broker通常采用一主多从的部署方式,为了确保消息不丢失,消息需要被复制到从节点。当消息发送到master但是还没同步到slave broker时,master broker磁盘损坏,导致消息数据丢失。或者master宕机,consumer切换到slave消费数据,消息丢失。

方案

要确保 Broker 端不丢失消息并保证消息的可靠性,我们需要修改消息保存机制为同步刷盘方式,即只有当消息成功存储到磁盘后才返回响应。可以通过flushDiskType = SYNC_FLUSH 参数进行控制。


Q3: 如何保证「消息消费」不丢失?

丢失场景:

因为各种原因消费失败,但是还是提交了消费位点,这条消息从业务角度来说就“丢失”了。

方案:

跟消息生产一样,其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」。

消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。

如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。


三、如何保证消息不被重复消费

解决方案

让消费端自己进行处理,对于重复的消息能够识别,保持幂等性(多次接收到同一消息处理结果是一样的)

利用一张日志表来记录已经处理成功的消息ID,如果这个消息ID已经在日志表中,则不再处理消息;这个地方可以由消息系统或业务实现,如果由消息系统实现会影响到性能,所以最好还是由消费端进行处理,这也是RocketMQ不处理重复消息问题的原因

总的来说就是RocketMQ为了性能考虑不保证消息不重复,需要通过业务端自己实现重复消息的识别、处理


Copyright Your 142132.com Rights Reserved. 赣ICP备17010829号-2