妙博客

主机评测 香港服务器 洛杉矶VPS测评

MQ如何保证消息消费的顺序性

RocketMQ为例

1.生产者把顺序消息添加到同一个队列。

rocketMq在send方法可以自定义一个消息队列选择器,可根据传入的订单号,选择哪个队列。

@Test
public void orderlyProducer() throws Exception {
    List<MsgModel> msgModels = new ArrayList();
    msgModels.add(new MsgModel("a", "1", "下单"));
    DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for(MsgModel msgModel : msgModels){
        Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
        producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                return list.get(arg.toString().hashCode() % list.size());
            }
        }, msgModel.getOrderSn());
    }
}


2.消费者注册顺序消息监听器

RocketMQ中注册一个MessageListenerOrderly,可实现顺序消息处理

@Test
public void orderlyConsumer() throws Exception{
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("orderlyTopic", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
}


Copyright Your 142132.com Rights Reserved. 赣ICP备17010829号-2