云服务器内容精选
-
如何选择Kafka实例的存储空间? 存储空间主要是指用于存储消息(包括副本中的消息)、日志和元数据所需要的空间。选择存储空间时,需要选择磁盘类型和磁盘大小。更多磁盘信息,请参考如何选择磁盘类型。 假设业务存储数据保留天数内磁盘大小为100GB,则磁盘容量最少为100GB*副本数 + 预留磁盘大小100GB。Kafka集群中,每个Kafka节点会使用33GB的磁盘作为日志和Zookeeper数据的存储,因而实际可用存储会小于购买存储。 其中,副本数在创建Topic时可以选择,默认为3副本存储。如果开启了Kafka自动创建Topic功能,自动创建的Topic默认为3副本,副本数可以通过“配置参数”页签中的“default.replication.factor”修改。 父主题: 实例问题
-
Kafka扩容/缩容会影响业务吗? 扩容/缩容Kafka实例的影响如表1所示。 表1 变更实例规格的影响 变更配置类型 影响 基准带宽/代理数量 扩容基准带宽/代理数量不会影响原来的代理,业务也不受影响。 扩容基准带宽/代理数量时,系统会根据当前磁盘大小进行相应比例的存储空间扩容。例如扩容前实例的代理数为3,每个代理的磁盘大小为200GB,扩容后实例的代理数为10,此时代理的磁盘大小依旧为200GB,但是总磁盘大小为2000GB。 新创建的Topic才会分布在新代理上,原有Topic还分布在原有代理上,造成分区分布不均匀。通过修改Kafka分区平衡,实现将原有Topic分区的副本迁移到新代理上。 存储空间 扩容存储空间有次数限制,只能扩容20次。 扩容存储空间不会影响业务。 代理规格 若Topic为单副本,扩容/缩容期间无法对该Topic生产消息或消费消息,会造成业务中断。 若Topic为多副本,扩容/缩容代理规格不会造成服务中断,但会逐个节点重启,负载会转移到剩余节点上,建议您在业务低峰期扩容/缩容。 扩容/缩容代理规格的过程中,节点滚动重启造成分区Leader切换,会发生秒级连接闪断,在用户网络环境稳定的前提下,Leader切换时长一般为1分钟以内。多副本的Topic需要在生产客户端配置重试机制,方法如下: 生产客户端为Kafka开源客户端时,检查是否配置retries参数,建议此参数值设置为3~5。 生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(20))); 若实例已创建的分区数总和大于待缩容规格的实例分区数上限,此时无法缩容。不同规格配置的实例分区数上限不同,具体请参见产品规格。 例如:kafka.4u8g.cluster*3的实例,已创建800个分区,您想把此实例的规格缩容为kafka.2u4g.cluster*3,kafka.2u4g.cluster*3规格的实例分区数上限为750,此时无法缩容。 变更代理规格的时长和代理数量有关,单个代理的变更时长一般在5~10分钟,代理数量越多,规格变更时间越长。 父主题: 实例规格变更问题
-
可能原因 Kafka实例每个代理会使用33GB的磁盘作为日志和Zookeeper数据的存储,且创建实例时磁盘格式化会占用部分磁盘空间,因此实际数据存储空间大约为66GB。Topic的每个分区都是由多个segment文件组成,每个segment文件的最大存储容量为500MB。Kafka删除消息是删除segment文件,而不是删除一条消息。Kafka要求每个分区至少保留一个segment文件用来存储消息,如果分区中仅剩一个segment文件,此segment文件不会被删除。在分区数到达132个(66GB/500MB=132)及以上时,可能会达到磁盘存储空间使用上限。 磁盘容量到达95%(即62.7GB)时,由于Topic配置了180个分区,每个分区中可能只存在一个segment文件,此segment文件无法被删除,其存储的消息也不会被删除。
-
原因分析 使用客户端命令,打印NoAuthException异常。 Error while executing topic command org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213) at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:215) at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:338) at kafka.admin.AdminUtils$.writeTopicConfig(AdminUtils.scala:247) 通过客户端命令klist查询当前认证用户: [root@10-10-144-2 client]# klist Ticket cache: FILE:/tmp/krb5cc_0 Default principal: test@HADOOP.COM Valid starting Expires Service principal 01/25/17 11:06:48 01/26/17 11:06:45 krbtgt/HADOOP.COM@HADOOP.COM 如上例中当前认证用户为test。 通过命令id查询用户组信息。 [root@10-10-144-2 client]# id test uid=20032(test) gid=10001(hadoop) groups=10001(hadoop),9998(ficommon),10003(kafka)
-
问题背景与现象 在使用Kafka客户端命令创建Topic时,发现Topic无法被创建。 kafka-topics.sh --create --zookeeper 192.168.234.231:2181/kafka --replication-factor 1 --partitions 2 --topic test 提示错误NoAuthException,KeeperErrorCode = NoAuth for /config/topics。 具体如下: Error while executing topic command org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213) at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:215) at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:338) at kafka.admin.AdminUtils$.writeTopicConfig(AdminUtils.scala:247)
-
准备Kafka配置信息 为了方便,下文分生产与消费两个配置文件介绍。其中涉及SASL认证配置,如果Kafka实例没有开启密文接入,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了密文接入,则必须使用加密方式连接,请设置相关参数。 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件) 以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SC RAM -SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl.truststore.password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书 域名 校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT 消费消息配置文件(对应消费消息代码中的dms.sdk.consumer.properties文件) 以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl.truststore.password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT
-
客户端网络环境说明 客户端可以通过以下方式访问Kafka实例: 如果客户端是云上E CS ,与Kafka实例处于同region同VPC,则可以直接访问Kafka实例提供的内网连接地址。 如果客户端是云上ECS,与Kafka实例处于相同region但不同VPC,通过以下任意一种方式访问。 创建VPC对等连接,将两个VPC的网络打通,实现跨VPC访问。具体步骤请参考对等连接。注意修改Kafka实例的安全组,允许端口9092(明文接入)/9093(密文接入)被外部请求访问。 利用 VPC终端节点 在不同VPC间建立跨VPC的连接通道,实现Kafka客户端通过内网访问Kafka实例,具体步骤请参考使用VPCEP实现跨VPC访问Kafka。注意修改Kafka实例的安全组,允许端口9011被外部请求访问。 如果客户端在其他网络环境,或者与Kafka实例处于不同region,则访问实例的公网地址。 公网访问时,注意修改Kafka实例的安全组,允许端口9094(明文接入)/9095(密文接入)被外部网络访问。 不同网络环境,对于客户端配置来说,只是连接地址的差异,其他都一样。因此,本手册以同一VPC内子网地址的方式,介绍客户端开发环境搭建。 遇到连接超时或失败时,请注意确认网络是否连通。可使用telnet方式,检测实例连接地址与端口。
-
密文接入信息 如果实例开启密文接入,则需要获得连接实例的用户名与密码、SASL认证机制和Kafka安全协议。Kafka安全协议设置为“SASL_SSL”时,还需要获取SSL证书。 连接实例的用户名在Kafka实例控制台的“用户管理”页面中查看,如果忘记密码,可通过重置密码重新获得。 图2 查看SASL用户名 SASL认证机制在Kafka实例控制台的基本信息页面中获取。 如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。 图3 开启的SASL认证机制 Kafka安全协议在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。 图4 启用的安全协议 如果Kafka安全协议设置为“SASL_SSL”,在Kafka实例控制台的基本信息页面中下载SSL证书。如果您用自制的证书替换了Kafka控制台的证书,请获取您自制的证书。 使用Java语言连接实例时,需要用JKS格式的证书。使用Python语言连接实例时,需要用CRT格式的证书。
-
在application.properties文件中填写配置 #=============== Kafka ========================== ## Kafka实例的broker信息,ip:port为实例的连接地址和端口 spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3 #=============== 生产者配置 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== 消费者配置 ======================= spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #======== SASL配置(不开启SASL时将以下配置删除) ======= ## 设置SASL认证机制、账号和密码。 ## spring.kafka.properties.sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 ## SASL认证机制为“PLAIN”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=PLAIN spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; ## SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; ## 设置Kafka安全协议。spring.kafka.security.protocol为安全协议。 ## 安全协议为“SASL_SSL”时,配置信息如下。 spring.kafka.security.protocol=SASL_SSL ## spring.kafka.ssl.trust-store-location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 spring.kafka.ssl.trust-store-location=file:D:\\temp\\client.jks ## spring.kafka.ssl.trust-store-password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。 spring.kafka.ssl.trust-store-password=dms@kafka ## spring.kafka.properties.ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 spring.kafka.properties.ssl.endpoint.identification.algorithm= ## 安全协议为“SASL_PLAINTEXT”时,配置信息如下。 spring.kafka.security.protocol=SASL_PLAINTEXT
-
Kafka UI Kafka UI提供Kafka Web服务,通过界面展示Kafka集群中Broker、Topic、Partition、Consumer等功能模块的基本信息,同时提供Kafka服务常用命令的界面操作入口。该功能作为Kafka Manager替代,提供符合安全规范的Kafka Web服务。 通过Kafka UI可以进行以下操作: 支持界面检查集群状态(主题,消费者,偏移量,分区,副本,节点) 支持界面执行集群内分区重新分配 支持界面选择配置创建主题 支持界面删除主题(Kafka服务设置了参数“delete.topic.enable = true”) 支持为已有主题增加分区 支持更新现有主题的配置 可以为分区级别和主题级别度量标准启用JMX查询
-
Kafka开源特性 可靠性 提供At-Least Once,At-Most Once,Exactly Once消息可靠传递。消息被处理的状态是在Consumer端维护,需要结合应用层实现Exactly Once。 高吞吐 同时为发布和订阅提供高吞吐量。 持久化 将消息持久化到磁盘,因此可用于批量消费以及实时应用程序。通过将数据持久化到硬盘以及replication的方式防止数据丢失。 分布式 分布式系统,易于向外扩展。每个集群支持部署多个Producer、Broker和Consumer,从而形成分布式的集群,无需停机即可扩展系统。
-
Kafka结构 生产者(Producer)将消息发布到Kafka主题(Topic)上,消费者(Consumer)订阅这些主题并消费这些消息。在Kafka集群上一个服务器称为一个Broker。对于每一个主题,Kafka集群保留一个用于缩放、并行化和容错性的分区(Partition)。每个分区是一个有序、不可变的消息序列,并不断追加到提交日志文件。分区的消息每个也被赋值一个称为偏移顺序(Offset)的序列化编号。 图1 Kafka结构
-
Kafka原理 消息可靠性 Kafka Broker收到消息后,会持久化到磁盘,同时,Topic的每个Partition有自己的Replica(备份),每个Replica分布在不同的Broker节点上,以保证当某一节点失效时,可以自动故障转移到可用消息节点。 高吞吐量 Kafka通过以下方式提供系统高吞吐量: 数据磁盘持久化:消息不在内存中缓存,直接写入到磁盘,充分利用磁盘的顺序读写性能。 Zero-copy:减少IO操作步骤。 数据批量发送:提高网络利用率。 Topic划分为多个Partition,提高并发度,可以由多个Producer、Consumer数目之间的关系并发来读、写消息。Producer根据用户指定的算法,将消息发送到指定的Partition。 消息订阅-通知机制 消费者对感兴趣的主题进行订阅,并采取pull的方式消费数据,使得消费者可以根据其消费能力自主地控制消息拉取速度,同时,可以根据自身情况自主选择消费模式,例如批量、重复消费,从尾端开始消费等;另外,需要消费者自己负责维护其自身消息的消费记录。 可扩展性 当在Kafka集群中可通过增加Broker节点以提供更大容量时。新增的Broker会向ZooKeeper注册,而Producer及Consumer会及时从ZooKeeper感知到这些变化,并做出调整。
-
资源和成本规划 表1 资源规划 类别 子类 规划 备注 VPC VPC名称 vpc-DRStest 自定义,易理解可识别。 所属Region 华南-广州 选择和自己业务区最近的Region,减少网络时延。 可用区 可用区1 - 子网网段 10.0.0.0/24 子网选择时建议预留足够的网络资源。 子网名称 subnet-drs01 自定义,易理解可识别。 RDS(源库) RDS实例名 rds-mysql 自定义,易理解可识别。 数据库版本 MySQL 5.7 - 实例类型 单机 本示例中为单机。 实际使用时,为提升业务可靠性,推荐选择主备RDS实例。 存储类型 SSD云盘 - 可用区 可用区三 本示例中为单机。 实际业务场景推荐选择主备RDS实例,此时建议将两个实例创建在不同的可用区,提升业务可靠性。 规格 通用型 4 vCPUs | 8GB - Kafka(目标库) Kafka实例名 kafka-drs 自定义,易理解可识别。 版本 2.3.0 - 可用区 可用区三 可选择1个或者3个及以上可用区。实际业务场景推荐选择创建在不同的可用区,提升业务可靠性。 规格 c6.2u4g.cluster - 代理个数 3 - 存储空间 高I/O,200GB 存储空间主要用于存储消息(包含副本,Kafka默认使用3副本),除了存储消息外还需要预留部分空间用于存储日志和元数据。 DRS同步任务 同步任务名 DRS-MySQLToKafka 自定义 源数据库引擎 MySQL 本示例中源数据库为MySQL,使用的华为云RDS实例。 目标数据库引擎 Kafka 本示例中目标数据库为Kafka。 网络类型 VPC网络 本示例中采用“VPC网络”。 以上资源成本仅供参考,资源配置和使用费用详情请参见产品价格详情。您可以通过华为云提供的价格计算器,选择您需要的配置规格,来快速计算出参考价格。 父主题: RDS for MySQL同步到Kafka
-
实例管理概述 数据迁移功能采用独立集群的方式为用户提供安全可靠的数据迁移服务,各集群之间相互隔离,不可互相访问。其中实例管理通过购买GDS-Kafka实例帮助用户创建、管理集群。GDS-Kafka的工作方式是从Kafka中消耗数据并缓存,当达到设置好的时间或数据量之后,通过COPY写入 GaussDB (DWS)临时表,再从临时表进行插入或更新操作。 Kafka的消息生产端必须按照一定的格式要求进行数据生产,其中消息格式由“kafka.source.event.type”配置参数指定。当前支持的消息格式详见GDS-Kafka支持的消息格式。 GDS-Kafka支持直接insert(仅限无主键表)和merge覆盖更新两种入库模式,您可以根据DWS目标表的类型进行灵活配置,直接insert模式由于不涉及更新在性能上要更优一些。其中入库模式由“app.insert.directly”配置参数和有无主键共同决定,详见GDS-Kafka入库模式。 GDS-kafka只支持目标表表名和字段全小写。 GDS-Kafka的删除是根据扩展字段中的pos进行历史删除,如果入库数据中有delete操作,则必须使用扩展字段。