-
修改配置信息 为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。
#######################
#举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#发送确认参数
acks=all
#键的序列化方式
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#值的序列化方式
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#producer可以用来缓存数据的内存大小
buffer.memory=33554432
#重试次数
retries=0
#######################
#如果不使用SASL认证,以下参数请注释掉。
#######################
#设置用户名和密码
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="********";
#SASL鉴权方式
sasl.mechanism=PLAIN
#加密协议,目前支持SASL_SSL协议
security.protocol=SASL_SSL
#ssl truststore文件的位置
ssl.truststore.location=E:\\temp\\client.truststore.jks
#ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 username和password:开启SASL认证所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。
#######################
#举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#用来唯一标识consumer进程所在组的字符串,请您自行设定。
#如果设置同样的group id,表示这些processes都是属于同一个consumer group
group.id=1
#键的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#值的序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#偏移量的方式
auto.offset.reset=earliest
#######################
#如果不使用SASL认证,以下参数请注释掉。
#######################
#设置用户名和密码
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="********";
#SASL鉴权方式
sasl.mechanism=PLAIN
#加密协议,目前支持SASL_SSL协议
security.protocol=SASL_SSL
#ssl truststore文件的位置
ssl.truststore.location=E:\\temp\\client.truststore.jks
#ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 group.id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 username和password:开启SASL认证所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。
-
生产消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducer
import ssl
##连接信息
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
'sasl_plain_username': 'username',
'sasl_plain_password': 'password'
}
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
##证书文件。
context.load_verify_locations("phy_ca.crt")
print('start producer')
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
sasl_plain_username=conf['sasl_plain_username'],
sasl_plain_password=conf['sasl_plain_password'])
data = bytes("hello kafka!", encoding="utf-8")
producer.send(conf['topic_name'], data)
producer.close()
print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL认证时所使用的用户名和密码。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducer
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
}
print('start producer')
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
data = bytes("hello kafka!", encoding="utf-8")
producer.send(conf['topic_name'], data)
producer.close()
print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。
-
消费消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumer
import ssl
##连接信息
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
'sasl_plain_username': 'username',
'sasl_plain_password': 'password',
'consumer_id': 'consumer_id'
}
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
##证书文件。
context.load_verify_locations("phy_ca.crt")
print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'],
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
sasl_plain_username=conf['sasl_plain_username'],
sasl_plain_password=conf['sasl_plain_password'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL认证时所使用的用户名和密码。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumer
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
'consumer_id': 'consumer_id'
}
print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
-
方法详情 public void produceWithDirectExchange(String routingKey, String props, String message) 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 输入参数 routingKey:消息路由键 props:消息持久化设置,非必填 message:消息内容 public void produceWithTopicExchange(String bindingKey, String routingKey, String props, String message) 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 输入参数 bindingKey:队列绑定键 routingKey:消息路由键 props:消息持久化设置,非必填 message:消息内容 produceWithFanoutExchange(String props, String message) 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 输入参数 props:消息持久化设置,非必填 message:消息内容
-
使用示例 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer);
importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig);
importClass(com.roma.apic.livedata.config.v1.QueueConfig);
importClass(com.roma.apic.livedata.config.v1.ExchangeConfig);
importClass(com.roma.apic.livedata.config.v1.ConnectionConfig);
function execute(data) {
var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456");
var queueConfig = new QueueConfig("directQueue", false, false, false, null);
var exchangeConfig = new ExchangeConfig("directExchange", "direct", true, false, false, null);
var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig);
var producer = new RabbitMqProducer(config);
producer.produceWithDirectExchange("direct.exchange", "PERSISTENT_TEXT_PLAIN", "direct exchange message");
return "produce successful.";
} 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer);
importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig);
importClass(com.roma.apic.livedata.config.v1.QueueConfig);
importClass(com.roma.apic.livedata.config.v1.ExchangeConfig);
importClass(com.roma.apic.livedata.config.v1.ConnectionConfig);
function execute(data) {
var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456");
var queueConfig = new QueueConfig(“topicQueue”, false, false, false, null);
var exchangeConfig = new ExchangeConfig("topicExchange", "topic", true, false, false, null);
var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig);
var producer = new RabbitMqProducer(config);
producer.produceWithTopicExchange("topic.#", “topic.A”, null, “message”);
return "produce successful.";
} 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer);
importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig);
importClass(com.roma.apic.livedata.config.v1.QueueConfig);
importClass(com.roma.apic.livedata.config.v1.ExchangeConfig);
importClass(com.roma.apic.livedata.config.v1.ConnectionConfig);
function execute(data) {
var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456");
var queueConfig = new QueueConfig(“fanoutQueue”, false, false, false, null);
var exchangeConfig = new ExchangeConfig(“fanoutExchange”, "fanout", true, false, null);
var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig);
var producer = new RabbitMqProducer(config);
producer.produceWithFanoutExchange( null, “message”);
return "produce successful."
}
-
方法列表 返回类型 方法和说明 void produceWithDirectExchange(String routingKey, String props, String message) 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 void produceWithTopicExchange(String bindingKey, String routingKey, String props, String message) 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 void produceWithFanoutExchange(String props, String message) 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
-
使用示例 未启用安全协议或安全协议使用PLAINTEXT的场景: importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
var topic = 'YourKafkaTopic'
function execute(data) {
var config = KafkaConfig.getConfig(kafka_brokers, null)
var producer = new KafkaProducer(config)
var record = producer.produce(topic, "hello, kafka.")
return {
offset: record.offset(),
partition: record.partition(),
code: 0,
message: "OK"
}
} 安全协议使用SASL_SSL且SASL认证机制使用PLAIN的场景: importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
var topic = 'YourKafkaTopic'
//AppKey和AppSecret为SASL_SSL认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret
var app_key = 'AppKey'
var app_secret = 'AppSecret'
function execute(data) {
var config = KafkaConfig.getSaslConfig(kafka_brokers, null, app_key, app_secret)
var producer = new KafkaProducer(config)
var record = producer.produce(topic, "hello, kafka.")
return {
offset: record.offset(),
partition: record.partition(),
code: 0,
message: "OK"
}
} 安全协议使用SASL_SSL或SASL_PLAINTEXT,且SASL认证机制使用SC
RAM -SHA-512的场景: importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
var topic = 'YourKafkaTopic'
function execute(data) {
var config = KafkaConfig.getConfig(kafka_brokers, null);
//根据实际使用的安全协议设置具体的值,SASL_SSL或SASL_PLAINTEXT
config.put("security.protocol", "SASL_SSL|SASL_PLAINTEXT");
config.put("sasl.mechanism", "SCRAM-SHA-512");
//AppKey和AppSecret为SASL_SSL或SASL_PLAINTEXT认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret
config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";");
var producer = new KafkaProducer(config)
var record = producer.produce(topic, "hello, kafka.")
return {
offset: record.offset(),
partition: record.partition(),
code: 0,
message: "OK"
}
}
-
方法列表 返回类型 方法和说明 org.apache.kafka.clients.producer.RecordMetadata produce(String topic, String message) 生产消息 不能直接返回方法produce(String topic, String message),否则会导致返回信息为空。例如在使用示例中,不能直接使用“return record”句式,否则返回的信息为空。
-
创建imap邮件连接 登录新版ROMA Connect控制台。 在左侧导航栏选择“连接器”,在连接器页面单击“新建连接”。 选择“imap邮件”连接器。 在弹窗中配置连接器信息,完成后单击“确定”。 参数 说明 连接名称 填写连接器实例名称。 IMAP服务器地址 填写IMAP邮箱服务器地址。一般为IP地址或者
域名 形式。 不需要包含协议方案及端口号。 端口 SMTP邮箱服务器地址的端口。 邮箱地址 发件人邮箱地址。比如lilei@imap.com。 密码 发件人邮箱密码。 是否使用SSL加密 选择是否启用SSL加密。 描述 填写连接器的描述信息,用于识别不同的连接器。
-
创建SMTP邮箱连接 登录新版ROMA Connect控制台。 在左侧导航栏选择“连接器”,在连接器页面单击“新建连接”。 选择“SMTP邮箱”连接器。 在弹窗中配置连接器信息,完成后单击“确定”。 参数 说明 连接名称 填写连接器实例名称。 SMTP服务器地址 填写SMTP邮箱服务器地址。一般为IP地址或者域名形式。 不需要包含协议方案及端口号。 端口 SMTP邮箱服务器地址的端口。 邮箱地址 发件人邮箱地址。比如lilei@imap.com。 密码 发件人邮箱密码。 是否使用SSL加密 选择是否启用SSL加密。 描述 填写连接器的描述信息,用于识别不同的连接器。
-
配置参数 参数 说明 收件人 填写邮件的收件人邮箱地址列表,多个电子邮件地址使用英文逗号(,)分隔。 抄送 填写邮件的抄送人邮箱地址列表,多个电子邮件地址使用英文逗号(,)分隔。 密送 填写邮件的密送邮箱地址列表,多个电子邮件地址使用英文逗号(,)分隔。 主题 填写邮件的标题。 内容 填写邮件的内容。 内容类型 选择内容类型。 文本 HTML 重要性 选择邮件的重要性。 None 低 中 高 附件 添加邮件附件,格式为json数组,必填项为filename及contentData。可选填contentType,类型为mime.例如:[{"filename": "xxx.pdf","contentData": "text file","contentType":"application/pdf"}]
-
参数说明 表1 IBM MQ常见的 C
CS ID 及其对应的字符编码 CCSID 描述 819 ISO 8859-1 (Latin 1) 1208 UTF-8 13488 UTF-16BE (Big Endian) 1200 UTF-16LE (Little Endian) 437 PC US ASCII 500 EBCDIC International 1047 EBCDIC US/Canada Open Systems 1140 EBCDIC US/Canada with Euro Symbol 父主题: 附录
-
设置数据结构 用户在编辑组合应用时,可以通过变量点选的方式引用前置节点payload中的字段。 当前支持两种方式配置节点payload的数据结构。 节点实际的payload的数据结构必须与设置的相同,否则将无法在后续节点中正常使用payload中的字段。 导入json 用户单击“导入json”按钮,选择需要导入的json文件,完成后可以在表格中看到导入的payload的数据结构。 例如,导入json文件内容如下: {
"id":["123456"],
"sort": 1,
"attribute":{
"custom_fields":[{
"code":"codeA",
"value":"111"
}]
}
} 手动新增字段 用户单击表格中“新增根字段”按钮,表格中会自动生成一个待修改的字段;单击编辑按钮,修改字段名称和字段类型。如需继续新增字段请单击“新增根字段”按钮。 单击Object类型的节点对应的新增按钮,可以新增当前节点的子节点或相邻节点。 父主题: 参数设置与变量点选
-
重命名 存在需要名称不明确或错误的流节点时,用户可以重新编辑节点名称。 登录新版ROMA Connect控制台。 在左侧导航栏选择“组合应用”,在组合应用页面找到要编辑的组合应用。 单击组合应用上的“”,选择“编辑”,进入组合应用的编辑界面。 单击对应节点,打开节点编辑页面,单击节点名称右侧铅笔图标,在文本框中输入节点的新名称。 单击“√”图标保存输入的节点新名称。 单击组合应用详情页面右上方“保存”按钮。
-
复制与粘贴 一个组合应用里面需要多条重复的流时,用户可以对其中一个流节点做复制、粘贴操作。 登录新版ROMA Connect控制台。 在左侧导航栏选择“组合应用”,在组合应用页面找到要编辑的组合应用。 单击组合应用上的“”,选择“编辑”,进入组合应用的编辑界面。 将鼠标悬浮在需要复制的节点上,单击“复制”图标,完成节点复制。 将鼠标移动到待添加复制节点的连接线上,出现加号图标后,右键加号图标,单击“粘贴”按钮。