这篇文章主要介绍“RocketMQ顺序消息是什么意思”,在日常操作中,相信很多人在RocketMQ顺序消息是什么意思问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ顺序消息是什么意思”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
十余年的都江堰网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。营销型网站的优势是能够根据用户设备显示端的尺寸不同,自动调整都江堰建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联从事“都江堰网站设计”,“都江堰网站推广”以来,每个客户项目都认真落实执行。
我们知道消息队列的特性导致其消息不是顺序进行消费的,RocketMQ没有提供所谓的顺序消息来供我们使用,但是有时候一些场景需要需要顺序的去接收消息。今天我们重点讨论一下如何实现这种功能。虽然RocketMQ没有提供顺序消费但是我们可以变相的来实现它。我们知道消息需要放入队列中才能被消费,而队列本身的特性就是FIFO先进先出,我们可以将需要顺序的消息放入一个队列中,则就可以实现这个功能。
场景:两个业务系统之间消息通过MQ传输,业务系统A数据传输至业务系统B,要求消息准确、实时。但是业务系统A的原始的数据可能会存在修改的情况,要求业务系统B需要实时的更改。保证消息的实时性、一致性、可靠性。
主题test_1发送消息到RocketMQ的双主Broker1、Broker2上,每个broker上test_1主题对应4个队列,消息id为001001的消息存在创建(create)、更新(update),MQ集群是双主的,使用默认的消息发送算法,消息将轮询的丢弃到各个队列中。
默认按照轮询算法将消息分发到各个broker的不同的队列中,保证每个队列的消息都是均匀分配,集群消费且消费者多个时,多个消费者会分散到不同的队列中消费消息,保证消息能够实时消费。
因为消息本身是放入到不同的队列中消费的就不能保证其顺序性,更新的消息可能是最先被消费掉,创建的消息消费时业务需要判断消息是否是最新的,需要进行查库验证,是则更新,不是则丢弃保存最新的消息,保证业务系统A与业务系统B,数据的一致性,增加了业务处理的难度。
我们在生产消息的时候可以将同一个消息ID的消息放入到相同的队列中,保证同一类需要顺序消费的消息放入到同一个队列中,这样队列中的消息就是有序的。但是同时也需要保证消息的消费也是有序的才可以保证消息的顺序消费。
集群模式下同一个消费组内的消费者共同承担其订阅主题下的消息队列的消费,同一个消息消息队列在同一时刻只会被消费组内的一个消费者消费,一个消费者同一时刻可以分配多个消费队列。
集群模式下的普通消息,线程池默认创建20个(可配置)线程。多线程从队列中拉取消息,提高并发加快消息的消费。
集群模式下的顺序消息,顺序消费是单线程,一个线程只能去一个队列获取数据,当需要获取某个队列中的消息时,需要锁定该消息队列(PS:后面会根据源码详细分析其原理)。
广播模式下的顺序消息,顺序消费是单线程,直接进行消费,无需锁定消息队列,因为相互之间无竞争
public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("order_group_test_1"); //Launch the instance. producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("order_test_1", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); }
我们根据100个消息的序号来放入到不同的队列中,根据序号%10取模,相同的放入到一个队列中。
上面是我们实现自定义队列选择器的算法,RocketMQ也提供了三种队列选择算法
从图中我们可以看到一共三种
SelectMessageQueueByHash:通过 hash 进行选择 queue。
SelectMessageQueueByRandom:随机选择 queue。
SelectMessageQueueByMachineRoom:机房选择queue(未实现)
我们分别来看一下实现
public class SelectMessageQueueByHash implements MessageQueueSelector { public MessageQueue select(Listmqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value %= mqs.size(); return ((MessageQueue) mqs.get(value)); } }
我们差看源码可以发现,通过提供的参数获取其HashCode,如果为负值则取绝对值,hash值与队列的总数进行取模获取其队列。
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random; public SelectMessageQueueByRandom() { this.random = new Random(System.currentTimeMillis()); } public MessageQueue select(Listmqs, Message msg, Object arg) { int value = this.random.nextInt(mqs.size()); return ((MessageQueue) mqs.get(value)); } }
生成一个队列数以内的随机数,通过随机数获取队列。
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Setconsumeridcs; public MessageQueue select(List mqs, Message msg, Object arg) { return null; } public Set getConsumeridcs() { return this.consumeridcs; } public void setConsumeridcs(Set consumeridcs) { this.consumeridcs = consumeridcs; } }
我们发现其select方法为null,其实是没有进行实现。需要我们自己实现。
虽然RocketMQ提供了三种(其实2种,SelectMessageQueueByMachineRoom未实现)队列选择算法,但是不建议使用,不同的业务规则其选择队列的算法也不尽相同,建议手动实现。
public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("order_consumer_test_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("order_test_1", "*"); consumer.registerMessageListener(new MessageListenerOrderly(){ @Override public ConsumeOrderlyStatus consumeMessage(ListparamList, ConsumeOrderlyContext paramConsumeOrderlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; //消费成功 } }); consumer.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
查看结果
我们找到两个典型的一组数字,尾号是6和9的,尾号是6的计较集中,尾号是9的比较分散,但是结果都是一样的,按照顺序消费的。
到此,关于“RocketMQ顺序消息是什么意思”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!