RocketMQ的事务消息详解

RocketMQ的事务消息详解

开发环境

在使用RocketMQ消息队列时,尝试使用事务消息,来执行分布式事务,过程中遇到不少麻烦,这里来梳理一下

使用的RocketMQ版本为:5.1.0,客户端为Java依赖rocketmq-spring-boot-starter,版本:2.2.2

事务消息介绍

事务消息为 Apache RocketMQ 中的高级特性消息,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

通俗地说,就是通过二阶段提交方式,保证本地事务执行发送消息,这两个操作的一致性(最终一致性)。

那么RocketMQ事务消息相比而言,解决了哪些特殊问题呢? 主要有以下几点:

  1. 消息与本地事务强一致性。普通消息只保证消息发送,但不能保证与本地事务同时成功或同时失败。事务消息通过准备消息-本地事务-提交/回滚真实消息的流程,保证了二者的强一致性。

  2. 自动回滚消息。如果本地事务失败,普通消息无法自动回滚,需要重试或补偿处理。事务消息在本地事务失败时,可以自动回滚消息,简化开发。

  3. 消息幂等性。同一消息如果重复消费,普通消息会导致业务逻辑再次执行。事务消息通过检查准备消息状态,可以实现幂等消费,避免重复执行业务逻辑。

  4. 支持消息与远程调用一致。通过RocketMQ事务消息,可以实现消息的发送与远程服务调用在同一个事务内。如果远程调用失败,消息自动回滚。这可以跨进程跨机房保证一致性。

所以,总体来说,RocketMQ事务消息相比直接发送普通消息的优势在于:强一致性保证、自动回滚支持、幂等消费和与远程调用一致这四个方面。这四点可以解决普通消息无法解决的分布式事务问题,比如跨进程跨机房的强一致性要求等。但其实对许多场景来说,普通消息的重试/补偿机制也可以满足需要,RocketMQ事务消息的优势并不总是必要的。这还是取决于具体的业务场景和一致性要求。

踩坑过程

以下是RocketMQTemplate中有关发送事务消息的源代码:

    /**
     * Send Spring Message in Transaction
     * ...
     */
    public TransactionSendResult sendMessageInTransaction(final String destination,
        final Message<?> message, final Object arg) throws MessagingException {
        try {
            // 事务监听器的非空判断,为空则抛异常
            if (((TransactionMQProducer) producer).getTransactionListener() == null) {
                throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
            }
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            return producer.sendMessageInTransaction(rocketMsg, arg);
        } catch (MQClientException e) {
            throw RocketMQUtil.convert(e);
        }
    }

这段代码中对 getTransactionListener的非空判断,代表如果要发送事务消息,当前TransactionMQProducer上必须绑定事务监听器

事务监听器将用于 执行本地事务,以及 二次检查本地事务执行结果

相当于要求发送事务消息本地事务要在同一个JVM中进行。当然执行的事务和发送消息可以是RPC远程调用方式。但是最理想的应该还是同一个JVM的情况。

定义事务监听器

如果要发送事务消息,就需要定义事务消息,用来执行本地事务并向MQ发送执行结果,以此来控制事务消息是否要下发给下游的消费者。

下面将定义事务监听器来模拟本地事务执行可能遇到的绝大部分情况,以及后续产生的影响,希望能帮助后面学习RocketMQ事务消息的小伙伴理解整个事务流程:

package com.ryujung.listener;

import com.alibaba.fastjson.JSON;
import com.ryujung.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import java.util.Random;

/**
 * 生产者将消息发送至Apache RocketMQ服务端。
 *
 * Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,
 * 此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
 *
 * 生产者开始执行本地事务逻辑。
 */
@Slf4j
@RocketMQTransactionListener
public class TransactionMessageListener implements RocketMQLocalTransactionListener {

