云服务器内容精选
-
如何选择Kafka实例的存储空间? 存储空间主要是指用于存储消息(包括副本中的消息)、日志和元数据所需要的空间。选择存储空间时,需要选择磁盘类型和磁盘大小。更多磁盘信息,请参考如何选择磁盘类型。 假设业务存储数据保留天数内磁盘大小为100GB,则磁盘容量最少为100GB*副本数 + 预留磁盘大小100GB。Kafka集群中,每个Kafka节点会使用33GB的磁盘作为日志和Zookeeper数据的存储,因而实际可用存储会小于购买存储。 其中,副本数在创建Topic时可以选择,默认为3副本存储。如果开启了Kafka自动创建Topic功能,自动创建的Topic默认为3副本,副本数可以通过“配置参数”页签中的“default.replication.factor”修改。 父主题: 实例问题
-
Kafka扩容/缩容会影响业务吗? 扩容/缩容Kafka实例的影响如表1所示。 表1 变更实例规格的影响 变更配置类型 影响 基准带宽/代理数量 扩容基准带宽/代理数量不会影响原来的代理,业务也不受影响。 扩容基准带宽/代理数量时,系统会根据当前磁盘大小进行相应比例的存储空间扩容。例如扩容前实例的代理数为3,每个代理的磁盘大小为200GB,扩容后实例的代理数为10,此时代理的磁盘大小依旧为200GB,但是总磁盘大小为2000GB。 新创建的Topic才会分布在新代理上,原有Topic还分布在原有代理上,造成分区分布不均匀。通过修改Kafka分区平衡,实现将原有Topic分区的副本迁移到新代理上。 存储空间 扩容存储空间有次数限制,只能扩容20次。 扩容存储空间不会影响业务。 代理规格 若Topic为单副本,扩容/缩容期间无法对该Topic生产消息或消费消息,会造成业务中断。 若Topic为多副本,扩容/缩容代理规格不会造成服务中断,但会逐个节点重启,负载会转移到剩余节点上,建议您在业务低峰期扩容/缩容。 扩容/缩容代理规格的过程中,节点滚动重启造成分区Leader切换,会发生秒级连接闪断,在用户网络环境稳定的前提下,Leader切换时长一般为1分钟以内。多副本的Topic需要在生产客户端配置重试机制,方法如下: 生产客户端为Kafka开源客户端时,检查是否配置retries参数,建议此参数值设置为3~5。 生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(20))); 若实例已创建的分区数总和大于待缩容规格的实例分区数上限,此时无法缩容。不同规格配置的实例分区数上限不同,具体请参见产品规格。 例如:kafka.4u8g.cluster*3的实例,已创建800个分区,您想把此实例的规格缩容为kafka.2u4g.cluster*3,kafka.2u4g.cluster*3规格的实例分区数上限为750,此时无法缩容。 变更代理规格的时长和代理数量有关,单个代理的变更时长一般在5~10分钟,代理数量越多,规格变更时间越长。 父主题: 实例规格变更问题
-
可能原因 Kafka实例每个代理会使用33GB的磁盘作为日志和Zookeeper数据的存储,且创建实例时磁盘格式化会占用部分磁盘空间,因此实际数据存储空间大约为66GB。Topic的每个分区都是由多个segment文件组成,每个segment文件的最大存储容量为500MB。Kafka删除消息是删除segment文件,而不是删除一条消息。Kafka要求每个分区至少保留一个segment文件用来存储消息,如果分区中仅剩一个segment文件,此segment文件不会被删除。在分区数到达132个(66GB/500MB=132)及以上时,可能会达到磁盘存储空间使用上限。 磁盘容量到达95%(即62.7GB)时,由于Topic配置了180个分区,每个分区中可能只存在一个segment文件,此segment文件无法被删除,其存储的消息也不会被删除。
-
准备Kafka配置信息 为了方便,下文分生产与消费两个配置文件介绍。其中涉及SASL认证配置,如果Kafka实例没有开启密文接入,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了密文接入,则必须使用加密方式连接,请设置相关参数。 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件) 以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SC RAM -SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl.truststore.password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书 域名 校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT 消费消息配置文件(对应消费消息代码中的dms.sdk.consumer.properties文件) 以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl.truststore.password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT
-
客户端网络环境说明 客户端可以通过以下方式访问Kafka实例: 如果客户端是云上E CS ,与Kafka实例处于同region同VPC,则可以直接访问Kafka实例提供的内网连接地址。 如果客户端是云上ECS,与Kafka实例处于相同region但不同VPC,通过以下任意一种方式访问。 创建VPC对等连接,将两个VPC的网络打通,实现跨VPC访问。具体步骤请参考对等连接。注意修改Kafka实例的安全组,允许端口9092(明文接入)/9093(密文接入)被外部请求访问。 利用 VPC终端节点 在不同VPC间建立跨VPC的连接通道,实现Kafka客户端通过内网访问Kafka实例,具体步骤请参考使用VPCEP实现跨VPC访问Kafka。注意修改Kafka实例的安全组,允许端口9011被外部请求访问。 如果客户端在其他网络环境,或者与Kafka实例处于不同region,则访问实例的公网地址。 公网访问时,注意修改Kafka实例的安全组,允许端口9094(明文接入)/9095(密文接入)被外部网络访问。 不同网络环境,对于客户端配置来说,只是连接地址的差异,其他都一样。因此,本手册以同一VPC内子网地址的方式,介绍客户端开发环境搭建。 遇到连接超时或失败时,请注意确认网络是否连通。可使用telnet方式,检测实例连接地址与端口。
-
密文接入信息 如果实例开启密文接入,则需要获得连接实例的用户名与密码、SASL认证机制和Kafka安全协议。Kafka安全协议设置为“SASL_SSL”时,还需要获取SSL证书。 连接实例的用户名在Kafka实例控制台的“用户管理”页面中查看,如果忘记密码,可通过重置密码重新获得。 图2 查看SASL用户名 SASL认证机制在Kafka实例控制台的基本信息页面中获取。 如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。 图3 开启的SASL认证机制 Kafka安全协议在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。 图4 启用的安全协议 如果Kafka安全协议设置为“SASL_SSL”,在Kafka实例控制台的基本信息页面中下载SSL证书。如果您用自制的证书替换了Kafka控制台的证书,请获取您自制的证书。 使用Java语言连接实例时,需要用JKS格式的证书。使用Python语言连接实例时,需要用CRT格式的证书。
-
在application.properties文件中填写配置 #=============== Kafka ========================== ## Kafka实例的broker信息,ip:port为实例的连接地址和端口 spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3 #=============== 生产者配置 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== 消费者配置 ======================= spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #======== SASL配置(不开启SASL时将以下配置删除) ======= ## 设置SASL认证机制、账号和密码。 ## spring.kafka.properties.sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 ## SASL认证机制为“PLAIN”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=PLAIN spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; ## SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; ## 设置Kafka安全协议。spring.kafka.security.protocol为安全协议。 ## 安全协议为“SASL_SSL”时,配置信息如下。 spring.kafka.security.protocol=SASL_SSL ## spring.kafka.ssl.trust-store-location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 spring.kafka.ssl.trust-store-location=file:D:\\temp\\client.jks ## spring.kafka.ssl.trust-store-password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 spring.kafka.ssl.trust-store-password=dms@kafka ## spring.kafka.properties.ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 spring.kafka.properties.ssl.endpoint.identification.algorithm= ## 安全协议为“SASL_PLAINTEXT”时,配置信息如下。 spring.kafka.security.protocol=SASL_PLAINTEXT
-
Smart Connect监控指标 仅集群实例支持Smart Connect监控指标。 表5 Smart Connect支持的监控指标 指标ID 指标名称 指标含义 取值范围 单位 进制 测量对象(维度) 监控周期(原始指标) kafka_wait_synchronize_data 待同步Kafka数据量 Kafka任务的待同步数据量。 ≥ 0 Count 不涉及 Kafka实例的Smart Connect任务 1分钟 kafka_synchronize_rate Kafka每分钟同步数据量 Kafka任务每分钟同步的数据量。 ≥ 0 Count 不涉及 Kafka实例的Smart Connect任务 1分钟 task_status 任务状态 当前任务状态。 0:任务异常 1:任务正常 不涉及 不涉及 Kafka实例的Smart Connect任务 1分钟 message_delay 消息时延 消息到达源端的时间与到达目标端的时间之差。 ≥ 0 ms 不涉及 Kafka实例的Smart Connect任务 1分钟 使用Smart Connect监控指标时,请注意如下几点: Kafka双向数据复制的Smart Connect任务在监控中会被拆分为2个任务,分别为“Smart Connect任务名_source_0”和“Smart Connect任务名_source_1”。 如果Topic中的消息在进行下一次数据同步前,已经全部老化,此时实际是没有待同步的Kafka数据,但是Kafka数据同步监控指标使用的是包含老化数据的offset值,“待同步Kafka数据量”会显示老化的消息数。
-
维度 Key Value kafka_instance_id Kafka实例 kafka_broker Kafka实例节点 kafka_topics Kafka实例主题 kafka_partitions Kafka实例分区 kafka_groups-partitions Kafka实例分区的消费组 kafka_groups_topics Kafka实例队列的消费组 kafka_groups Kafka实例的消费组 connector_task Kafka实例的Smart Connect任务
-
主题监控指标 表3 主题支持的监控指标 指标ID 指标名称 指标含义 取值范围 单位 进制 测量对象(维度) 监控周期(原始指标) topic_bytes_in_rate 生产流量 该指标用于统计每秒生产的字节数。 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000000 Byte/s 1024(IEC) Kafka实例队列 1分钟 topic_bytes_out_rate 消费流量 该指标用于统计每秒消费的字节数。 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000000 Byte/s 1024(IEC) Kafka实例队列 1分钟 topic_data_size 队列数据容量 该指标用于统计队列当前的消息数据大小。 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~5000000000000 Byte 1024(IEC) Kafka实例队列 1分钟 topic_messages 队列消息总数 该指标用于统计队列当前的消息总数。 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 ≥ 0 Count 不涉及 Kafka实例队列 1分钟 topic_messages_in_rate 消息生产速率 该指标用于统计每秒生产的消息数量。 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000 Count/s 不涉及 Kafka实例队列 1分钟 partition_messages 分区消息数 该指标用于统计分区中当前的消息个数。 在“主题”页签,当“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Count 不涉及 Kafka实例队列 1分钟 produced_messages 生产消息数 该指标用于统计目前生产的消息总数。 在“主题”页签,当“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Count 不涉及 Kafka实例队列 1分钟
-
实例监控指标 表1 实例支持的监控指标 指标ID 指标名称 指标含义 取值范围 单位 进制 测量对象(维度) 监控周期(原始指标) current_partitions 分区数 该指标用于统计Kafka实例中已经使用的分区数量。 0~100000 Count 不涉及 Kafka实例 1分钟 current_topics 主题数 该指标用于统计Kafka实例中已经创建的主题数量。 0~100000 Count 不涉及 Kafka实例 1分钟 group_msgs 消息堆积数 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 0~1000000000 Count 不涉及 Kafka实例 1分钟 instance_bytes_in_rate 生产流量 统计实例中每秒生产的字节数。 部分存量实例不支持此监控,具体以控制台为准。 0~1000000 Byte/s 1024(IEC) Kafka实例 1分钟 instance_bytes_out_rate 消费流量 统计实例中每秒消费的字节数。 部分存量实例不支持此监控,具体以控制台为准。 0~1000000 Byte/s 1024(IEC) Kafka实例 1分钟 current_partitions_usage 分区使用率 该指标用于统计分区使用率。 部分存量实例不支持此监控,具体以控制台为准。 0~100 % 不涉及 Kafka实例 1分钟
-
消费组监控指标 表4 消费组支持的监控指标 指标ID 指标名称 指标含义 取值范围 单位 进制 测量对象(维度) 监控周期(原始指标) messages_consumed 分区已消费消息数 该指标用于统计当前消费组已经消费的消息个数。 在“消费组”页签,“主题”为指定的Topic名称,“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Count 不涉及 Kafka实例的消费组 1分钟 messages_remained 分区可消费消息数 该指标用于统计消费组可消费的消息个数。 在“消费组”页签,“主题”为指定的Topic名称,“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Count 不涉及 Kafka实例的消费组 1分钟 topic_messages_remained 队列可消费消息数 该指标用于统计消费组指定队列可以消费的消息个数。 在“消费组”页签,“主题”为指定的Topic名称,“监控类型”为“基本监控”时,才包含该指标。 0~(263-1) Count 不涉及 Kafka实例的消费组 1分钟 topic_messages_consumed 队列已消费消息数 该指标用于统计消费组指定队列当前已经消费的消息数。 在“消费组”页签,“主题”为指定的Topic名称,“监控类型”为“基本监控”时,才包含该指标。 0~(263-1) Count 不涉及 Kafka实例的消费组 1分钟 consumer_messages_remained 消息堆积数(消费组可消费消息数) 该指标用于统计消费组剩余可以消费的消息个数。 在“消费组”页签,“主题”为“全部队列”时,才包含该指标。 0~(263-1) Count 不涉及 Kafka实例的消费组 1分钟 consumer_messages_consumed 消费组已消费消息数 该指标用于统计消费组当前已经消费的消息数。 在“消费组”页签,“主题”为“全部队列”时,才包含该指标。 0~(263-1) Count 不涉及 Kafka实例的消费组 1分钟 messages_consumed_per_min 分区消费速率 统计消费组指定队列分区每分钟的消费数。 在“消费组”页签,“主题”为指定的Topic名称,“监控类型”为“分区监控”时,才包含该指标。 部分存量实例不支持此监控,具体以控制台为准。 0~30000000 Count/min 不涉及 Kafka实例的消费组 1分钟 topic_messages_consumed_per_min 队列消费速率 统计消费组指定队列每分钟的消费数。 在“消费组”页签,“主题”为指定的Topic名称,“监控类型”为“基本监控”时,才包含该指标。 部分存量实例不支持此监控,具体以控制台为准。 0~30000000 Count/min 不涉及 Kafka实例的消费组 1分钟 consumer_messages_consumed_per_min 消费组消费速率 统计消费组每分钟的消费数。 在“消费组”页签,“主题”为“全部Topic”时,才包含该指标。 部分存量实例不支持此监控,具体以控制台为准。 0~30000000 Count/min 不涉及 Kafka实例的消费组 1分钟
-
准备实例依赖资源 创建Kafka实例前,请提前准备好如表1所示资源。 表1 Kafka实例依赖资源 资源名称 要求 创建指导 VPC和子网 Kafka实例可以使用当前账号下已创建的VPC和子网,也可以使用新创建的VPC和子网,还可以使用共享VPC和子网,请根据实际需要进行配置。 共享VPC是基于 资源访问管理 (Resource Access Manager,简称RAM)服务的机制,VPC的所有者可以将VPC内的子网共享给一个或者多个账号使用。通过共享VPC功能,可以简化网络配置,帮助您统一配置和运维多个账号下的资源,有助于提升资源的管控效率,降低运维成本。有关VPC子网共享的更多信息,请参见共享VPC。 在创建VPC和子网时应注意: 创建的VPC与Kafka实例在相同的区域。 子网开启IPv6后,Kafka实例支持IPv6功能。Kafka实例开启IPv6后,客户端可以使用IPv6地址连接实例。 创建VPC和子网的操作指导请参考创建虚拟私有云和子网,若需要在已有VPC上创建和使用新的子网,请参考为虚拟私有云创建新的子网。 安全组 不同的Kafka实例可以重复使用相同的安全组,也可以使用不同的安全组,请根据实际需要进行配置。 安全组与Kafka实例必须在相同的区域。 连接Kafka实例前,请根据连接方式配置对应的安全组,具体请参考表2。 创建安全组的操作指导请参考创建安全组,为安全组添加规则的操作指导请参考添加安全组规则。 弹性IP地址 如果客户端使用公网连接Kafka实例,请提前创建弹性IP地址。 在创建弹性IP地址时,应注意如下要求: 创建的弹性IP地址与Kafka实例在相同的区域。 弹性IP地址的数量必须与Kafka实例的代理个数相同。 Kafka控制台无法识别开启IPv6转换功能的弹性IP地址。 创建弹性IP地址的操作指导请参考申请弹性公网IP。
-
步骤三:创建Topic 在“Kafka实例”页面,单击Kafka实例的名称,进入实例详情页面。 在左侧导航栏单击“Topic管理”,进入Topic列表页。 单击“创建Topic”,弹出“创建Topic”对话框。 填写Topic名称和配置信息,配置详情请参考表7,单击“确定”,完成创建Topic。 表7 Topic参数说明 参数 说明 Topic名称 名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。 名称不能为以下内置Topic: __consumer_offsets __transaction_state __trace __connect-status __connect-configs __connect-offsets __dms_dial_test 创建Topic后不能修改名称。 输入“topic-01”。 分区数 如果分区数与消费者数一致,分区数越大消费的并发度越大。 输入“3”。 副本数 Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。 输入“3”。 老化时间(小时) 消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。 输入“72”。 同步复制 选择“不开启”,即Leader副本接收到消息并成功写入到本地日志后,就马上向客户端发送写入成功的消息,不需要等待所有Follower副本复制完成。 同步落盘 选择“不开启”,即生产的消息存在内存中,不会立即写入磁盘。 消息时间戳类型 选择“CreateTime”,即生产者创建消息的时间。 批处理消息最大值(字节) Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。 输入“10485760”。 描述 不设置。 图8 创建Topic
-
步骤四:修改客户端配置文件 开启SSL双向认证后,需要在客户端的“consumer.properties”和“producer.properties”文件中,分别修改服务端证书配置,并增加客户端证书配置。 security.protocol=SSL ssl.truststore.location=/opt/kafka_2.12-2.7.2/config/client.truststore.jks ssl.truststore.password=axxxb ssl.endpoint.identification.algorithm= #增加以下的客户端证书配置 ssl.keystore.location=/var/private/ssl/kafka/client.keystore.jks ssl.keystore.password=txxx3 ssl.key.password=txxx3 security.protocol配置证书协议类型,开启SSL双向认证时,必须设置为SSL。 ssl.truststore.location配置为client.truststore.jks证书的存放路径。 ssl.truststore.password为client.truststore.jks的密码。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。 ssl.keystore.location配置为client.keystore.jks证书的存放路径。 ssl.keystore.password配置为client.keystore.jks的密码。 ssl.key.password配置为client.keystore.jks的密码。