分布式消息服务ROCKETMQ版-使用ACL权限访问:消费者增加用户认证信息
时间:2025-05-06 16:56:18
消费者增加用户认证信息
无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, err := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ACL_User_Name"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey: os.Getenv("ACL_Secret_Key"), }), //consumer.WithTls(true), //创建实例时,如果开启了SSL,请添加此行代码。 ) if err != nil { fmt.Println("init consumer error: " + err.Error()) os.Exit(0) } err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- testGroup:表示消费组名称。
- 192.168.0.1:8100:表示实例连接地址和端口。
- AccessKey:表示用户名。创建用户的步骤,请参见创建用户。
- SecretKey:表示用户的密钥。
- test:表示Topic名称。
support.huaweicloud.com/devg-hrm/hrm-devg-017.html
推荐文章