MAPREDUCE服务 MRS-Flume业务配置指南:常用Sink配置

时间:2023-12-14 16:35:21

常用Sink配置

  • HDFS Sink

    HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:

    表10 HDFS Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    hdfs

    hdfs sink的类型,必须设置为hdfs。

    hdfs.path

    -

    HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    hdfs.inUseSuffix

    .tmp

    正在写入的hdfs文件后缀。

    hdfs.rollInterval

    30

    按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollSize

    1024

    按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollCount

    10

    按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    说明:

    参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。

    hdfs.idleTimeout

    0

    自动关闭空闲文件超时时间,单位:秒。

    hdfs.batchSize

    1000

    批次写入HDFS的Event个数。

    hdfs.kerberosPrincipal

    -

    认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    hdfs.kerberosKeytab

    -

    认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。

    hdfs.fileCloseByEndEvent

    true

    收到源文件的最后一个Event时是否关闭hdfs文件。

    hdfs.batchCallTimeout

    -

    批次写入HDFS超时控制时间,单位:毫秒。

    当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。

    说明:

    “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。

    serializer.appendNewline

    true

    将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。

    hdfs.filePrefix

    over_%{basename}

    数据写入hdfs后文件名的前缀。

    hdfs.fileSuffix

    -

    数据写入hdfs后文件名的后缀。

    hdfs.inUsePrefix

    -

    正在写入的hdfs文件前缀。

    hdfs.fileType

    DataStream

    hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。

    说明:

    “SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。

    hdfs.codeC

    -

    文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。

    hdfs.maxOpenFiles

    5000

    最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。

    hdfs.writeFormat

    Writable

    文件写入格式,“Writable”或者“Text”。

    hdfs.callTimeout

    10000

    写入HDFS超时控制时间,单位:毫秒。

    hdfs.threadsPoolSize

    -

    每个HDFS sink用于HDFS io操作的线程数。

    hdfs.rollTimerPoolSize

    -

    每个HDFS sink用于调度定时文件滚动的线程数。

    hdfs.round

    false

    时间戳是否四舍五入。若设置为true,则会影响所有基于时间的转义序列(%t除外)。

    hdfs.roundUnit

    second

    时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。

    hdfs.useLocalTimeStamp

    true

    是否启用本地时间戳,建议设置为“true”。

    hdfs.closeTries

    0

    hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。

    hdfs.retryInterval

    180

    尝试关闭hdfs文件的时间间隔,单位:秒。

    说明:

    每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。

    hdfs.failcount

    10

    数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。

  • Avro Sink

    Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示:

    表11 Avro Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    avro sink的类型,必须设置为avro。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监测端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    client.type

    DEFAULT

    客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:

    • DEFAULT,返回AvroRPC类型的客户端实例。
    • OTHER,返回NULL。
    • THRIFT,返回Thrift RPC类型的客户端实例。
    • DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
    • DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

    compression-type

    none

    批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。

    compression-level

    6

    批数据压缩级别(1-9),数值越高,压缩率越高。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

  • HBase Sink

    HBase Sink将数据写入到HBase中。常用配置如下表所示:

    表12 HBase Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    hbase sink的类型,必须设置为hbase。

    table

    -

    HBase表名称。

    columnFamily

    -

    HBase列族。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    batchSize

    1000

    批次写入HBase的Event个数。

    kerberosPrincipal

    -

    认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    kerberosKeytab

    -

    认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。

    coalesceIncrements

    true

    是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。

  • Kafka Sink

    Kafka Sink将数据写入到Kafka中。常用配置如下表所示:

    表13 Kafka Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。

    kafka.bootstrap.servers

    -

    Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    kafka.producer.acks

    1

    必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。

    kafka.topic

    -

    数据写入的topic,必须填写。

    flumeBatchSize

    1000

    批次写入Kafka的Event个数。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    ignoreLongMessage

    false

    是否丢弃超大消息的开关。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

    defaultPartitionId

    -

    用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。

    partitionIdHeader

    -

    设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会抛出EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。

    Other Kafka Producer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。

  • Thrift Sink

    Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示:

    表14 Thrift Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    thrift

    thrift sink的类型,必须设置为thrift。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监测端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    kerberos

    false

    是否启用Kerberos认证。

    client-keytab

    -

    客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。

    client-principal

    -

    客户端使用的安全用户的Principal。

    server-principal

    -

    服务端使用的安全用户的Principal。

    compression-type

    none

    Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。

    maxConnections

    5

    Flume发送数据时的最大连接池大小。

    ssl

    false

    是否使用SSL加密。

    truststore-type

    JKS

    Java信任库类型。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

support.huaweicloud.com/cmpntguide-mrs/mrs_01_1057.html