    /**
     * 生产者执行本地事务逻辑。
     * 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
     * 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
     * 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String payload = new String((byte[]) msg.getPayload());
            log.info("处理事务消息:{},arg:{},执行本地事务中……", JSON.parseObject(payload, Order.class), arg);

            Random random = new Random();
            if (random.nextInt(5) < 1) {
                throw new RuntimeException("本地事务执行发生异常的情况");
            }

            boolean finished = random.nextBoolean();
            boolean success = random.nextBoolean();

            if (!finished) {
                log.info("网络中断或抖动等原因导致本地事务尚未完成,成败未知,需要等待MQ后续回查本地事务结果...");
                return RocketMQLocalTransactionState.UNKNOWN;
            }

            if (finished && success) {
                log.info("本地事务执行成功,事务消息将重新存储到普通存储系统中,对下游消费者可见,等待被消费者获取并消费");
                return RocketMQLocalTransactionState.COMMIT;
            }

            if (finished && !success) {
                log.info("本地事务执行失败,本地事务回滚,半事务消息回滚,该事务消息流程终止");
                return RocketMQLocalTransactionState.ROLLBACK;
            }

        } catch (Exception e) {
            log.error("执行本地事务失败,半事务消息回滚,该事务消息流程终止,异常信息:{}", e.getMessage());
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    /**
     * 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,
     * 或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
     *
     * @return 回查结果
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            String payload = new String((byte[]) msg.getPayload());
            log.info("处理事务消息:{},确认事务中……", JSON.parseObject(payload, Order.class));

            Random random = new Random();
            if (random.nextInt(5) < 1) {
                throw new RuntimeException("回查过程,检查事务时发生异常的情况");
            }

            boolean finished = random.nextBoolean();
            boolean success = random.nextBoolean();

            if (!finished) {
                log.info("回查结果:本地事务尚未完成,成败未知,需要等待MQ后续再回查本地事务结果...");
                return RocketMQLocalTransactionState.UNKNOWN;
            }

            if (finished && success) {
                log.info("回查结果:本地事务执行成功,事务消息将重新存储到普通存储系统中,对下游消费者可见,等待被消费者获取并消费");
                return RocketMQLocalTransactionState.COMMIT;
            }

            if (finished && !success) {
                log.info("回查结果:本地事务执行失败,本地事务回滚,半事务消息回滚,该事务消息流程终止");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("确认本地事务失败,半事务消息回滚,该事务消息流程终止,异常信息:{}", e.getMessage());
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

}

RocketMQ官方使用建议:

  • 避免大量未决事务导致超时
  • Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。
  • 但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。
  • 正确处理"进行中"的事务
    消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。
    一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

    • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
    • 程序能正确识别正在进行中的事务。

事务消息整体流程

总体来说,事务消息的使用流程是:

  1. 生产者发送事务消息,该操作在MQ中产生半事务消息,暂时对下游消费者不可见
  2. 生产者执行本地事务,并根据执行结果提交/回滚消息,即事务监听器的executeLocalTransaction方法被触发
  3. RocketMQ根据本地事务执行结果对事务消息做不同处理,分为三种执行结果:
    • 执行成功(提交):将半事务消息转换为普通消息,对下游消费者可见,等待消费者消费
    • 执行失败(回滚):或者删除半事务消息,并终止本次事务消息
    • 未知结果(未知或未收到):MQ在一定时间内未收到结果,将会主动回查生产者事务,从而触发事务监听器的checkLocalTransaction方法,该检查结果分为三种
      • 结果成功(提交)或失败(回滚):同上方的处理方式
      • 依然未知:MQ在一定回查间隔后,再次执行checkLocalTransaction方法回查事务结果
      • 达到回查次数上限(默认15次)或者事务消息执行超时:半事务消息将会被强制回滚,事务消息终止
  4. 消息被消费者消费,消费者不需要关心事务消息的状态
  5. 事务消息的状态完全由生产者控制

事务相关MQ参数

事务异常检查间隔
默认值:60秒。
事务异常检查间隔指的是,半事务消息因系统重启或异常情况导致没有提交,生产者客户端会按照该间隔时间进行事务状态回查。 间隔时长不建议设置过短,否则频繁的回查调用会影响系统性能。

半事务消息第一次回查时间
默认值:取值等于[事务异常检查间隔] * 最大限制:不超过1小时。

半事务消息最大超时时长
默认值:4小时。
取值范围:不支持自定义修改。
半事务消息因系统重启或异常情况导致没有提交,生产者客户端会按照事务异常检查间隔时间进行回查,若超过半事务消息超时时长后没有返回结果,半事务消息将会被强制回滚。 您可以通过监控该指标避免异常事务。

关于回查次数上限(默认15次)

可以查看RocketMQ项目源码中,有关broker中事务检查的源代码

package org.apache.rocketmq.broker.transaction;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import ...

public class TransactionalMessageCheckService extends ServiceThread {
    ...
    private BrokerController brokerController;
    ...
    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        // 获取broker配置中的最大回查次数配置
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        // 事务消息服务执行回查操作
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }
}

可以看到,broker在执行事务检查时,会先获取配置中的最大回查次数,然后执行check方法,以下是定义check的接口中的有关描述:

package org.apache.rocketmq.broker.transaction;

import ...

public interface TransactionalMessageService {
    ...
    /**
     * 翻译:遍历未提交/未回滚的半事务消息,向producer发送check back请求获取事务执行状态。
     * Traverse uncommitted/unroll back half message and send check back request to producer to obtain transaction status.
     * ...
     * 翻译:事务最大检查次数:消息被检查的最大次数,如果超过这个值,消息将被丢弃
     * @param transactionCheckMax The maximum number of times the message was checked, 
     * if exceed this value, this message will be discarded.
     * ...
     */
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    ...
}

