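Taking RocketMQ as an example:
1. The producer sends ordered messages to the same queue.
In RocketMQ, the send method accepts a custom message queue selector, which can pick the target queue based on the order number passed in as an argument. Messages that share an order number therefore always land in the same queue.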
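The selection rule itself can be tried without a broker. Below is a minimal standalone sketch; the class and method names (QueueSelectionSketch, indexFor, selectQueueIndex) are hypothetical and not part of RocketMQ. It also shows one detail worth watching: String.hashCode() can be negative, and in Java a negative value with % yields a negative index, so the sketch uses Math.floorMod.

```java
final class QueueSelectionSketch {

    /** Maps any int hash (including negative ones) to an index in [0, queueCount). */
    static int indexFor(int hash, int queueCount) {
        // Math.floorMod never returns a negative result for a positive divisor,
        // unlike the % operator.
        return Math.floorMod(hash, queueCount);
    }

    /** Chooses a queue index for an order number; equal order numbers give equal indexes. */
    static int selectQueueIndex(String orderSn, int queueCount) {
        return indexFor(orderSn.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        // The same order number is always routed to the same queue index.
        System.out.println(selectQueueIndex("order-1001", queueCount)
                == selectQueueIndex("order-1001", queueCount));
        // A negative hash with % would produce an invalid list index.
        System.out.println("-7 % 4 = " + (-7 % 4));
        System.out.println("floorMod(-7, 4) = " + indexFor(-7, queueCount));
    }
}
```

The same floorMod call can be used inside a MessageQueueSelector, with list.size() as the divisor.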
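import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Test; // JUnit 4 assumed; with JUnit 5 use org.junit.jupiter.api.Test

@Test
public void orderlyProducer() throws Exception {
    List<MsgModel> msgModels = new ArrayList<>();
    msgModels.add(new MsgModel("a", "1", "Place order"));
    DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (MsgModel msgModel : msgModels) {
        Message message = new Message("orderlyTopic", msgModel.toString().getBytes(StandardCharsets.UTF_8));
        // The third argument of send (the order number) is passed to select() as arg.
        producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                // floorMod keeps the index non-negative even when hashCode() is negative.
                return list.get(Math.floorMod(arg.toString().hashCode(), list.size()));
            }
        }, msgModel.getOrderSn());
    }
    producer.shutdown(); // release the producer's resources once all messages are sent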
}
2. The consumer registers an ordered message listener.
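Registering a MessageListenerOrderly on the RocketMQ consumer enables ordered processing: the messages of each queue are consumed by one thread at a time, in the order they were stored.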
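import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test; // JUnit 4 assumed; with JUnit 5 use org.junit.jupiter.api.Test

@Test
public void orderlyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("orderlyTopic", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            // Business logic goes here; messages from one queue arrive in order.
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();     // without start() the listener never receives messages
    Thread.sleep(60_000); // keep the test alive long enough to consume
    consumer.shutdown();
}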
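
Why the two steps together preserve order can be illustrated with a small simulation that does not use RocketMQ at all. This is only a sketch under simplifying assumptions: queues are plain in-memory lists, each queue is drained sequentially (standing in for the one-thread-per-queue behavior of an orderly listener), and events are "orderSn:step" strings. The names OrderedConsumeSketch, route, and consume are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OrderedConsumeSketch {

    /** Producer side: routes each "orderSn:step" event to a queue chosen by its order number. */
    static List<List<String>> route(List<String> events, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String event : events) {
            String orderSn = event.substring(0, event.indexOf(':'));
            // Same order number -> same queue, appended in send order.
            queues.get(Math.floorMod(orderSn.hashCode(), queueCount)).add(event);
        }
        return queues;
    }

    /** Consumer side: drains every queue front to back and collects the steps seen per order. */
    static Map<String, List<String>> consume(List<List<String>> queues) {
        Map<String, List<String>> stepsByOrder = new LinkedHashMap<>();
        for (List<String> queue : queues) {
            for (String event : queue) {
                String[] parts = event.split(":", 2);
                stepsByOrder.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
            }
        }
        return stepsByOrder;
    }

    public static void main(String[] args) {
        // Events of two orders are interleaved when sent.
        List<String> events = Arrays.asList("1:create", "2:create", "1:pay", "2:pay", "1:ship");
        Map<String, List<String>> result = consume(route(events, 4));
        // Within each order, the steps come out in the order they were sent.
        System.out.println(result);
    }
}
```

Ordering is only guaranteed within one order number. Events of different orders may be consumed in any relative order, which is the usual trade-off of per-queue ordering.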