分布式消息服务KAFKA版-Java客户端接入示例:准备Kafka配置信息

时间:2024-05-15 11:01:15

准备Kafka配置信息

为了方便,下文分生产与消费两个配置文件介绍。其中涉及SASL认证配置,如果Kafka实例没有开启SASL,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了SASL,则必须使用加密方式连接,请设置相关参数。

  • 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件)

    以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。

    #Topic名称在具体的生产与消费代码中。
    #######################
    #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用SASL认证,以下参数请注释掉
    #######################
    #设置SASL认证机制、账号和密码。
    #sasl.mechanism为SASL认证机制,username和password为SASL_SSL的用户名和密码,参考“收集连接信息”章节获取。
    #SASL认证机制为“PLAIN”时,配置信息如下。
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    #设置Kafka安全协议。security.protocol为安全协议。
    #安全协议为“SASL_SSL”时,配置信息如下。
    security.protocol=SASL_SSL
    #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。
    ssl.truststore.location=E:\\temp\\client.jks
    #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。
    ssl.endpoint.identification.algorithm=
    #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    security.protocol=SASL_PLAINTEXT
  • 消费消息配置文件(对应消费消息代码中的dms.sdk.consumer.properties文件)
    以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。
    #Topic名称在具体的生产与消费代码中。
    #######################
    #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group
    group.id=1
    #键的序列化方式
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #值的序列化方式
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #偏移量的方式
    auto.offset.reset=earliest
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置SASL认证机制、账号和密码。
    #sasl.mechanism为SASL认证机制,username和password为SASL_SSL的用户名和密码,参考“收集连接信息”章节获取。
    #SASL认证机制为“PLAIN”时,配置信息如下。
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    #设置Kafka安全协议。security.protocol为安全协议。
    #安全协议为“SASL_SSL”时,配置信息如下。
    security.protocol=SASL_SSL
    #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。
    ssl.truststore.location=E:\\temp\\client.jks
    #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。
    ssl.endpoint.identification.algorithm=
    #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    security.protocol=SASL_PLAINTEXT
support.huaweicloud.com/devg-kafka/Kafka-java-demo.html