云服务器内容精选

  • 消费者增加用户认证信息 无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') # 设置权限(角色名和密钥) consumer.set_session_credentials( "ROCKETMQ_AK", # 角色名称 "ROCKETMQ_SK", # 角色密钥 '' )#用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户。 ROCKETMQ_SK:表示用户的密钥。 TopicTest:表示Topic名称。
  • 生产者增加用户认证信息 普通消息、顺序消息和定时消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) # 设置权限(角色名和密钥) producer.set_session_credentials( "ROCKETMQ_AK", # 角色名称 "ROCKETMQ_SK", # 角色密钥 '' )#用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。 ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户。 ROCKETMQ_SK:表示用户的密钥。 事务消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。 import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) # 设置权限(角色名和密钥) producer.set_session_credentials( "ROCKETMQ_AK", # 角色名称 "ROCKETMQ_SK", # 角色密钥 '' )#用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10) 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。 ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户。 ROCKETMQ_SK:表示用户的密钥。