云服务器内容精选

  • 语法格式 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_count = "", encode = "", field_delimiter = "", offset= "");
  • 功能描述 创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“dis”表示数据源为数据接入服务。 region 是 数据所在的DIS区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 channel 是 数据所在的DIS通道名称。 partition_count 否 数据所在的DIS通道分区数。该参数和partition_range参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range 否 指定作业从DIS通道读取的分区范围。该参数和partition_count参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3。 encode 是 数据编码格式,可选为“csv”、“json”、“xml”、“email”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 若编码格式为“xml”,则需配置“xml_config”属性。 若编码格式为“email”,则需配置“email_key”属性。 若编码格式为“blob”,表示不对接收的数据进行解析,流属性仅能有一个且数据格式为ARRAY[TINYINT]。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 field_delimiter 否 属性分隔符,仅当编码格式为csv时该参数需要填写,例如配置为“,”。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于CSV格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 json_config 否 当编码格式为json时,用户需要通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2; field3=$”,其中field3=$表示field3的内容为整个json串。 xml_config 否 当编码格式为xml时,用户需要通过该参数来指定xml字段和流定义字段的映射关系,格式为“field1=data_xml.field1; field2=data_xml.field2”。 email_key 否 当编码格式为email时,用户需要通过该参数来指定需要提取的信息,需要列出信息的key值,需要与流定义字段一一对应,多个key值时以逗号分隔,例如“Message-ID, Date, Subject, body”,其中由于邮件正文没有关键字,DLI规定其关键字为“body”。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 offset 否 当启动作业后再获取数据,则该参数无效。 当获取数据后再启动作业,用户可以根据需求设置该参数的数值。 例如当offset= "100"时,则表示DLI从DIS服务中的第100条数据开始处理。 start_time 否 DIS数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start_time也没配置offset的时候,读取最新数据。 当没有配置start_time但配置了offset的时候,则从offset开始读取数据。 enable_checkpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 checkpoint_app_name 否 DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 checkpoint_interval 否 DIS源算子做checkpoint的时间间隔,单位秒,默认为60。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“dis”表示数据源为数据接入服务。 region 是 数据所在的DIS区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 channel 是 数据所在的DIS通道名称。 partition_count 否 数据所在的DIS通道分区数。该参数和partition_range参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range 否 指定作业从DIS通道读取的分区范围。该参数和partition_count参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3。 encode 是 数据编码格式,可选为“csv”、“json”、“xml”、“email”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 若编码格式为“xml”,则需配置“xml_config”属性。 若编码格式为“email”,则需配置“email_key”属性。 若编码格式为“blob”,表示不对接收的数据进行解析,流属性仅能有一个且数据格式为ARRAY[TINYINT]。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 field_delimiter 否 属性分隔符,仅当编码格式为csv时该参数需要填写,例如配置为“,”。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于CSV格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 json_config 否 当编码格式为json时,用户需要通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2; field3=$”,其中field3=$表示field3的内容为整个json串。 xml_config 否 当编码格式为xml时,用户需要通过该参数来指定xml字段和流定义字段的映射关系,格式为“field1=data_xml.field1; field2=data_xml.field2”。 email_key 否 当编码格式为email时,用户需要通过该参数来指定需要提取的信息,需要列出信息的key值,需要与流定义字段一一对应,多个key值时以逗号分隔,例如“Message-ID, Date, Subject, body”,其中由于邮件正文没有关键字,DLI规定其关键字为“body”。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 offset 否 当启动作业后再获取数据,则该参数无效。 当获取数据后再启动作业,用户可以根据需求设置该参数的数值。 例如当offset= "100"时,则表示DLI从DIS服务中的第100条数据开始处理。 start_time 否 DIS数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start_time也没配置offset的时候,读取最新数据。 当没有配置start_time但配置了offset的时候,则从offset开始读取数据。 enable_checkpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 checkpoint_app_name 否 DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 checkpoint_interval 否 DIS源算子做checkpoint的时间间隔,单位秒,默认为60。
  • 功能描述 创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 语法格式 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_count = "", encode = "", field_delimiter = "", offset= "");