分布式消息服务KAFKA版-Java客户端接入示例:准备Kafka配置信息
准备Kafka配置信息
为了方便,下文分生产与消费两个配置文件介绍。其中涉及SASL认证配置,如果Kafka实例没有开启密文接入,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了密文接入,则必须使用加密方式连接,请设置相关参数。
- 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件)
以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。
#Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SC RAM -SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl.truststore.password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书 域名 校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT
- 消费消息配置文件(对应消费消息代码中的dms.sdk.consumer.properties文件)
以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。
#Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl.truststore.password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT