signed

QiShunwang

“诚信为本、客户至上”

RocketMQ代码实战(八):事务消息(半消息)

2021/5/14 21:08:17   来源:

maven依赖和配置参考RocketMQ代码实战(一):使用rocketmq-spring-boot-starter发送和消费消息

首需要注意的是 事务消息(半消息) 仅仅只是保证本地事务和MQ消息发送形成整体的 原子性 ,而投递到MQ服务器后,并无法保证消费者一定能消费成功!

以下代码实例实现的是创建user后再发送消息。

消息生产

@RestController
@Slf4j
public class RocketMqController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("sendMqTransaction")
    public Object sendMqTransaction() {
        int i = new Random().nextInt(1000);
        String name = "name" + i;
        MqMessage message = MqMessage.builder().name("事务消息" + i).msg("这是事务消息" + i).build();
        Message<MqMessage> mqMessage = MessageBuilder.withPayload(message).setHeader("key", name).build();
        User user = new User();
        user.setName(name);
        user.setSex("" + i);
        log.info("sex:{}", i);
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(MqUtil.tx_group, MqUtil.tx_topic, mqMessage , user);
        return transactionSendResult;
    }

}

常量:

public class MqUtil {


    public static final String  tx_topic = "tx_topic";

    public static final String tx_group = "tx_producer_group";
}

与发送普通消息不同,发送事务消息org.apache.rocketmq.spring.core.RocketMQTemplate#sendMessageInTransaction(final String txProducerGroup, final String destination,         final Message<?> message, final Object arg)。不光需要destination和message参数,还需要txProducerGroup,arg。txProducerGroup指定处理半消息的group。arg表示传递的额外参数,此处我传递的是需要创建的user信息。

接下来是txProducerGroup对应的代码,即半消息的处理类。

@Slf4j
@Service
//@RocketMQTransactionListener表明这个一个生产端的消息监听器,需要配置监听的事务消息生产者组。
// 实现RocketMQLocalTransactionListener接口,重写执行本地事务的方法和检查本地事务方法
@RocketMQTransactionListener(txProducerGroup = MqUtil.tx_group)
public class TxProducerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private UserMapper userMapper;


    /**
     * 每次推送消息会执行executeLocalTransaction方法,首先会发送半消息,到这里的时候是执行具体本地业务,
     * 执行成功后手动返回RocketMQLocalTransactionState.COMMIT状态,
     * 这里是保证本地事务执行成功,如果本地事务执行失败则可以返回ROLLBACK进行消息回滚。 此时消息只是被保存到broker,并没有发送到topic中,broker会根据本地返回的状态来决定消息的处理方式。
     * @param msg
     * @param arg
     * @return
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("开始执行本地事务");
        User u = (User) arg;
        userMapper.insert(u);
        if (Integer.parseInt(u.getSex()) % 2 == 0) {
            int i = 1 / 0;
            //这个地方抛出异常,消息状态会是UNKNOWN状态
        }
        log.info("本地事务提交");
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("开始执行回查");
        String key = msg.getHeaders().get("key").toString();
        User u = new User();
        u.setName(key);
        List<User> select = userMapper.select(u);
        if (CollectionUtils.isEmpty(select)) {
            log.info("回滚半消息");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        log.info("提交半消息");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

该监听器的实现有两个方法一个是本地事务的执行方法executeLocalTransaction,一个是本地事务回查方法checkLocalTransaction。

两个方法的返回值类型为RocketMQLocalTransactionState,该枚举有三种:

// COMMIT:即生产者通知Rocket该消息可以消费
RocketMQLocalTransactionState.COMMIT;
// ROLLBACK:即生产者通知Rocket将该消息删除
RocketMQLocalTransactionState.ROLLBACK;
// UNKNOWN:即生产者通知Rocket继续查询该消息的状态
RocketMQLocalTransactionState.UNKNOWN;

    对于长时间没有 Commit/Rollback 的事务消息( pending 状态的消息),从服务端发起一次 回查Producer 收到回查消息,检查回查消息对应的 本地事务状态根据本地事务状态,重新 Commit 或者 Rollback。

以上代码中,如果sex是偶数,executeLocalTransaction会抛出异常,本地事务会回滚,半消息状态是UNKNOWN,此时就会启动消息的回查机制,mq会在一定的时间调用checkLocalTransaction方法查询执行状态,根据执行状态来决定是继续回查、删除消息、发送消息。

executeLocalTransaction也可以自己捕获异常,手动回滚事务,返回RocketMQLocalTransactionState.ROLLBACK,这样能减少消息回查

消费者的代码和普通消费者的代码一致:

@Slf4j
@Component
@RocketMQMessageListener(
        topic = MqUtil.tx_topic,
        consumerGroup = "tx_consumer_group")
public class TxConsumerListener implements RocketMQListener<MqMessage>{
    @Override
    public void onMessage(MqMessage message) {
        log.info("{}收到消息:{}", this.getClass().getSimpleName(), message);
    }

}

启动程序后,多次访问http://127.0.0.1:8080/sendMqTransaction,执行结果如下