数据湖探索 DLI-配置时间模型:配置Event Time

时间:2023-11-10 09:20:51

配置Event Time

Event Time是指事件产生的时间,即数据产生时自带时间戳。

语法格式

1
2
3
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY {attr_name}.rowtime
SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval});

语法说明

设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。

由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到DLI服务,中间有一个过程。

Watermark有两种设置策略:

  • 按时间周期
    1
    SET WATERMARK(range interval {time_unit}, interval {time_unit})
    
  • 按事件个数
    1
    SET WATERMARK(rows literal, interval {time_unit})
    

一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。

注意事项

无。

示例

  • time2事件产生时间开始,每10s发送一次watermark,事件最大允许延迟时间为20s。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    CREATE SOURCE STREAM student_scores (
      student_number STRING, /* 学号 */
      student_name STRING, /* 姓名 */
      subject STRING, /* 学科 */
      score INT, /* 成绩 */
      time2 TIMESTAMP
    )
    WITH (
      type = "dis",
      region = "",
      channel = "dliinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter=","
    )
    TIMESTAMP BY time2.rowtime
    SET WATERMARK (RANGE interval 10 second, interval 20 second);
    
    INSERT INTO score_greate_90
    SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) 
    FROM student_scores;
    
  • 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    CREATE SOURCE STREAM student_scores (
      student_number STRING, /* 学号 */
      student_name STRING, /* 姓名 */
      subject STRING, /* 学科 */
      score INT, /* 成绩 */
      time2 TIMESTAMP
    )
    WITH (
      type = "dis",
      region = "",
      channel = "dliinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter=","
    )
    TIMESTAMP BY time2.rowtime
    SET WATERMARK (ROWS 10, interval 20 second);
    
    INSERT INTO score_greate_90
    SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) 
    FROM student_scores;
    
support.huaweicloud.com/sqlref-flink-dli/dli_08_0107.html