华为云用户手册

  • 数据类型映射 下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。 表2 数据类型映射 Flink SQL 类型 值 CHAR / VARCHAR / STRING UTF-8(默认)编码的文本字符串。编码字符集可以通过 'raw.charset' 进行配置。 BINARY / VARBINARY / BYTES 字节序列本身。 BOOLEAN 表示布尔值的单个字节,0表示 false, 1 表示 true。 TINYINT 有符号数字值的单个字节。 SMALLINT 采用big-endian(默认)编码的两个字节。字节序可以通过 'raw.endianness' 配置。 INT 采用 big-endian (默认)编码的四个字节。字节序可以通过 'raw.endianness' 配置。 BIGINT 采用 big-endian (默认)编码的八个字节。字节序可以通过 'raw.endianness' 配置。 FLOAT 采用 IEEE 754 格式和 big-endian (默认)编码的四个字节。字节序可以通过 'raw.endianness' 配置。 DOUBLE 采用 IEEE 754 格式和 big-endian (默认)编码的八个字节。字节序可以通过 'raw.endianness' 配置。 RAW 通过 RAW 类型的底层 TypeSerializer 序列化的字节序列。
  • 参数说明 表1 参数 是否必选 默认值 类型 描述 format 是 (none) String 指定要使用的格式, 这里应该是 'raw'。 raw.charset 否 UTF-8 String 指定字符集来编码文本字符串。 raw.endianness 否 big-endian String 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅 字节序。
  • 数据类型映射 目前,Parquet 格式类型映射与 Apache Hive 兼容,但与 Apache Spark 有所不同: Timestamp:不论精度,映射 timestamp 类型至 int96。 Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。 下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。 注意:复合数据类型暂只支持写不支持读(Array、Map 与 Row)。 表2 数据类型映射 Flink数据类型 Parquet类型 Parquet逻辑类型 CHAR / VARCHAR / STRING BINARY UTF8 BOOLEAN BOOLEAN - BINARY / VARBINARY BINARY - DECIMAL FIXED_LEN_BYTE_ARRAY DECIMAL TINYINT INT32 INT_8 SMALLINT INT32 INT_16 INT INT32 - BIGINT INT64 - FLOAT FLOAT - DOUBLE DOUBLE - DATE INT32 DATE TIME INT32 TIME_MILLIS TIMESTAMP INT96 - ARRAY - LIST MAP - MAP ROW - STRUCT
  • 数据类型映射 Orc 格式类型的映射和 Apache Hive 是兼容的。下面的表格列出了 Flink 类型的数据和 Orc 类型的数据的映射关系。 表2 数据类型映射 Flink数据类型 Orc物理类型 Orc逻辑类型 CHAR bytes CHAR VARCHAR bytes VARCHAR STRING bytes STRING BOOLEAN long BOOLEAN BYTES bytes BINARY DECIMAL decimal DECIMAL TINYINT long BYTE SMALLINT long SHORT INT long INT BIGINT long LONG FLOAT double FLOAT DOUBLE double DOUBLE DATE long DATE TIMESTAMP timestamp TIMESTAMP ARRAY - LIST MAP - MAP ROW - STRUCT
  • 参数说明 表1 参数说明 参数 是否必须 默认值 类型 描述 format 是 (none) String 指定要使用的格式,此处应为 'ogg-json'。 ogg-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 debezium-json.timestamp-format.standard 否 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601': 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。 ogg-json.map-null-key.mode 否 'FAIL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP' 和 'LITERAL': Option 'FAIL' 将抛出异常。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。 ogg-json.map-null-key.literal 否 'null' String 当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
  • 功能描述 Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等 Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定使用格式,此处使用'maxwell-json'。 maxwell-json.ignore-parse-errors 否 false Boolean 跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 maxwell-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”: 'SQL'将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30 12:13:14.123' 并以相同格式输出时间戳。 'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12:13:14.123' 并以相同格式输出时间戳。 maxwell-json.map-null-key.mode 否 'FAIL' String 指定序列化map数据的null键时的处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”: 'FAIL'将在遇到带有null键的map时抛出异常。 'DROP'将删除map数据的null键条目。 'LITERAL'将使用字符串代替null键。字符串由 maxwell-json.map-null-key.literal 选项定义。 maxwell-json.map-null-key.literal 否 'null' String 当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串以替换null键。 maxwell-json.encode.decimal-as-plain-number 否 false Boolean 将所有小数编码为普通数字,而不是可能的科学计数法。默认情况下,小数可以使用科学计数法书写。例如,0.000000027在默认情况下被编码为2.7E-8,如果将此选项设置为true,则将被写入为0.000000027。
  • 注意事项 Maxwell应用允许将每个变动消息精确地传递一次。在这种情况下,Flink在消费Maxwell生成的消息时处理得很好。如果Maxwell应用程序在at-least-once模式处理,它可能向Kafka写入重复的改动消息,Flink将获得重复的消息。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置table.exec.source.cdc-events-duplicate设置为true,并在源表上定义PRIMARY KEY。Framework将生成一个额外的有状态操作符,并使用主键对变更事件进行去重,并生成一个规范化的changelog流。
  • 功能描述 Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySql中的更改实时流式写入到Kafka等流式connector。Maxwell为changelog提供了统一的格式,而且支持使用JSON对消息进行序列化。 Flink 支持将 Maxwell JSON 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 声明使用的格式,这里应为'json'。 json.fail-on-missing-field 否 false Boolean 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。 json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 json.timestamp-format.standard 否 'SQL' String 声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601': 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。 json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法。当前支持的值有:'FAIL','DROP'和'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。 json.map-null-key.literal 否 'null' String 当 'json.map-null-key.mode' 是LITERAL的时候,指定字符串常量替换 Map 中的空 key 值。 json.encode.decimal-as-plain-number 否 false Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。
  • 数据类型映射 当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。 在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。 下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。 表2 数据类型映射 Flink SQL类型 JSON类型 CHAR / VARCHAR / STRING string BOOLEAN boolean BINARY / VARBINARY string with encoding: base64 DECIMAL number TINYINT number SMALLINT number INT number BIGINT number FLOAT number DOUBLE number DATE string with format: date TIME string with format: time TIMESTAMP string with format: date-time TIMESTAMP_WITH_LOCAL_TIME_ZONE string with format: date-time (with UTC time zone) INTERVAL number ARRAY array MAP / MULTISET object ROW object
  • 数据类型映射 目前 CS V 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。 表2 数据类型映射 Flink SQL 类型 CSV 类型 CHAR / VARCHAR / STRING string BOOLEAN boolean BINARY / VARBINARY string with encoding: base64 DECIMAL number TINYINT number SMALLINT number INT number BIGINT number FLOAT number DOUBLE number DATE string with format: date TIME string with format: time TIMESTAMP string with format: date-time INTERVAL number ARRAY array ROW object
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,这里应该是 'csv'。 csv.field-delimiter 否 , String 字段分隔符 (默认','),必须为单字符。您可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 您也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如'csv.field-delimiter' = U&'\0001' 代表 0x01 字符。 csv.disable-quote-character 否 false Boolean 是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。 csv.quote-character 否 ‘’ String 用于围住字段值的引号字符 (默认"). csv.allow-comments 否 false Boolean 是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。 csv.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 csv.array-element-delimiter 否 ; String 分隔数组和行元素的字符串(默认';'). csv.escape-character 否 (none) String 转义字符(默认关闭). csv.null-literal 否 (none) String 是否将 "null" 字符串转化为 null 值。
  • 功能描述 Avro Schema Registry (avro-confluent) 格式能让您读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化的记录。 当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。 当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.subject 参数来指定。
  • 数据类型映射 目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 Avro Format中描述了 Flink 数据类型和 Avro 类型的对应关系。 除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro union(something, null), 其中 something 是从 Flink 类型转换的 Avro 类型。
  • 功能描述 Canal是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf序列化消息(Canal 默认使用 protobuf)。 Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。 更多具体使用可参考开源社区文档:Canal Format。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,此处应为 'canal-json'. canal-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 canal-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值是:'SQL'和'ISO-8601'。 选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。 选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。 canal-json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有'FAIL', 'DROP'和 'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。 canal-json.map-null-key.literal 否 'null' String 当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 canal-json.encode.decimal-as-plain-number 否 false Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 canal-json.database.include 否 (none) String 一个可选的正则表达式,通过正则匹配 Canal 记录中的 "database" 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。 canal-json.table.include 否 (none) String 一个可选的正则表达式,通过正则匹配 Canal 记录中的 "table" 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。
  • 数据类型映射 目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。 除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。 表2 数据类型映射 Flink SQL类型 Avro类型 Avro逻辑类型 CHAR / VARCHAR / STRING string - BOOLEAN boolean - BINARY / VARBINARY bytes - DECIMAL fixed decimal TINYINT int - SMALLINT int - INT int - BIGINT long - FLOAT float - DOUBLE double - DATE int date TIME int time-millis TIMESTAMP long timestamp-millis ARRAY array - MAP(key 必须是 string/char/varchar 类型) map - MULTISET(元素必须是 string/char/varchar 类型) map - ROW record -
  • Format概述 Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。 表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。 表1 Flink支持格式 Formats 支持的Connectors CSV Kafka, Upsert Kafka, FileSystem JSON Kafka, Upsert Kafka, FileSystem, Elasticsearch Avro Kafka, Upsert Kafka, FileSystem Confluent Avro Kafka, Upsert Kafka Debezium Kafka, FileSystem Canal Kafka, FileSystem Maxwell Kafka, FileSystem Ogg Kafka, FileSystem Orc FileSystem Parquet FileSystem Raw Kafka, Upsert Kafka, FileSystem 父主题: Format
  • 创建表相关语法 表1 创建表相关语法 语法分类 功能描述 Format Avro Canal Confluent Avro CSV Debezium JSON Maxwell Ogg Orc Parquet Raw Connectors BlackHole ClickHouse DataGen Doris DWS Elasticsearch FileSystem Hbase Hive JDBC Kafka Print Redis Upsert Kafka
  • 语法定义 INSERT INTO table_name [PARTITION part_spec] query part_spec: (part_col_name1=val1 [, part_col_name2=val2, ...]) query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')' matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')' measureColumn: expression AS alias pattern: patternTerm [ '|' patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: '*' | '*?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'
  • 约束限制 Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词法约定: 不管是否引用标识符,都保留标识符的大小写。 且标识符需区分大小写。 与 Java 不一样的地方在于,通过反引号,可以允许标识符带有非字母的字符(如:"SELECT a AS `my field` FROM t")。 字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转义(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法: 使用反斜杠(\)作为转义字符(默认):SELECT U&'\263A' 使用自定义的转义字符: SELECT U&'#263A' UESCAPE '#'
  • 语法说明 TEMPORARY 创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。 TEMPORARY SYSTEM 创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。 IF NOT EXISTS 如果该函数已经存在,则不会进行任何操作。 LANGUAGE JAVA|SCALA Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA,且函数的默认语言为 JAVA。
  • 功能描述 创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 。 若catalog 中,已经有同名的函数注册了,则无法注册。如果 language tag 是 JAVA 或者 SCALA ,则 identifier 是 UDF 实现类的全限定名。 如果您需要了解创建自定义函数的步骤请参考自定义函数。
  • 语法说明 COMPUTED COLUMN 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义。 另一方面,由于事件时间列可能需要从现有的字段中获得,因此计算列可用于获得事件时间列。例如,原始字段的类型不是 TIMESTAMP(3) 或嵌套在 JSON 字符串中。 注意: 定义在一个数据源表( source table )上的计算列会在从数据源读取数据后被计算,它们可以在 SELECT 查询语句中使用。 计算列不可以作为 INSERT 语句的目标,在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致。
  • 保留关键字 一些字符串的组合已经被预留为关键字以备未来使用。 如果使用以下字符串作为字段名,请在使用时使用反引号将该字段名包起来,例如 `value`, `count` 。 A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATA LOG , CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERIS TICS , CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMNS, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MODULES, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PA RAM ETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, RECURSIVE, REF, REFEREN CES , REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFO RMS , TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, W IDT H_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE 父主题: SQL语法约束与定义
  • 语法支持类型 DLI 支持以下数据类型: CHAR, VARCHAR, STRING, BOOLEAN, BINARY, VARBINARY, BYTES, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL, ARRAY, MULTISET, MAP, ROW, RAW 在SQL语法中这些类型用于定义表中列的数据类型。 父主题: SQL语法约束与定义
  • 操作步骤 自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下: 在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline.global-job-parameters,格式如下: pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing" 该配置定义了如表1的map。 表1 pipeline.global-job-parameters示例 key value k1 v1 k2 v1,v2 k3 str:ing k4 str""ing FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。 key和value之间通过冒号(:)分隔,所有key-value用逗号(,)连接。 如果key或value中含有逗号(,),则需要用双引号(")将key:value整个包围起来。参考k2。 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。参考k3。 如果key或value中含有双引号("),则需要通过连写两个双引号("")进行转义,也需要用双引号(")将key:value整个包围起来。参考k4。 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容,代码示例如下: context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table"); context.getJobParameter("driver","com.mysql.jdbc.Driver"); context.getJobParameter("user","user"); context.getJobParameter("password","password");
  • 操作场景 如果您的自定义函数需要在多个作业中使用,但对于不同作业某些参数值不同,直接在UDF中修改较为复杂。您可以在Flink OpenSource SQL编辑页面,自定义配置中配置参数pipeline.global-job-parameters,在UDF代码中获取该参数并使用。如需修改参数值,直接在FlinkOpenSource SQL编辑页面,自定义配置中修改该参数值,即可达到快速修改UDF参数值的目的。
  • 操作场景 类型推导包含了验证输入值、派生参数和返回值数据类型。从逻辑角度看,Planner需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为JVM对象。 Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。然而以反射方式提取数据类型并不总是成功的,比如UDTF中常见的Row类型。 由于 Flink 1.11 起引入了新的自定义函数注册接口,使用了新的自定义函数类型推断机制,因此原先1.10 重载 getResultType 声明返回字段类型的方式将不再可用。继续使用会抛出如下异常: Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types. 目前 Flink 1.12 可以通过使用DataTypeHint 和FunctionHint 注解相关参数、类或方法来支持提取过程。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全