分布式消息服务RocketMQ版-收发事务消息:发送事务消息

时间:2025-08-14 18:54:58

发送事务消息

参考如下示例代码(示例中的 topic、name_srv 等取值需要替换为实例自有信息,请根据实际情况替换)。

import time

from rocketmq.client import Message, TransactionMQProducer, TransactionStatus

# Topic that create_message() builds its messages for; replace with your own.
topic = 'TopicTest'
# Name server address handed to the producer in send_transaction_message();
# replace with the address of your own instance.
name_srv = '192.168.0.1:8100'


def create_message():
    """Build one sample message addressed to the module-level ``topic``.

    The message carries a fixed key, a fixed tag, a single custom property
    and a fixed text body.

    Returns:
        The populated ``Message`` instance.
    """
    message = Message(topic)

    # Routing / filtering metadata.
    message.set_keys('messageKey')
    message.set_tags('messageTag')

    # Custom property and payload.
    message.set_property('property', 'test')
    message.set_body('message body')

    return message


def check_callback(msg):
    """Transaction check callback passed to ``TransactionMQProducer``.

    Prints the UTF-8 decoded body of ``msg`` and always reports the
    transaction as committed.

    Args:
        msg: The message whose transaction state is being checked.

    Returns:
        ``TransactionStatus.COMMIT``.
    """
    body_text = msg.body.decode('utf-8')
    print('check: ' + body_text)
    return TransactionStatus.COMMIT


def local_execute(msg, user_args):
    """Local-transaction executor passed to ``send_message_in_transaction``.

    Prints the UTF-8 decoded body of ``msg`` and always returns the UNKNOWN
    status, leaving the final outcome undecided at this point.

    Args:
        msg: The message being sent in the transaction.
        user_args: Caller-supplied argument; not used by this sample.

    Returns:
        ``TransactionStatus.UNKNOWN``.
    """
    body_text = msg.body.decode('utf-8')
    print('local:   ' + body_text)
    return TransactionStatus.UNKNOWN


def send_transaction_message(count):
    """Send ``count`` transactional messages, then keep the process alive.

    A ``TransactionMQProducer`` is created for the group ``'producerGroup'``
    with ``check_callback`` as its check callback, pointed at ``name_srv``
    and started. Each message is built by ``create_message()`` and sent with
    ``local_execute`` as the local-transaction executor; the send status and
    message id are printed.

    After sending, the function sleeps forever, so it never returns normally.
    The producer is shut down when the function is left through an exception
    (for example ``KeyboardInterrupt``), and the exception then propagates to
    the caller unchanged.

    Args:
        count: Number of messages to send.
    """
    producer = TransactionMQProducer('producerGroup', check_callback)
    producer.set_name_server_address(name_srv)
    producer.start()
    try:
        for _ in range(count):
            msg = create_message()
            ret = producer.send_message_in_transaction(msg, local_execute, None)
            print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        print('send transaction message done')

        # Keep the process (and the started producer) alive; presumably so
        # check_callback can still be invoked for transactions that
        # local_execute left as UNKNOWN.
        while True:
            time.sleep(3600)
    finally:
        # Release the started producer on any exit path instead of leaking it.
        producer.shutdown()


if __name__ == '__main__':
    # Script entry point: send 10 transactional messages. The call ends in an
    # endless sleep loop, so it does not return normally.
    send_transaction_message(10)
support.huaweicloud.com/devg-hrm/hrm-devg-020.html