可以看到,brocker在检查尚未完成的半事务消息时,会查看消息被检查的最大次数,如果超过这个值,事务消息将被丢弃

为了确定默认的回查最大次数,我们需要查看broker的配置类中,有关事务消息的定义:

package org.apache.rocketmq.common;

import ...

public class BrokerConfig extends BrokerIdentity {
    ....
    /**
     * The minimum time of the transactional message  to be checked firstly, one message only exceed this time interval
     * that can be checked.
     */
    @ImportantField
    private long transactionTimeOut = 6 * 1000; // 默认事务超时触发检查的时间间隔,也就是最少6s才能第一次触发回查

    /**
     * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
     */
    @ImportantField
    private int transactionCheckMax = 15;  // 默认最大回查次数

    /**
     * Transaction message check interval.
     */
    @ImportantField
    private long transactionCheckInterval = 30 * 1000; // 默认回查间隔 30s
    ...
}

由此,可以确定默认最大回查次数是基于Broker的配置,默认为15次,超过15次则会抛弃消息,终止该事务消息。

RocketTemplate 发送事务消息的时机踩坑

另外一个小问题:如果使用spring-boot-starterRocketMQTemplate操作发送事务消息,请尝试在容器初始化完成之后再进行,不要试图在Bean的初始化阶段操作,会失败

1. 错误演示

@Service
public class TransactionMessageService implements CommandLineRunner {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // 试图使用注解方式在初始化阶段进行事务消息的发送
    @PostConstruct
    public void customInitMethod() throws Exception {
        ...
        Message<String> message = MessageBuilder.withPayload(payload).build();
        // 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(destination, message, null);

        if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            throw new Exception("事务消息发送失败,回滚本地事务");
        }
    }
}

@Slf4j
@RocketMQTransactionListener
class TransactionMessageListener implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String payload = new String((byte[]) msg.getPayload());
            log.info("处理事务消息:{},arg:{},执行本地事务中……", JSON.parseObject(payload, Order.class), arg);
            return RocketMQLocalTransactionState.UNKNOWN;
        } catch (Exception e) {
            log.error("执行本地事务失败,等待重试,异常消息:{}", e.getMessage());
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            String payload = new String((byte[]) msg.getPayload());
            log.info("处理事务消息:{},确认事务中……", JSON.parseObject(payload, Order.class));
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("确认本地事务失败,异常消息:{}", e.getMessage());
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

然后启动容器,会报错:

Caused by: java.lang.IllegalStateException: The rocketMQTemplate does not exist TransactionListener

满脸问号???为什么,明明有事务监听器,怎么会提示不存在呢?

2. 问题分析

如果我们已经明确定义的事务监听器,依然报错提示不存在

那可能的原因就是:发送事务消息时,事务监听器还没有和rocketMQTemplate完成绑定操作(注册事务监听器)

那么问题来了:RocketMQTemplate的事务监听器是在什么时候被注册的?

先说结论:是在所有单例Bean初始化之后才被注册的

3. 溯源过程

在RocketMQ的自动配置类中可以找到问题的端倪,下面是注册事务监听器的配置类源代码:

package org.apache.rocketmq.spring.autoconfigure;

import ....

@Configuration
public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    ...
    @Override public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        beans.forEach(this::registerTransactionListener);
    }

    private void registerTransactionListener(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        ...
        RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
        ...
        // 在事务监听器注解中使用 rocketMQTemplateBeanName 指定的 rocketMQTemplate
        // 设置的rocketMQTemplate的事务监听器
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
        log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
    }
}

可以看到设置监听器的方法调用链是:
afterSingletonsInstantiated -> registerTransactionListener -> 设置事务监听器

afterSingletonsInstantiated是从哪来的,在什么时候调用呢?

afterSingletonsInstantiatedSmartInitializingSingleton接口的实现方法,该接口方法会在容器中的所有单例bean对象初始化之后进行调用。源代码:

public interface SmartInitializingSingleton {
    /**
     * Invoked right at the end of the singleton pre-instantiation phase,
     * with a guarantee that all regular singleton beans have been created already. 
     * 渣翻:在单例预实例化阶段结束时调用,保证已经创建了所有常规的单例bean。
     */
    void afterSingletonsInstantiated();
}

Spring的bean默认都是单例的,所以如果在初始化阶段使用事务消息,会在这段代码前执行,导致以上错误。

