华为云用户手册

  • 使用示例 importClass(com.roma.apic.livedata.client.v1.RedisClient);importClass(com.roma.apic.livedata.config.v1.JedisConfig);function execute(data) { var config = new JedisConfig(); config.setIp(["1.1.1.1"]); config.setPort(["6379"]); config.setMode("SINGLE"); var redisClient = new RedisClient(config); var count = redisClient.get("visit_count") if (!count) { redisClient.put("visit_count", 1); }else { redisClient.put("visit_count", parseInt(count) + 1); } return redisClient.get("visit_count");}
  • 方法列表 返回类型 方法和说明 static KafkaConfig getConfig(String servers, String groupId) 获取一个配置,用于访问MQS提供的Kafka(不开启sasl_ssl)。 static KafkaConfig getSaslConfig(String servers, String groupId, String username, String password) 获取一个配置,用于访问MQS提供的Kafka(开启sasl_ssl)。
  • 方法详情 public static KafkaConfig getConfig(String servers, String groupId) 访问MQS提供的kafka(不开启sasl_ssl)。 输入参数 servers:kafkaConfig中的bootstrap.servers信息 groupId:kafkaConfig中的group.id信息 返回信息 返回KafkaConfig对象 public static KafkaConfig getSaslConfig(String servers, String groupId, String username, String password) 访问MQS提供的kafka(开启sasl_ssl)。 输入参数 servers:kafkaConfig中的bootstrap.servers信息 groupId:kafkaConfig中的group.id信息 username:用户名 password:密码 返回信息 返回KafkaConfig对象
  • 使用示例 importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);importClass(com.roma.apic.livedata.config.v1.KafkaConfig);var kafka_brokers = '1.1.1.1:26330,2.2.2.2:26330'var topic = 'YourKafkaTopic'var group = 'YourKafkaGroupId'function execute(data) { var config = KafkaConfig.getConfig(kafka_brokers, group) var consumer = new KafkaConsumer(config) var records = consumer.consume(topic, 5000, 10); var res = [] var iter = records.iterator() while (iter.hasNext()) { res.push(iter.next()) } return JSON.stringify(res);}
  • 使用示例 importClass(com.roma.apic.livedata.client.v1.HttpClient);importClass(com.roma.apic.livedata.config.v1.HttpConfig);function execute(data) { var requestConfig = new HttpConfig(); requestConfig.setAccessKey("071fe245-9cf6-4d75-822d-c29945a1e06a"); requestConfig.setSecretKey("c6e52419-2270-****-****-ae7fdd01dcd5"); requestConfig.setMethod('POST'); requestConfig.setUrl("https://30030113-3657-4fb6-a7ef-90764239b038.apigw.exampleRegion.com/app1"); requestConfig.setContent("body"); requestConfig.setContentType('application/json'); var client = new HttpClient(); var resp = client.request(requestConfig); return resp.body().string()}
  • 示例 { "body": { "orginParameters": { "temperature": 123 }, "state": "ok" }, "errcode": 0, "mid": 54132, "msgType": "deviceRsp"}
  • 使用示例 通过以下java代码生成公钥和私钥: import java.security.KeyPair;import java.security.KeyPairGenerator;import java.security.PrivateKey;import java.security.PublicKey;import java.util.Base64;public class Main { public static void main(String[] args) { try { KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); keyPairGenerator.initialize(1024); KeyPair keyPair = keyPairGenerator.generateKeyPair(); PublicKey publicKey = keyPair.getPublic(); System.out.println("publicKey:" + new String(Base64.getEncoder().encode(publicKey.getEncoded()))); PrivateKey privateKey = keyPair.getPrivate(); System.out.println("privateKey:" + new String(Base64.getEncoder().encode(privateKey.getEncoded()))); } catch (Exception e) { e.printStackTrace(); return; } }} 将上述公钥和私钥填入到下面代码中: importClass(com.roma.apic.livedata.common.v1.RSAUtils);importClass(com.roma.apic.livedata.common.v1.Base64Utils);function execute(data) { var publicKeyString = "MIGfMA0G CS qGSIb3DQEBAQUAA4GNADCBiQKBgQDd4CRRppmYVlFl3dX4iVGN+2Twy5gLeEPRbvhOko/xFipGF7XV0weTp4wCakgdnm+DR4gBBrQtfAuKwYIBPIr+C1FI5sKYA3NxazDWUcXR3xlPM5D0DWjacjcMjnaj2v21WZxGpwHZHQ9TLd4OBBq3fva1r/cE8s1Lji5QeFiklwIDAQAB"; var privateKeyString = "**********"; var publicKey = RSAUtils.getPublicKey(publicKeyString) var privateKey = RSAUtils.getPrivateKey(privateKeyString) var origin = "hello rsa" var encrypted = RSAUtils.encrypt(Base64Utils.encode(origin), publicKey) var decrypted = RSAUtils.decrypt(encrypted, privateKey) return decrypted}
  • 示例 { "data": [{ "deviceInfo": { "manufacturerId": "Test_n", "name": "n-device", "model": "A_n", "nodeId": "n-device", "deviceId": "D59eGSxy" }, "statusCode": 0 }], "mid": 7, "statusCode": 0}
  • 使用前必读 物联网平台 作为消息接收方时,已默认订阅了相关Topic,设备只要向对应Topic发送消息,物联网平台就可以接收。 设备作为消息接收方时,需要先订阅相关Topic,这样物联网平台向对应Topic发送消息时,设备才能接收到。设备需要根据具体实现的业务来决定订阅哪些Topic。 Topic 支持的协议 消息发送方 (Publisher) 消息接收方 (Subscriber) 用途 /v1/devices/{gatewayId}/topo/add MQTT 边设备 物联网平台 边设备添加子设备 /v1/devices/{gatewayId}/topo/addResponse 物联网平台 边设备 物联网平台返回的添加子设备的响应 /v1/devices/{gatewayId}/topo/update 边设备 物联网平台 边设备更新子设备状态 /v1/devices/{gatewayId}/topo/updateResponse 物联网平台 边设备 物联网平台返回的更新子设备状态的响应 /v1/devices/{gatewayId}/topo/delete 物联网平台 边设备 物联网平台删除子设备 /v1/devices/{gatewayId}/topo/query 边设备 物联网平台 边设备查询网关信息 /v1/devices/{gatewayId}/topo/queryResponse 物联网平台 边设备 物联网平台返回的网关信息响应 /v1/devices/{gatewayId}/command 物联网平台 边设备 物联网平台给设备或边设备下发命令 /v1/devices/{gatewayId}/commandResponse 边设备 物联网平台 边设备返回给物联网平台的命令响应 /v1/devices/{gatewayId}/datas 边设备 物联网平台 边设备上报数据 {gatewayId}指设备标识。其中delete延用了之前局点的规范,deleteResponse暂未提供。 父主题: MQTT协议Topic规范
  • 前提条件 准备装有1.8及以上版本JDK的Linux服务器。 IntelliJ IDEA版本为:2018.3.5或以上版本,Eclipse版本为:3.6.0或以上版本。 通过Demo获取MysqlConnctor.rar包。 如果使用自定义连接器来写数据,需要用户自行保证数据的可重复幂等写入。 RESTful接口单次请求的处理时间不能超过60s。 FDI端会循环调用RESTful接口地址,直到读完数据为止。
  • 参数说明 参数 必选/可选 类型 参数描述 clientId 必选 String(256) 一机一密的设备clientId由4个部分组成:deviceId/nodeId、鉴权类型、密码签名类型、时间戳,通过下划线“_”分隔。 鉴权类型:长度1字节,当前支持2个类型: “0”,表示使用一机一密设备的deviceId接入。 “2”,表示使用一机一密设备的nodeId接入。 密码签名类型:长度1字节,当前支持2种类型。 “0”代表HMACSHA256不校验时间戳。 “1”代表HMACSHA256校验时间戳。 时间戳:为设备连接平台时的UTC时间,格式为YYYYMMDDHH,如UTC 时间2018/7/24 17:56:20,则应表示为2018072417。 以deviceId为例的clientId示例为:D39564861q3gDa_0_0_2018072417 Username 必选 String(256) 一机一密的设备“Username”。 使用deviceId接入时填写为设备注册成功后返回的“deviceId”值。 使用nodeId接入时填写为设备注册成功时的“nodeId”值。 Password 必选 String(256) Password的值为使用“HMACSHA256”算法以时间戳为密钥,对secret进行加密后的值。 secret为注册设备时平台返回的secret,或者是设备自身的secret。
  • 客户端参数配置建议 Kafka客户端的配置参数很多,以下提供Producer和Consumer几个常用参数配置。 表1 Producer参数 参数 默认值 推荐值 说明 acks 1 高可靠:all 高吞吐:1 收到Server端确认信号个数,表示procuder需要收到多少个这样的确认信号,算消息发送成功。acks参数代表了数据备份的可用性。常用选项: acks=0:表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败),回馈的offset会总是设置为-1。 acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。 acks=all:这意味着leader需要等待所有备份都成功写入日志,只有任何一个备份存活,数据都不会丢失。 retries 0 结合实际业务调整 客户端发送消息的重试次数。值大于0时,这些数据发送失败后,客户端会重新发送。 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 request.timeout.ms 30000 结合实际业务调整 设置一个请求最大等待时间,超过这个时间则会抛Timeout异常。 超时时间如果设置大一些,如120000(120秒),高并发的场景中,能减少发送失败的情况。 block.on.buffer.full TRUE TRUE TRUE表示当内存用尽时,停止接收新消息记录或者抛出错误。 默认情况下,这个设置为TRUE。然而某些阻塞可能不值得期待,因此立即抛出错误更好。如果设置为false,则producer抛出一个异常错误:BufferExhaustedException batch.size 16384 262144 默认的批量处理消息字节数上限。producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。不会试图处理大于这个字节数的消息字节数。 发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。 较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。 buffer.memory 33554432 67108864 producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。 这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 max.request.size 1048576 5242880 生产者给服务端单次发送的最大消息字节数,该参数影响单次生产的消息记录数量。 表2 Consumer参数 参数 默认值 推荐值 说明 auto.commit.enable TRUE FALSE 如果为TRUE,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程无法提供服务时,由新的consumer使用。 约束: 设置为FALSE后,需要先成功消费再提交,这样可以避免消息丢失。 auto.offset.reset latest earliest 没有初始化offset或者offset被删除时,可以设置以下值: earliest:自动复位offset为最早 latest:自动复位offset为最新 none:如果没有发现offset则向消费者抛出异常 anything else:向消费者抛出异常。 connections.max.idle.ms 600000 30000 空连接的超时时间,设置为30000可以在网络异常场景下减少请求卡顿的时间。 父主题: 开发说明
  • 请求消息 请求参数 参数 类型 必选 说明 messages Array 是 消息列表,数组大小不能超过10,且不能为空数组。 表2 参数说明 参数 类型 必选 说明 handler String 是 消息handler。 status String 是 消费状态。只能为success,或者fail。 请求示例 { "messages": [ { "handler": "NCMxMDAjMTgjMA==", "status": "success" } ]}
  • 消息可以批量生产和消费 为提高消息发送和消息消费效率,推荐使用批量消息发送和消费。通常,默认消息消费为批量消费,而消息发送尽可能采用批量发送,可以有效减少API调用次数。 如下面两张示意图对比所示,消息批量生产与消费,可以减少API调用次数,节约资源。 图1 消息批量生产与消费 批量发送消息时,单次不能超过10条消息,总大小不能超过512KB。 批量生产(发送)消息可以灵活使用,在消息并发多的时候,批量发送,并发少时,单条发送。这样能够在减少调用次数的同时保证消息发送的实时性。 图2 消息逐条生产与消费 此外,批量消费消息时,消费者应按照接收的顺序对消息进行处理、确认,当对某一条消息处理失败时,不再需要继续处理本批消息中的后续消息,直接对已正确处理的消息进行确认即可。
  • 重视消息生产与消费的确认过程 消息生产 生产消息后,生产者需要根据ROMA Connect的返回信息确认消息是否发送成功,如果返回失败需要重新发送。 每次生产消息,生产者都需要等待消息发送API的应答信号,以确认消息是否成功发送。在消息传递过程中,如果发生异常,生产者没有接收到发送成功的信号,生产者自己决策是否需要重复发送消息。如果接收到发送成功的信号,则表明该消息已经被ROMA Connect可靠存储。 消息消费 消息消费时,消费者需要确认消息是否已被成功消费。 生产的消息被依次存储在ROMA Connect的存储介质中。消费时依次获取ROMA Connect中存储的消息。消费者获取消息后,进行消费并记录消费成功或失败的状态,并将消费状态提交到ROMA Connect,由ROMA Connect决定消费下一批消息或回滚重新消费消息。 在消费过程中,如果出现异常,没有提交消费确认,该批消息会在后续的消费请求中再次被获取。
  • 响应消息 响应参数 参数 类型 说明 handler String 消息handler。 message JSON对象 消息的内容。 表2 message参数 参数 类型 说明 content JSON 消息体的内容。Base64加密密文。 响应示例 [ { "handler": "NCMxMDAjMTgjMA==", "message": { "content": "ImhlbGxvIGh1YXdlaWNsb3VkLTIi" } }]
  • 概述 spring-kafka兼容开源Kafka客户端,其与开源Kafka客户端的版本对应关系可参见Spring官网。spring-kafka兼容的Kafka客户端版本主要为2.x.x版本,而ROMA Connect消息集成的Kafka服务端版本为1.1.0、2.3.0版本。因此在Spring Boot项目工程中使用spring-kafka连接ROMA Connect时,请确保客户端与服务端的Kafka版本一致。 若spring-kafka连接的ROMA Connect实例为Kafka 1.1.0版本时,大部分的功能可以正常使用,仅少数新增功能不支持。经排查验证,以下为不支持的功能,除此以外的其他功能暂未发现问题。如果在使用过程中遇到其他问题,请提工单联系技术支持。
  • 不支持消费者组静态成员功能 Kafka客户端在2.3版本新增了Consumer参数“group.instance.id”,设置了该ID的消费者被视为一个静态成员。 配置文件: src/main/resources/application.yml 配置项: spring: kafka: consumer: properties: group.instance.id: xxx 使用限制: 不能添加“group.instance.id”参数配置。
  • 不支持zstd压缩类型 Kafka在2.1.0版本新增了zstd压缩类型,而1.1.0版本的Kafka不支持zstd压缩类型。 配置文件: src/main/resources/application.yml 配置项: spring: kafka: producer: compression-type: xxx 使用限制: “compression-type”的值不能设置为“zstd”。
  • 消息生产与消费的幂等传递 ROMA Connect设计了一系列可靠性保障措施,确保消息不丢失。例如使用消息同步存储机制防止系统与服务器层面的异常重启或者掉电,使用消息确认(ACK)机制解决消息传输过程中遇到的异常。 考虑到网络异常等极端情况,用户除了做好消息生产与消费的确认,还需要配合ROMA Connect完成消息发送与消费的重复传输设计。 当无法确认消息是否已发送成功,生产者需要将消息重复发送给ROMA Connect。 当重复收到已处理过的消息,消费者需要告诉ROMA Connect消费成功且保证不重复处理。
  • 操作流程 获取MQS连接信息,具体请参考开发准备。 若ROMA Connect实例的消息集成在开启SASL_SSL的同时,也开启了VPC内网明文访问,则VPC内无法使用SASL方式连接消息集成的Topic。 使用SASL方式连接消息集成的Topic时,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,否则会引入时延。 其中,IP地址必须为消息集成的连接地址,host为每个实例主机的名称,可以自定义,但不能重复。例如: 10.10.10.11 host01 10.10.10.12 host02 10.10.10.13 host03 参考示例代码,组装API请求,包括对API请求的签名。 对API请求签名,指使用SASL的用户名与密码作为密钥对,将请求URL、消息头时间戳等内容进行签名,供后端服务进行校验。点此了解签名流程 使用Demo向指定Topic生产消息、消费消息和确认消息时,返回的响应消息结构请参考生产消息接口说明、消费消息接口说明和消费确认接口说明。
  • URI GET /v1/topic/{topic_name}/group/{group_name}/messages?ack_wait={ack_wait}& time_wait={time_wait}&max_msgs={max_msgs} 表1 参数说明 参数 类型 必选 说明 topic_name String 是 主题名称。 group_name String 是 消费组名称。长度不超过249位的字符串,包含a~z,A~Z,0~9、中划线(-)和下划线(_)。 ack_wait Integer 否 提交确认消费的超时时间,客户端需要在该时间内提交消费确认,如果超过指定时间,没有确认消费,系统会报消息确认超时或handler无效,则默认为消费失败。取值范围:1~300s。默认值:15s time_wait Integer 否 设定队列可消费的消息为0时的读取消息等待时间。 如果在等待时间内有新的消息,则立即返回消费结果,如果等待时间内没有新的消息,则到等待时间后返回消费结果。取值范围:1~30s。 默认值:3s max_msgs Integer 否 获取可消费的消息的条数。取值范围:1~10。默认值:10 max_bytes Integer 否 每次消费的消息总负载最大值。取值范围:1~ 2097152。默认值:524288。
  • 导入工程 打开IntelliJ IDEA,在菜单栏选择“Import Project”。 弹出“Select File or Directory to Import”对话框。 在弹出的对话框中选择解压后的RESTful API Java Demo路径,单击“OK”。 “Import project from external model”选择“Eclipse”,单击“Next”,进入下一页后保持默认连续单击“Next”,直到“Please select project SDK”页面。 图1 Import Project 单击“Finish”,完成工程导入。 图2 Finish 编辑rest-config.properties 文件在src/main/resources目录下。将获取到的MQS实例连接地址、Topic名称,以及SASL信息填写到下述配置中。其中参数kafka.rest.group为消费组ID,可在客户端指定。 # Kafka rest endpoint.kafka.rest.endpoint=https://{MQS_Instance_IP_Addr}:9292# Kafka topic name.kafka.rest.topic=topic_name_demo# Kafka consume group.kafka.rest.group=group_id_demo# Kafka sasl username.kafka.rest.username=sasl_username_demo# Kafka sasl password.kafka.rest.password=sasl_user_passwd_demo 编辑log4j.properties 修改日志存储目录: log.directory=D://workspace/logs 运行示例工程,查看消息生产与消费样例。 消息生成与消费的Main方法在RestMain.java中,以Java Application的方式运行即可。
  • 请求消息 请求参数 参数 类型 必选 说明 messages Array 是 消息列表,数组大小不能超过10,且不能为空数组。 表2 messages参数 参数 类型 必选 说明 content Object 是 消息内容。 id String 是 消息序号,序列号不能重复。 请求示例 { "messages": [ { "content": "hello roma-1", "id": "1" }, { "content": "hello roma-2", "id": "2" }, { "content": "hello roma-3", "id": "3" } ]}
  • 响应消息 响应参数 参数 类型 说明 state String 结果状态。成功为success,失败为fail。 id String 消息序号。 响应示例 [ { "state": "success", "id": "1" }, { "state": "success", "id": "2" }, { "state": "success", "id": "3" }]
  • 生产消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducerimport ssl##连接信息conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password'}context = ssl.create_default_context()context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)context.verify_mode = ssl.CERT_REQUIRED##证书文件context.load_verify_locations("phy_ca.crt")print('start producer')producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password'])data = bytes("hello kafka!", encoding="utf-8")producer.send(conf['topic_name'], data)producer.close()print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL_SSL认证时所使用的用户名和密码。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducerconf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name',}print('start producer')producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])data = bytes("hello kafka!", encoding="utf-8")producer.send(conf['topic_name'], data)producer.close()print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。
  • 消费消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumerimport ssl##连接信息conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id'}context = ssl.create_default_context()context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)context.verify_mode = ssl.CERT_REQUIRED##证书文件context.load_verify_locations("phy_ca.crt")print('start consumer')consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password'])for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL_SSL认证时所使用的用户名和密码。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumerconf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'consumer_id': 'consumer_id'}print('start consumer')consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'])for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
  • 其他建议 连接数限制:3000 消息大小:不能超过10MB 使用SASL_SSL协议访问ROMA Connect:确保DNS具有反向解析能力,或者在hosts文件配置ROMA Connect所有节点ip和主机名映射,避免Kafka client做反向解析,阻塞连接建立。 磁盘容量申请超过业务量 * 副本数的2倍,即保留磁盘空闲50%左右。 业务进程JVM内存使用确保无频繁FGC,否则会阻塞消息的生产和消费。 建议在Kafka客户端侧配置日志转储,否则容易引起磁盘被日志打满的情况。 若ROMA Connect实例的消息集成在开启SASL_SSL的同时,也开启了VPC内网明文访问,则VPC内无法使用SASL方式连接消息集成的Topic。 使用SASL方式连接消息集成的Topic时,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,否则会引入时延。 其中,IP地址必须为消息集成的连接地址,host为每个实例主机的名称,可以自定义,但不能重复。例如: 10.10.10.11 host01 10.10.10.12 host02 10.10.10.13 host03
  • Consumer使用建议 Consumer的owner线程需确保不会异常退出,避免客户端无法发起消费请求,阻塞消费。 确保处理完消息后再做消息commit,避免业务消息处理失败,无法重新拉取处理失败的消息。 Consumer不能频繁加入和退出group,频繁加入和退出,会导致Consumer频繁做rebalance,阻塞消费。 Consumer数量不能超过Topic分区数,否则会有Consumer拉取不到消息。 Consumer需周期poll,维持和server的心跳,避免心跳超时,导致Consumer频繁加入和退出,阻塞消费。 Consumer拉取的消息本地缓存应有大小限制,避免OOM(Out of Memory)。 Consumer session的超时时间设置为30秒,session.timeout.ms=30000,避免时间过短导致Consumer频繁超时做rebalance,阻塞消费。 ROMA Connect不能保证消费重复的消息,业务侧需保证消息处理的幂等性。 消费线程退出要调用Consumer的close方法,避免同一个组的其他消费者阻塞sesstion.timeout.ms的时间。
  • 数据API中的执行语句说明 数据API调用存储过程时,参数可通过后端服务请求的Headers、Parameters或者Body传递,参数名的语法为:{参数名}.{数据类型}.{传输类型}。 数据类型包括String和int。 传输类型指入参或出参声明,入参使用in,出参使用out。 数据API中调用存储过程的执行语句示例: call sb_test(${nname.String.in},${nsal.int.out}) 该脚本示例中,nname为字符串类型的入参,参数名为nname.String.in ,value则是你要查询的参数值。nsal为数值类型的出参,参数名为nsal.int.out,由于格式限定,出参的value也需要填写,可填写符合数据类型的任意值,不影响输出结果。 数据API中对存储过程的调用,用String和int来区分字符串和数值,无需加单引号,这一点和SQL要求不一样。 在后端服务的Headers、Parameters或者Body中定义的参数名不能相同,否则将被覆盖。 Body传递参数示例: 后端服务请求的Body内容 { "nname.String.in": "zhang", "nsal": 0} 响应结果 { "test": [ 5000 ]} Parameters传递参数示例: 后端服务请求的Parameters内容 https://example.com?nname.String.in=zhang&nsal=0 响应结果 { "test": [ 5000 ]}
共100000条