RocketMQ为例
1.生产者把顺序消息添加到同一个队列。
rocketMq在send方法可以自定义一个消息队列选择器,可根据传入的订单号,选择哪个队列。
@Test public void orderlyProducer() throws Exception { List<MsgModel> msgModels = new ArrayList(); msgModels.add(new MsgModel("a", "1", "下单")); DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for(MsgModel msgModel : msgModels){ Message message = new Message("orderlyTopic", msgModel.toString().getBytes()); producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object arg) { return list.get(arg.toString().hashCode() % list.size()); } }, msgModel.getOrderSn()); } }
2.消费者注册顺序消息监听器
RocketMQ中注册一个MessageListenerOrderly,可实现顺序消息处理
@Test public void orderlyConsumer() throws Exception{ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("orderlyTopic", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { return ConsumeOrderlyStatus.SUCCESS; } }); }