云服务器内容精选

  • 常用Sink配置 HDFS Sink HDFS Sink将数据写入HDFS。常用配置如表9所示: 图9 HDFS Sink 表9 HDFS Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type hdfs 类型,需配置为“hdfs”。仅可在“properties.properties”文件中配置。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 hdfs.path - HDFS路径。 hdfs.inUseSuffix .tmp 正在写入的HDFS文件后缀。 hdfs.rollInterval 30 按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollSize 1024 按大小滚动文件,单位:字节,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollCount 10 按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.idleTimeout 0 自动关闭空闲文件超时时间,单位:秒。 hdfs.batchSize 1000 每次写入HDFS的Event个数。 hdfs.kerberosPrincipal - 认证HDFS的Kerberos用户名,未启用Kerberos认证集群不配置。 hdfs.kerberosKeytab - 认证HDFS的Kerberos keytab路径,未启用Kerberos认证集群不配置 hdfs.fileCloseByEndEvent true 收到最后一个Event时是否关闭文件。 hdfs.batchCallTimeout - 每次写入HDFS超时控制时间,单位:毫秒。 当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。 说明: “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致数据写入HDFS失败。 serializer.appendNewline true 将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。 Avro Sink Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如表10所示: 图10 Avro Sink 表10 Avro Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“avro”。仅可在“properties.properties”文件中配置。 hostname - 绑定关联的主机名或IP地址。 port - 监测端口。 batch-size 1000 批次发送的Event个数。 ssl false 是否使用SSL加密。 truststore-type JKS Java信任库类型。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 keystore-type JKS 密钥存储类型。 keystore - 密钥存储文件。 keystore-password - 密钥存储密码 HBase Sink HBase Sink将数据写入到HBase中。常用配置如表11所示: 图11 HBase Sink 表11 HBase Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“hbase”。仅可在“properties.properties”文件中配置。 table - HBase表名称。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 columnFamily - HBase列族名称。 batchSize 1000 每次写入HBase的Event个数。 kerberosPrincipal - 认证HBase的Kerberos用户名,未启用Kerberos认证集群不配置。 kerberosKeytab - 认证HBase的Kerberos keytab路径,未启用Kerberos认证集群不配置。 Kafka Sink Kafka Sink将数据写入到Kafka中。常用配置如表12所示: 图12 Kafka Sink 表12 Kafka Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“org.apache.flume.sink.kafka.KafkaSink”。 仅可在“properties.properties”文件中配置。 kafka.bootstrap.servers - Kafkabrokers列表,多个用英文逗号分隔。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 kafka.topic default-flume-topic 数据写入的topic。 flumeBatchSize 1000 每次写入Kafka的Event个数。 kafka.security.protocol SASL_PLAINTEXT Kafka安全协议,未启用Kerberos认证集群下须配置为“PLAINTEXT”。 kafka.kerberos.domain.name - Kafka Domain名称。安全集群必填。仅可在“properties.properties”文件中配置。 Other Kafka Producer Properties - 其他Kafka配置,可以接受任意Kafka支持的生产参数配置,配置需要加前缀“.kafka”。 仅可在“properties.properties”文件中配置。
  • 常用Channel配置 Memory Channel Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如表6所示: 图6 Memory Channel 表6 Memory Channel常用配置 参数 默认值 描述 type - 类型,需配置为“memory”。仅可在“properties.properties”文件中配置。 capacity 10000 缓存在Channel中的最大Event数。 transactionCapacity 1000 每次存取的最大Event数。 channelfullcount 10 Channel full次数,达到该次数后发送告警。 File Channel File Channel使用本地磁盘作为缓存区,Events存放在设置的“dataDirs”配置项文件夹中。常用配置如表7所示: 图7 File Channel 表7 File Channel常用配置 参数 默认值 描述 type - 类型,需配置为“file”。仅可在“properties.properties”文件中配置。 checkpointDir ${BIGDATA_DATA_HOME}/flume/checkpoint 检查点存放路径。 dataDirs ${BIGDATA_DATA_HOME}/flume/data 数据缓存路径,设置多个路径可提升性能,中间用逗号分开。 maxFileSize 2146435071 单个缓存文件的最大值,单位:字节。 minimumRequiredSpace 524288000 缓冲区空闲空间最小值,单位:字节。 capacity 1000000 缓存在Channel中的最大Event数。 transactionCapacity 10000 每次存取的最大Event数。 channelfullcount 10 Channel full次数,达到该次数后发送告警。 Kafka Channel Kafka Channel使用kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。常用配置如表 10 Kafka Channel 常用配置所示: 图8 Kafka Channel 表8 Kafka Channel常用配置 参数 默认值 描述 type - 类型,需配置为 “org.apache.flume.channel.kafka.KafkaChannel”.。 仅可在“properties.properties”文件中配置。 kafka.bootstrap.servers - kafka broker列表。 kafka.topic flume-channel Channel用来缓存数据的topic。 kafka.consumer.group.id flume Kafka消费者组ID。 parseAsFlumeEvent true 是否解析为Flume event。 migrateZookeeperOffsets true 当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。 kafka.consumer.auto.offset.reset latest 当没有offset记录时,从指定的位置消费数据。 kafka.producer.security.protocol SASL_PLAINTEXT Kafka生产者安全协议。 kafka.consumer.security.protocol SASL_PLAINTEXT Kafka消费者安全协议。
  • 补充说明 Flume可靠性保障措施。 Source与Channel、Channel与Sink之间支持事务机制。 Sink Processor支持配置failover、load_balance机制。 例如load_balance示例如下: server.sinkgroups=g1 server.sinkgroups.g1.sinks=k1 k2 server.sinkgroups.g1.processor.type=load_balance server.sinkgroups.g1.processor.backoff=true server.sinkgroups.g1.processor.selector=random Flume多客户端聚合级联时的注意事项。 级联时需要走Avro或者Thrift协议进行级联。 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。 Flume客户端可以包含多个独立的数据流,即在一个配置文件properties.properties中配置多个Source、Channel、Sink。这些组件可以链接以形成多个流。 例如在一个配置中配置两个数据流,示例如下: server.sources = source1 source2 server.sinks = sink1 sink2 server.channels = channel1 channel2 #dataflow1 server.sources.source1.channels = channel1 server.sinks.sink1.channel = channel1 #dataflow2 server.sources.source2.channels = channel2 server.sinks.sink2.channel = channel2
  • 参考实例 Flume配置参考示例(SpoolDir--Mem--Hive): server.sources = spool_source server.channels = mem_channel server.sinks = Hive_Sink #config the source server.sources.spool_source.type = spooldir server.sources.spool_source.spoolDir = /tmp/testflume server.sources.spool_source.montime = server.sources.spool_source.fileSuffix =.COMPLETED server.sources.spool_source.deletePolicy = never server.sources.spool_source.trackerDir =.flumespool server.sources.spool_source.ignorePattern = ^$ server.sources.spool_source.batchSize = 20 server.sources.spool_source.inputCharset =UTF-8 server.sources.spool_source.selector.type = replicating server.sources.spool_source.fileHeader = false server.sources.spool_source.fileHeaderKey = file server.sources.spool_source.basenameHeaderKey= basename server.sources.spool_source.deserializer = LINE server.sources.spool_source.deserializer.maxBatchLine= 1 server.sources.spool_source.deserializer.maxLineLength= 2048 server.sources.spool_source.channels = mem_channel #config the channel server.channels.mem_channel.type = memory server.channels.mem_channel.capacity =10000 server.channels.mem_channel.transactionCapacity= 2000 server.channels.mem_channel.channelfullcount= 10 server.channels.mem_channel.keep-alive = 3 server.channels.mem_channel.byteCapacity = server.channels.mem_channel.byteCapacityBufferPercentage= 20 #config the sink server.sinks.Hive_Sink.type = hive server.sinks.Hive_Sink.channel = mem_channel server.sinks.Hive_Sink.hive.metastore = thrift://${任意metastore业务IP}:21088 server.sinks.Hive_Sink.hive.hiveSite = /opt/hivesink-conf/hive-site.xml server.sinks.Hive_Sink.hive.coreSite = /opt/hivesink-conf/core-site.xml server.sinks.Hive_Sink.hive.metastoreSite = /opt/hivesink-conf/hivemeatastore-site.xml server.sinks.Hive_Sink.hive.database = default server.sinks.Hive_Sink.hive.table = flume_multi_type_part server.sinks.Hive_Sink.hive.partition = Tag,%Y-%m,%d server.sinks.Hive_Sink.hive.txnsPerBatchAsk= 100 server.sinks.Hive_Sink.hive.autoCreatePartitions= true server.sinks.Hive_Sink.useLocalTimeStamp = true server.sinks.Hive_Sink.batchSize = 1000 server.sinks.Hive_Sink.hive.kerberosPrincipal= super1 server.sinks.Hive_Sink.hive.kerberosKeytab= /opt/mykeytab/user.keytab server.sinks.Hive_Sink.round = true server.sinks.Hive_Sink.roundValue = 10 server.sinks.Hive_Sink.roundUnit = minute server.sinks.Hive_Sink.serializer = DELIMITED server.sinks.Hive_Sink.serializer.delimiter= ";" server.sinks.Hive_Sink.serializer.serdeSeparator= ';' server.sinks.Hive_Sink.serializer.fieldnames= id,msg
  • 注意事项 Flume可靠性保障措施有哪些? Source&Channel、Channel&Sink之间的事务机制。 Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下。 server.sinkgroups=g1 server.sinkgroups.g1.sinks=k1 k2 server.sinkgroups.g1.processor.type=load_balance server.sinkgroups.g1.processor.backoff=true server.sinkgroups.g1.processor.selector=random Flume多agent聚合级联时的注意事项? 级联时需要使用Avro或者Thrift协议进行级联。 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。
  • 常用Sink配置 HDFS Sink HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示: 表10 HDFS Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type hdfs hdfs sink的类型,必须设置为hdfs。 hdfs.path - HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。 monTime 0(不开启) 线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。 hdfs.inUseSuffix .tmp 正在写入的hdfs文件后缀。 hdfs.rollInterval 30 按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollSize 1024 按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollCount 10 按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 说明: 参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。 hdfs.idleTimeout 0 自动关闭空闲文件超时时间,单位:秒。 hdfs.batchSize 1000 批次写入HDFS的Event个数。 hdfs.kerberosPrincipal - 认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。 hdfs.kerberosKeytab - 认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。 hdfs.fileCloseByEndEvent true 收到源文件的最后一个Event时是否关闭hdfs文件。 hdfs.batchCallTimeout - 批次写入HDFS超时控制时间,单位:毫秒。 当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。 说明: “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。 serializer.appendNewline true 将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。 hdfs.filePrefix over_%{basename} 数据写入hdfs后文件名的前缀。 hdfs.fileSuffix - 数据写入hdfs后文件名的后缀。 hdfs.inUsePrefix - 正在写入的hdfs文件前缀。 hdfs.fileType DataStream hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。 说明: “SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。 hdfs.codeC - 文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。 hdfs.maxOpenFiles 5000 最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。 hdfs.writeFormat Writable 文件写入格式,“Writable”或者“Text”。 hdfs.callTimeout 10000 写入HDFS超时控制时间,单位:毫秒。 hdfs.threadsPoolSize - 每个HDFS sink用于HDFS io操作的线程数。 hdfs.rollTimerPoolSize - 每个HDFS sink用于调度定时文件滚动的线程数。 hdfs.round false 时间戳是否四舍五入。如果设置为true,则会影响所有基于时间的转义序列(%t除外)。 hdfs.roundUnit second 时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。 hdfs.useLocalTimeStamp true 是否启用本地时间戳,建议设置为“true”。 hdfs.closeTries 0 hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。 hdfs.retryInterval 180 尝试关闭hdfs文件的时间间隔,单位:秒。 说明: 每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。 hdfs.failcount 10 数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。 Avro Sink Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示: 表11 Avro Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type - avro sink的类型,必须设置为avro。 hostname - 绑定的主机名/IP。 port - 监测端口,该端口需未被占用。 batch-size 1000 批次发送的Event个数。 client.type DEFAULT 客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括: DEFAULT,返回AvroRPC类型的客户端实例。 OTHER,返回NULL。 THRIFT,返回Thrift RPC类型的客户端实例。 DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。 DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。 ssl false 是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。 truststore-type JKS Java信任库类型,“JKS”或“PKCS12”。 说明: JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 keystore-type JKS ssl启用后密钥存储类型。 keystore - ssl启用后密钥存储文件路径,开启ssl后,该参数必填。 keystore-password - ssl启用后密钥存储密码,开启ssl后,该参数必填。 connect-timeout 20000 第一次连接的超时时间,单位:毫秒。 request-timeout 20000 第一次请求后一次请求的最大超时时间,单位:毫秒。 reset-connection-interval 0 一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。 compression-type none 批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。 compression-level 6 批数据压缩级别(1-9),数值越高,压缩率越高。 exclude-protocols SSLv3 排除的协议列表,用空格分开。默认排除SSLv3协议。 HBase Sink HBase Sink将数据写入到HBase中。常用配置如下表所示: 表12 HBase Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type - hbase sink的类型,必须设置为hbase。 table - HBase表名称。 columnFamily - HBase列族。 monTime 0(不开启) 线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。 batchSize 1000 批次写入HBase的Event个数。 kerberosPrincipal - 认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。 kerberosKeytab - 认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。 coalesceIncrements true 是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。 Kafka Sink Kafka Sink将数据写入到Kafka中。常用配置如下表所示: 表13 Kafka Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type - kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。 kafka.bootstrap.servers - Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 monTime 0(不开启) 线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。 kafka.producer.acks 1 必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。 kafka.topic - 数据写入的topic,必须填写。 allowTopicOverride false 是否将Event Header中保存的topic替换kafka.topic中配置的topic。 flumeBatchSize 1000 批次写入Kafka的Event个数。 kafka.security.protocol SASL_PLAINTEXT Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 ignoreLongMessage false 是否丢弃超大消息的开关。 messageMaxLength 1000012 Flume写入Kafka的消息的最大长度。 defaultPartitionId - 用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。 partitionIdHeader - 设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。 Other Kafka Producer Properties - 其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。 Thrift Sink Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示: 表14 Thrift Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type thrift thrift sink的类型,必须设置为thrift。 hostname - 绑定的主机名/IP。 port - 监测端口,该端口需未被占用。 batch-size 1000 批次发送的Event个数。 connect-timeout 20000 第一次连接的超时时间,单位:毫秒。 request-timeout 20000 第一次请求后一次请求的最大超时时间,单位:毫秒。 kerberos false 是否启用Kerberos认证。 client-keytab - 客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。 client-principal - 客户端使用的安全用户的Principal。 server-principal - 服务端使用的安全用户的Principal。 compression-type none Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。 maxConnections 5 Flume发送数据时的最大连接池大小。 ssl false 是否使用SSL加密。 truststore-type JKS Java信任库类型。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 reset-connection-interval 0 一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
  • 常用Channel配置 Memory Channel Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示: 表6 Memory Channel常用配置 参数 默认值 描述 type - memory channel的类型,必须设置为memory。 capacity 10000 缓存在channel中的最大Event数。 transactionCapacity 1000 每次存取的最大Event数。 说明: 此参数值需要大于source和sink的batchSize。 事务缓存容量必须小于或等于Channel缓存容量。 channelfullcount 10 channel full次数,达到该次数后发送告警。 keep-alive 3 当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。 byteCapacity JVM最大内存的80% channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。 byteCapacityBufferPercentage 20 channel中字节容量百分比(%)。 File Channel File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示: 表7 File Channel常用配置 参数 默认值 描述 type - file channel的类型,必须设置为file。 checkpointDir ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint 说明: 此路径随自定义数据路径变更。 检查点存放路径。 dataDirs ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data 说明: 此路径随自定义数据路径变更。 数据缓存路径,设置多个路径可提升性能,中间用逗号分开。 maxFileSize 2146435071 单个缓存文件的最大值,单位:bytes。 minimumRequiredSpace 524288000 缓冲区空闲空间最小值,单位:bytes。 capacity 1000000 缓存在channel中的最大Event数。 transactionCapacity 10000 每次存取的最大Event数。 说明: 此参数值需要大于source和sink的batchSize。 事务缓存容量必须小于或等于Channel缓存容量。 channelfullcount 10 channel full次数,达到该次数后发送告警。 useDualCheckpoints false 是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。 backupCheckpointDir - 备份检查点路径。 checkpointInterval 30000 检查点间隔时间,单位:秒。 keep-alive 3 当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。 use-log-replay-v1 false 是否启用旧的回复逻辑。 use-fast-replay false 是否使用队列回复。 checkpointOnClose true channel关闭时是否创建检查点。 Memory File Channel Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示: 表8 Memory File Channel常用配置 参数 默认值 描述 type org.apache.flume.channel.MemoryFileChannel memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”。 capacity 50000 Channel缓存容量:缓存在Channel中的最大Event数。 transactionCapacity 5000 事务缓存容量:一次事务能处理的最大Event数。 说明: 此参数值需要大于source和sink的batchSize。 事务缓存容量必须小于或等于Channel缓存容量。 subqueueByteCapacity 20971520 每个subqueue最多保存多少byte的Event,单位:byte。 Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。 subqueue能保存多少event,由“subqueueCapacity”和“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”或“subqueueInterval”上限时,subqueue内的Event才会发往目的地。 说明: “subqueueByteCapacity”必须大于一个batchsize内的Event总容量。 subqueueInterval 2000 每个subqueue最多保存一段多长时间的Event,单位:毫秒。 keep-alive 3 当事务缓存或Channel缓存满时,Put、Take线程等待时间。 单位:秒。 dataDir - 缓存本地文件存储目录。 byteCapacity JVM最大内存的80% Channel缓存容量。 单位:bytes。 compression-type None 消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。 channelfullcount 10 channel full次数,达到该次数后发送告警。 Memory File Channel配置样例: server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel server.channels.c1.dataDir = /opt/flume/mfdata server.channels.c1.subqueueByteCapacity = 20971520 server.channels.c1.subqueueInterval=2000 server.channels.c1.capacity = 500000 server.channels.c1.transactionCapacity = 40000 Kafka Channel Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。 表9 Kafka channel 常用配置 Parameter Default Value Description type - kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”。 kafka.bootstrap.servers - Kafka的bootstrap地址端口列表。 如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 kafka.topic flume-channel channel用来缓存数据的topic。 kafka.consumer.group.id flume 从kafka中获取数据的组标识,此参数不能为空。 parseAsFlumeEvent true 是否解析为Flume event。 migrateZookeeperOffsets true 当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。 kafka.consumer.auto.offset.reset latest 当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示如果没有offset则发生异常。 kafka.producer.security.protocol SASL_PLAINTEXT Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 说明: 如果该参数没有显示,请单击弹窗左下角的"+"显示全部参数。 kafka.consumer.security.protocol SASL_PLAINTEXT 同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 pollTimeout 500 consumer调用poll()函数能接受的最大超时时间,单位:毫秒。 ignoreLongMessage false 是否丢弃超大消息。 messageMaxLength 1000012 Flume写入Kafka的消息的最大长度。
  • 常用Channel配置 Memory Channel Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如表6所示: 图6 Memory Channel 表6 Memory Channel常用配置 参数 默认值 描述 type - 类型,需配置为“memory”。仅可在“properties.properties”文件中配置。 capacity 10000 缓存在Channel中的最大Event数。 transactionCapacity 1000 每次存取的最大Event数。 channelfullcount 10 Channel full次数,达到该次数后发送告警。 File Channel File Channel使用本地磁盘作为缓存区,Events存放在设置的“dataDirs”配置项文件夹中。常用配置如表7所示: 图7 File Channel 表7 File Channel常用配置 参数 默认值 描述 type - 类型,需配置为“file”。仅可在“properties.properties”文件中配置。 checkpointDir ${BIGDATA_DATA_HOME}/flume/checkpoint 检查点存放路径。 dataDirs ${BIGDATA_DATA_HOME}/flume/data 数据缓存路径,设置多个路径可提升性能,中间用逗号分开。 maxFileSize 2146435071 单个缓存文件的最大值,单位:字节。 minimumRequiredSpace 524288000 缓冲区空闲空间最小值,单位:字节。 capacity 1000000 缓存在Channel中的最大Event数。 transactionCapacity 10000 每次存取的最大Event数。 channelfullcount 10 Channel full次数,达到该次数后发送告警。 Kafka Channel Kafka Channel使用kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。常用配置如表 10 Kafka Channel 常用配置所示: 图8 Kafka Channel 表8 Kafka Channel常用配置 参数 默认值 描述 type - 类型,需配置为 “org.apache.flume.channel.kafka.KafkaChannel”.。 仅可在“properties.properties”文件中配置。 kafka.bootstrap.servers - kafka broker列表。 kafka.topic flume-channel Channel用来缓存数据的topic。 kafka.consumer.group.id flume Kafka消费者组ID。 parseAsFlumeEvent true 是否解析为Flume event。 migrateZookeeperOffsets true 当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。 kafka.consumer.auto.offset.reset latest 当没有offset记录时,从指定的位置消费数据。 kafka.producer.security.protocol SASL_PLAINTEXT Kafka生产者安全协议。 kafka.consumer.security.protocol SASL_PLAINTEXT Kafka消费者安全协议。
  • 常用Sink配置 HDFS Sink HDFS Sink将数据写入HDFS。常用配置如表9所示: 图9 HDFS Sink 表9 HDFS Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type hdfs 类型,需配置为“hdfs”。仅可在“properties.properties”文件中配置。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 hdfs.path - HDFS路径。 hdfs.inUseSuffix .tmp 正在写入的HDFS文件后缀。 hdfs.rollInterval 30 按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollSize 1024 按大小滚动文件,单位:字节,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollCount 10 按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.idleTimeout 0 自动关闭空闲文件超时时间,单位:秒。 hdfs.batchSize 1000 每次写入HDFS的Event个数。 hdfs.kerberosPrincipal - 认证HDFS的Kerberos用户名,未启用Kerberos认证集群不配置。 hdfs.kerberosKeytab - 认证HDFS的Kerberos keytab路径,未启用Kerberos认证集群不配置 hdfs.fileCloseByEndEvent true 收到最后一个Event时是否关闭文件。 hdfs.batchCallTimeout - 每次写入HDFS超时控制时间,单位:毫秒。 当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。 说明: “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致数据写入HDFS失败。 serializer.appendNewline true 将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。 Avro Sink Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如表10所示: 图10 Avro Sink 表10 Avro Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“avro”。仅可在“properties.properties”文件中配置。 hostname - 绑定关联的主机名或IP地址。 port - 监测端口。 batch-size 1000 批次发送的Event个数。 ssl false 是否使用SSL加密。 truststore-type JKS Java信任库类型。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 keystore-type JKS 密钥存储类型。 keystore - 密钥存储文件。 keystore-password - 密钥存储密码 HBase Sink HBase Sink将数据写入到HBase中。常用配置如表11所示: 图11 HBase Sink 表11 HBase Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“hbase”。仅可在“properties.properties”文件中配置。 table - HBase表名称。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 columnFamily - HBase列族名称。 batchSize 1000 每次写入HBase的Event个数。 kerberosPrincipal - 认证HBase的Kerberos用户名,未启用Kerberos认证集群不配置。 kerberosKeytab - 认证HBase的Kerberos keytab路径,未启用Kerberos认证集群不配置。 Kafka Sink Kafka Sink将数据写入到Kafka中。常用配置如表12所示: 图12 Kafka Sink 表12 Kafka Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“org.apache.flume.sink.kafka.KafkaSink”。 仅可在“properties.properties”文件中配置。 kafka.bootstrap.servers - Kafkabrokers列表,多个用英文逗号分隔。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 kafka.topic default-flume-topic 数据写入的topic。 flumeBatchSize 1000 每次写入Kafka的Event个数。 kafka.security.protocol SASL_PLAINTEXT Kafka安全协议,未启用Kerberos认证集群下须配置为“PLAINTEXT”。 kafka.kerberos.domain.name - Kafka Domain名称。安全集群必填。仅可在“properties.properties”文件中配置。 Other Kafka Producer Properties - 其他Kafka配置,可以接受任意Kafka支持的生产参数配置,配置需要加前缀“.kafka”。 仅可在“properties.properties”文件中配置。
  • 解决办法 适当调大堆内存(xmx)的值。 与正常启动Flume的节点进行文件和文件夹权限对比,更改错误文件或文件夹权限。 重新配置JAVA_HOME。 客户端替换“${install_home}/fusioninsight-flume-flume组件版本号/conf/ENV_VARS文件中JAVA_HOME”的值,服务端替换“etc”目录下“ENV_VARS”文件中“JAVA_HOME”的值。 其中“JAVA_HOME”的值可通过登录正常启动Flume的节点,执行echo ${JAVA_HOME}获取。 ${install_home}为Flume客户端的安装路径。
  • 原因分析 Flume堆内存设置的值大于机器剩余内存,查看Flume启动日志: [CST 2019-02-26 13:31:43][INFO] [[checkMemoryValidity:124]] [GC_OPTS is invalid: Xmx(40960000MB) is bigger than the free memory(56118MB) in system.] [9928] Flume文件或文件夹权限异常,界面或后台会提示如下信息: [2019-02-26 13:38:02]RoleInstance prepare to start failure [{ScriptExecutionResult=ScriptExecutionResult [exitCode=126, output=, errMsg=sh: line 1: /opt/Bigdata/MRS_XXX/install/FusionInsight-Flume-1.9.0/flume/bin/flume-manage.sh: Permission denied JAVA_HOME配置错误,查看Flume agent启动日志: Info: Sourcing environment configuration script /opt/FlumeClient/fusioninsight-flume-1.9.0/conf/flume-env.sh + '[' -n '' ']' + exec /tmp/MRS-Client/MRS_Flume_ClientConfig/JDK/jdk-8u18/bin/java '-XX:OnOutOfMemoryError=bash /opt/FlumeClient/fusioninsight-flume-1.9.0/bin/out_memory_error.sh /opt/FlumeClient/fusioninsight-flume-1.9.0/conf %p' -Xms2G -Xmx4G -XX:CMSFullGCsBeforeCompaction=1 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -Dkerberos.domain.name=hadoop.hadoop.com -verbose:gc -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/Bigdata//flume-client-1/flume/flume-root-20190226134231-%p-gc.log -Dproc_org.apache.flume.node.Application -Dproc_name=client -Dproc_conf_file=/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/properties.properties -Djava.security.krb5.conf=/opt/FlumeClient/fusioninsight-flume-1.9.0/conf//krb5.conf -Djava.security.auth.login.config=/opt/FlumeClient/fusioninsight-flume-1.9.0/conf//jaas.conf -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com -Dzookeeper.request.timeout=120000 -Dflume.instance.id=884174180 -Dflume.agent.name=clientName1 -Dflume.role=client -Dlog4j.configuration.watch=true -Dlog4j.configuration=log4j.properties -Dflume_log_dir=/var/log/Bigdata//flume-client-1/flume/ -Dflume.service.id=flume-client-1 -Dbeetle.application.home.path=/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/service -Dflume.called.from.service -Dflume.conf.dir=/opt/FlumeClient/fusioninsight-flume-1.9.0/conf -Dflume.metric.conf.dir=/opt/FlumeClient/fusioninsight-flume-1.9.0/conf -Dflume.script.home=/opt/FlumeClient/fusioninsight-flume-1.9.0/bin -cp '/opt/FlumeClient/fusioninsight-flume-1.9.0/conf:/opt/FlumeClient/fusioninsight-flume-1.9.0/lib/*:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/service/' -Djava.library.path=/opt/FlumeClient/fusioninsight-flume-1.9.0/plugins.d/native/native org.apache.flume.node.Application --conf-file /opt/FlumeClient/fusioninsight-flume-1.9.0/conf/properties.properties --name client /opt/FlumeClient/fusioninsight-flume-1.9.0/bin/flume-ng: line 233: /tmp/FusionInsight-Client/Flume/FusionInsight_Flume_ClientConfig/JDK/jdk-8u18/bin/java: No such file or directory
  • 原因分析 Flume文件或文件夹权限异常,重启后Manager界面提示如下信息: [2019-02-26 13:38:02]RoleInstance prepare to start failure [{ScriptExecutionResult=ScriptExecutionResult [exitCode=126, output=, errMsg=sh: line 1: /opt/Bigdata/MRS_XXX/install/FusionInsight-Flume-1.9.0/flume/bin/flume-manage.sh: Permission denied