总结

  1. 事务消息的发送和事务提交要在同一个JVM中进行,因为是保证本地事务和发送消息的最终一致性,当然执行的事务和发送消息可以是RPC等方式。但是最理想的应该还是同一个JVM的情况。

  2. 事务消息的使用对代码入侵严重,为了保证消息不丢失且与事务一致,需要编写对应的事务监听器方法,分别对二阶段的各种情况进行处理。学习成本高,代码入侵强。

  3. 在 RocketMQ 中,事务消息的实现方案是先发半消息(半消息对消费者不可见),待半消息发送成功之后,才能执行本地事务,等本地事务执行成功之后,再向 Broker 发送请求将半消息转成正常消息,这样消费者就可以消费此消息。由于本地事务和MQ强绑定,且执行顺序是:先发MQ,MQ接收后再执行本地事务。如果MQ出现异常宕机、网络抖动,也会影响到本地事务的执行。

对RocketMQ事务消息的评价

知乎网友评价

MQ的事务消息是解决生产者的单机业务事务与消息发送的一致性。即解决生产者做完核心业务后MQ消息发送不成功,或者MQ消息发送成功后生产者核心业务失败的一致性问题。

所以RocketMQ从自己的视角搞出来事务消息,让生产者本地事务开启后未提交前成功发送半消息,在通过后续MQ Server的check交互来让生产者根据本地事务成功与否来决定commit or rollback半消息。只能说基于MQ视角是一种解决方案。

有些开发者更习惯生产者的业务视角,所以直接执行本地事务,完成后发送MQ消息,因为会出现发送MQ消息失败的可能性(可能性比较低),可以采用定时check补偿发送MQ方案也是可行的。

无所谓优劣,两种视角下的方案均可。

思否博客评价

RocketMQ的事务消息可以用来解决分布式事务的问题,但并不是一个完整的分布式事务解决方案。
它适用于以下场景:

  1. 业务数据和消息数据一致性要求不高的场景。RocketMQ事务消息最大的限制是只支持单Broker消息回查,不支持跨Broker消息回查。这意味着如果消息与业务数据不在同一Broker,事务消息机制无法保证强一致性。
  2. 本地事务和MQ消息同在一个JVM的场景。RocketMQ事务消息需要应用在消费方法内执行本地事务,并根据事务结果提交或回滚消息。所以最理想的场景是消息消费方和生产方部署在同一JVM,消费方法可以方便执行本地事务。
  3. 可以容忍最终一致性的场景。RocketMQ事务消息机制会导致事务消息在最终提交前有一定的延迟。在某些场景下这种延迟是可以容忍的,就适用于RocketMQ事务消息。
  4. 发消息后需要执行本地事务的场景。RocketMQ事务消息的模式就是:先发送半消息(准备消息),然后执行本地事务,根据事务结果提交或回滚消息。如果业务场景需要这个流程,那么事务消息是一个很好的选择。所以,总体来说,RocketMQ事务消息可以解决一定场景下的分布式事务问题,但由于其本身的限制,不适合需要高度数据一致性和完备XA事务支持的场景。
    但对于许多业务场景来说,RocketMQ事务消息提供的最终一致性和原子性已经足够,这也是它的优势所在。

扩展:事务消息其他实现方案

有没有一种办法可以降低代码入侵,降低学习成本,从而实现事务消息呢?有,另一种方案 —— 本地消息表方案。

下面扩展一下另外一个消息队列及其实现原理 —— QMQ(去哪儿网消息队列)

QMQ简介

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。目前在公司内部日常消息qps在60W左右,生产上承载将近4W+消息topic,消息的端到端延迟可以控制在10ms以内。

QMQ使用本地消息表的方式实现分布式事务。详情查看QMQ官网:事务消息

本地消息表就是利用了关系型数据库的事务能力,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息表的插入,即将业务的执行和将消息放入到消息表中的操作放在同一个事务中提交

这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果其他服务调用成功就修改这条本地消息的状态

如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理

可以看到,本地事务消息表是一种最大努力通知的思想

以下是来自QMQ官网中,介绍事务消息使用方法的相关截图:

官网事务消息使用方法截图

sendMessage执行的时候,消息并没有通过网络发送出去,而仅仅是往业务DB同一个实例上的消息库插入了一条记录,然后注册事务的回调,在这个事务真正提交后消息才从网络发送出去,这个时候如果发送到server成功的话消息会被立即删除掉。而如果消息发送失败则消息就留在消息库里,这个时候我们会有一个补偿任务会将这些消息从消息库里捞出然后重新发送,直到发送成功。

所以发送事务消息整体流程就如下图所示:

发送事务消息整体流程

补偿任务

在使用事务消息之前需要启动补偿任务服务,使用bin/watchdog.sh 来启动补偿任务服务。为了高可用,最好部署多个补偿任务实例,watchdog实现了选举机制,某个时刻只有一个实例在运行,当该实例出现问题时会自动切换到其他实例。

参考

RocketMQ官网:事务消息
思否博客:消息队列中的事务消息
微信公众号文章:面试官:RocketMQ 分布式事务消息的缺点?
www.baiyp.ren博客:RocketMQ消费消息

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注