-
连接器(Connector) 连接器是业务流中用于与外部服务或系统交互的组件,它封装了与特定服务通信的细节,简化了集成过程。连接器支持多种服务,例如云服务、消息系统、数据库、HTTP以及第三方服务(如邮箱、微信等),并提供预定义的API调用、认证和数据格式转换等功能。通过连接器,业务流可以轻松地读取或写入外部系统的数据,调用外部API,或发送通知。 例如,使用数据库连接器可以从华为云RDS中读取数据,而使用邮件连接器可以发送处理结果通知。连接器的作用在于降低集成的复杂性,提高工作流的可扩展性和适用性。
-
组合应用(Composite Application) 组合应用是指将多条业务流整合在一起,形成一个更复杂的自动化解决方案,以满足多样化的业务需求。通过组合应用,用户可以实现跨系统、跨服务的端到端业务流程自动化。 例如,可以将数据采集、处理、存储和通知等多个业务流串联起来,形成一个完整的数据处理管道。组合应用的优势在于其灵活性和可扩展性,用户可以根据业务变化随时调整或扩展工作流的逻辑,同时支持多工作流之间的数据共享和协同工作,从而实现更高效的业务运营。 一个组合应用可以包含多条业务流,并对多条流进行编辑、启动、停止、调试等操作。
-
重命名 存在需要名称不明确或错误的流节点时,用户可以重新编辑节点名称。 登录新版ROMA Connect控制台。 在左侧导航栏选择“组合应用”,在组合应用页面找到要编辑的组合应用。 单击组合应用上的“”,选择“编辑”,进入组合应用的编辑界面。 单击对应节点,打开节点编辑页面,单击节点名称右侧铅笔图标,在文本框中输入节点的新名称。 单击“√”图标保存输入的节点新名称。 单击组合应用详情页面右上方“保存”按钮。
-
复制与粘贴 一个组合应用里面需要多条重复的流时,用户可以对其中一个流节点做复制、粘贴操作。 登录新版ROMA Connect控制台。 在左侧导航栏选择“组合应用”,在组合应用页面找到要编辑的组合应用。 单击组合应用上的“”,选择“编辑”,进入组合应用的编辑界面。 将鼠标悬浮在需要复制的节点上,单击“复制”图标,完成节点复制。 将鼠标移动到待添加复制节点的连接线上,出现加号图标后,右键加号图标,单击“粘贴”按钮。
-
修改配置信息 为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。
#######################
#举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#发送确认参数
acks=all
#键的序列化方式
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#值的序列化方式
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#producer可以用来缓存数据的内存大小
buffer.memory=33554432
#重试次数
retries=0
#######################
#如果不使用SASL认证,以下参数请注释掉。
#######################
#设置用户名和密码
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="********";
#SASL鉴权方式
sasl.mechanism=PLAIN
#加密协议,目前支持SASL_SSL协议
security.protocol=SASL_SSL
#ssl truststore文件的位置
ssl.truststore.location=E:\\temp\\client.truststore.jks
#ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 username和password:开启SASL认证所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。
#######################
#举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#用来唯一标识consumer进程所在组的字符串,请您自行设定。
#如果设置同样的group id,表示这些processes都是属于同一个consumer group
group.id=1
#键的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#值的序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#偏移量的方式
auto.offset.reset=earliest
#######################
#如果不使用SASL认证,以下参数请注释掉。
#######################
#设置用户名和密码
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="********";
#SASL鉴权方式
sasl.mechanism=PLAIN
#加密协议,目前支持SASL_SSL协议
security.protocol=SASL_SSL
#ssl truststore文件的位置
ssl.truststore.location=E:\\temp\\client.truststore.jks
#ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 group.id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 username和password:开启SASL认证所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。
-
生产消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducer
import ssl
##连接信息
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
'sasl_plain_username': 'username',
'sasl_plain_password': 'password'
}
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
##证书文件。
context.load_verify_locations("phy_ca.crt")
print('start producer')
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
sasl_plain_username=conf['sasl_plain_username'],
sasl_plain_password=conf['sasl_plain_password'])
data = bytes("hello kafka!", encoding="utf-8")
producer.send(conf['topic_name'], data)
producer.close()
print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL认证时所使用的用户名和密码。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducer
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
}
print('start producer')
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
data = bytes("hello kafka!", encoding="utf-8")
producer.send(conf['topic_name'], data)
producer.close()
print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。
-
消费消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumer
import ssl
##连接信息
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
'sasl_plain_username': 'username',
'sasl_plain_password': 'password',
'consumer_id': 'consumer_id'
}
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
##证书文件。
context.load_verify_locations("phy_ca.crt")
print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'],
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
sasl_plain_username=conf['sasl_plain_username'],
sasl_plain_password=conf['sasl_plain_password'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL认证时所使用的用户名和密码。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumer
conf = {
'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
'topic_name': 'topic_name',
'consumer_id': 'consumer_id'
}
print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
-
方法详情 public void produceWithDirectExchange(String routingKey, String props, String message) 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 输入参数 routingKey:消息路由键 props:消息持久化设置,非必填 message:消息内容 public void produceWithTopicExchange(String bindingKey, String routingKey, String props, String message) 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 输入参数 bindingKey:队列绑定键 routingKey:消息路由键 props:消息持久化设置,非必填 message:消息内容 produceWithFanoutExchange(String props, String message) 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 输入参数 props:消息持久化设置,非必填 message:消息内容
-
使用示例 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer);
importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig);
importClass(com.roma.apic.livedata.config.v1.QueueConfig);
importClass(com.roma.apic.livedata.config.v1.ExchangeConfig);
importClass(com.roma.apic.livedata.config.v1.ConnectionConfig);
function execute(data) {
var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456");
var queueConfig = new QueueConfig("directQueue", false, false, false, null);
var exchangeConfig = new ExchangeConfig("directExchange", "direct", true, false, false, null);
var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig);
var producer = new RabbitMqProducer(config);
producer.produceWithDirectExchange("direct.exchange", "PERSISTENT_TEXT_PLAIN", "direct exchange message");
return "produce successful.";
} 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer);
importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig);
importClass(com.roma.apic.livedata.config.v1.QueueConfig);
importClass(com.roma.apic.livedata.config.v1.ExchangeConfig);
importClass(com.roma.apic.livedata.config.v1.ConnectionConfig);
function execute(data) {
var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456");
var queueConfig = new QueueConfig(“topicQueue”, false, false, false, null);
var exchangeConfig = new ExchangeConfig("topicExchange", "topic", true, false, false, null);
var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig);
var producer = new RabbitMqProducer(config);
producer.produceWithTopicExchange("topic.#", “topic.A”, null, “message”);
return "produce successful.";
} 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer);
importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig);
importClass(com.roma.apic.livedata.config.v1.QueueConfig);
importClass(com.roma.apic.livedata.config.v1.ExchangeConfig);
importClass(com.roma.apic.livedata.config.v1.ConnectionConfig);
function execute(data) {
var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456");
var queueConfig = new QueueConfig(“fanoutQueue”, false, false, false, null);
var exchangeConfig = new ExchangeConfig(“fanoutExchange”, "fanout", true, false, null);
var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig);
var producer = new RabbitMqProducer(config);
producer.produceWithFanoutExchange( null, “message”);
return "produce successful."
}
-
方法列表 返回类型 方法和说明 void produceWithDirectExchange(String routingKey, String props, String message) 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 void produceWithTopicExchange(String bindingKey, String routingKey, String props, String message) 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 void produceWithFanoutExchange(String props, String message) 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
-
使用示例 未启用安全协议或安全协议使用PLAINTEXT的场景: importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
var topic = 'YourKafkaTopic'
function execute(data) {
var config = KafkaConfig.getConfig(kafka_brokers, null)
var producer = new KafkaProducer(config)
var record = producer.produce(topic, "hello, kafka.")
return {
offset: record.offset(),
partition: record.partition(),
code: 0,
message: "OK"
}
} 安全协议使用SASL_SSL且SASL认证机制使用PLAIN的场景: importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
var topic = 'YourKafkaTopic'
//AppKey和AppSecret为SASL_SSL认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret
var app_key = 'AppKey'
var app_secret = 'AppSecret'
function execute(data) {
var config = KafkaConfig.getSaslConfig(kafka_brokers, null, app_key, app_secret)
var producer = new KafkaProducer(config)
var record = producer.produce(topic, "hello, kafka.")
return {
offset: record.offset(),
partition: record.partition(),
code: 0,
message: "OK"
}
} 安全协议使用SASL_SSL或SASL_PLAINTEXT,且SASL认证机制使用SC
RAM -SHA-512的场景: importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
var topic = 'YourKafkaTopic'
function execute(data) {
var config = KafkaConfig.getConfig(kafka_brokers, null);
//根据实际使用的安全协议设置具体的值,SASL_SSL或SASL_PLAINTEXT
config.put("security.protocol", "SASL_SSL|SASL_PLAINTEXT");
config.put("sasl.mechanism", "SCRAM-SHA-512");
//AppKey和AppSecret为SASL_SSL或SASL_PLAINTEXT认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret
config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";");
var producer = new KafkaProducer(config)
var record = producer.produce(topic, "hello, kafka.")
return {
offset: record.offset(),
partition: record.partition(),
code: 0,
message: "OK"
}
}
-
方法列表 返回类型 方法和说明 org.apache.kafka.clients.producer.RecordMetadata produce(String topic, String message) 生产消息 不能直接返回方法produce(String topic, String message),否则会导致返回信息为空。例如在使用示例中,不能直接使用“return record”句式,否则返回的信息为空。
-
创建imap邮件连接 登录新版ROMA Connect控制台。 在左侧导航栏选择“连接器”,在连接器页面单击“新建连接”。 选择“imap邮件”连接器。 在弹窗中配置连接器信息,完成后单击“确定”。 参数 说明 连接名称 填写连接器实例名称。 IMAP服务器地址 填写IMAP邮箱服务器地址。一般为IP地址或者
域名 形式。 不需要包含协议方案及端口号。 端口 SMTP邮箱服务器地址的端口。 邮箱地址 发件人邮箱地址。比如lilei@imap.com。 密码 发件人邮箱密码。 是否使用SSL加密 选择是否启用SSL加密。 描述 填写连接器的描述信息,用于识别不同的连接器。
-
配置参数 参数 说明 收件人 填写邮件的收件人邮箱地址列表,多个电子邮件地址使用英文逗号(,)分隔。 抄送 填写邮件的抄送人邮箱地址列表,多个电子邮件地址使用英文逗号(,)分隔。 密送 填写邮件的密送邮箱地址列表,多个电子邮件地址使用英文逗号(,)分隔。 主题 填写邮件的标题。 内容 填写邮件的内容。 内容类型 选择内容类型。 文本 HTML 重要性 选择邮件的重要性。 None 低 中 高 附件 添加邮件附件,格式为json数组,必填项为filename及contentData。可选填contentType,类型为mime.例如:[{"filename": "xxx.pdf","contentData": "text file","contentType":"application/pdf"}]
-
创建SMTP邮箱连接 登录新版ROMA Connect控制台。 在左侧导航栏选择“连接器”,在连接器页面单击“新建连接”。 选择“SMTP邮箱”连接器。 在弹窗中配置连接器信息,完成后单击“确定”。 参数 说明 连接名称 填写连接器实例名称。 SMTP服务器地址 填写SMTP邮箱服务器地址。一般为IP地址或者域名形式。 不需要包含协议方案及端口号。 端口 SMTP邮箱服务器地址的端口。 邮箱地址 发件人邮箱地址。比如lilei@imap.com。 密码 发件人邮箱密码。 是否使用SSL加密 选择是否启用SSL加密。 描述 填写连接器的描述信息,用于识别不同的连接器。