资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

如何整合RocketMQ事务消息

今天就跟大家聊聊有关如何整合RocketMQ事务消息,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

专注于为中小企业提供网站设计制作、成都网站设计服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业抚顺县免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了成百上千企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

一、 选择RocketMQ原因

ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ选型

二、 整合思路

RocketMQ提供了事务消息回查,查看官方Demo

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    private static final String TX_PGROUP_NAME = "myTxProducerGroup";
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;
    
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send transactional messages
        testTransaction();
    }


    private void testTransaction() throws MessagingException {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {

                Message msg = MessageBuilder
                                        .withPayload("Hello RocketMQ " + i)
                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                                        .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
                                                                        springTransTopic + ":" + tags[i % tags.length],
                                                                        msg,
                                                                        null);
                System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                                    msg.getPayload(),
                                    sendResult.getSendStatus());

                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                // Return local transaction with success(commit), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }

            if (status == 1) {
                // Return local transaction with failure(rollback) , in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }

            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                }
            }
            System.out.printf("------ !!! checkLocalTransaction is executed once," +
                    " msgTransactionId=%s, TransactionState=%s status=%s %n",
                transId, retState, status);
            return retState;
        }
    }

}

需要在testTransaction()中发送消息,然后在TransactionListenerImpl类中实现executeLocalTransaction()方法才能执行整个本地事务,然后在checkLocalTransaction()中实现事务消息回查。

查看源代码可以知道testTransaction()方法和executeLocalTransaction()是在同一个线程当中,只不过包装RocketMQTemplate中。

三、问题和解决方法

3.1事务消息面临的几个问题:

  1. 消息发送的事务消息回调查询和本地事务没严格的先后顺序,怎么保证,回查时,事务操作肯定已经完成。

  2. 事务消息回调使用transaction_id查询,那么transaction_id存放在哪里,同时保证transaction_id关联的业务操作执行成功。

  3. 怎么把事务回调查询操作隔离出业务,保证不侵入代码中。

  4. 下游消费者怎么保证接口幂等性。

  5. 下游消费者怎么提高幂等性查询性能。

  6. 怎么把幂等性操作隔离出业务,保证不侵入代码中。

3.2 解决方法

  1. 因为数据库或者其他业务操作可能会存在延时,那么不能保证回查时业务操作已完成,那么可以多次回查,并设置最大回查次数,同时不能丢弃MQ消息持久化,方便手动恢复。

  2. 可以使用本地消息表落地的发送消息,同时可以采用切面、继承等等方式将落地消息隔离出业务代码之外,保证本地消息落库不侵入,注意必须要保证本地消息落库和本地业务落库在同一个事务之内!

  3. 事务消息回查可以使用第2点的本地消息表,根据transaction_id查询,判断本地事务的执行结果,也和第2点一样,可以使用一些方式将事务消息回查代码隔离出业务代码,保证不侵入。

  4. 幂等性的方法:

    • 数据库唯一约束

    • 状态机CAS单向流转

    • 消息去重表

  5. ,在执行本地业务前,先对redis判断是业务id是否存在,存在则直接返回消费成功,在执行本地业务之后,可以将消费信息异步落地到redis当中。注意:需要保证本地业务和消息幂等性操作在同一个事务当中,同时redis落地操作在事务之外。

  6. 比较好的方案应该是数据库唯一约束 + 消息去重表,在消息去重表中对业务id设置唯一约束,同时将消息落地操作隔离出本地业务之外,保证不侵入。

  7. 定时清理历史的本地消息表(消息去重表)。

看完上述内容,你们对如何整合RocketMQ事务消息有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。


文章标题:如何整合RocketMQ事务消息
标题链接:http://cdkjz.cn/article/jecdsj.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

大客户专线   成都:13518219792   座机:028-86922220