云服务器内容精选

  • 配置从Postgresql抓取数据到Hudi任务的心跳表 在需要同步的Postgresql数据库下执行以下命令创建一张心跳表,心跳表归属cdc_cdl Schema,表名为cdc_heartbeat,主键为cdl_job_id: DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat; CREATE TABLE cdc_cdl.cdc_heartbeat ( cdl_job_id int8 NOT NULL, cdl_last_heartbeat timestamp(6) ); ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id); 心跳表创建完成后,在CDL WebUI界面创建从Postgresql抓取数据到Hudi的同步任务并启动即可收到心跳数据。
  • 数据判齐消息字段含义 表1 数据判齐消息字段 字段名 描述 cdl_job_name 本批次数据所属同步任务名称 target_table_schema 本批次数据写入Schema名称 target_table_name 本批次数据写入Hudi表名称 target_table_path 本批次数据保存的Hudi表路径 total_num 本批次数据总数 cdl_original_heartbeat 本批次数据中包含的心跳数据的最大时间, 如果本批次不包含心跳数据则值为空 cdl_last_heartbeat 本批次数据中包含的心跳数据的最小时间,如果本批次不包含心跳数据则取“event_time_min”的值 insert_num 本批次数据insert事件总数 update_num 本批次数据update事件总数 delete_num 本批次数据delete事件总数 event_time_min 本批次数据源端最小事务提交时间 event_time_max 本批次数据源端最大事务提交时间 event_time_avg 本批次数据源端平均事务提交时间 kafka_timestamp_min 本批次数据发送到Kafka的最小时间 kafka_timestamp_max 本批次数据发送到Kafka的最大时间 begin_time 本批次数据开始写入Hudi的时间 end_time 本批次数据写入Hudi的结束时间 cdc_partitioned_time 心跳表的时间分区字段 cdc_last_update_date 该条判齐记录写入时间
  • 操作场景 心跳和数据判齐功能用于统计CDL同步任务的全链路信息, 包括从数据库管理系统RDBMS到Kafka的数据耗时、从Kafka消费数据写入到Hudi的数据耗时和数据条数等一系列信息,并将其写入到特定的Topic(cdl_snapshot_topic)中,用户可自行消费Topic中的数据并写入到某个特定Hudi表作数据判齐使用。心跳判齐数据不仅可以用来判断心跳时间之前的数据已经同步到数据湖,还可以根据事务时间,写Kafka的时间,数据开始入湖时间和数据入湖结束时间来判断数据时延问题。 同时对于PgSQL任务,配置心跳表可以定期向前推进PgSQL中Slot记录的LSN的信息,避免由于某个任务配置了某部分变化很小的表导致数据库日志积压。
  • 配置从Oracle(ogg)抓取数据到Hudi任务的心跳表 在需要同步数据的Oracle数据库中执行以下命令创建一张心跳表,心跳表归属于CDC_CDL Schema,表名为CDC_HEARTBEAT,主键为CDL_JOB_ID: CREATE TABLE "CDC_CDL"."CDC_HEARTBEAT" ( "CDL_JOB_ID" VARCHAR(22) PRIMARY KEY, "CDL_LAST_HEARTBEAT" TIMESTAMP, SUPPLEMENTAL LOG DATA (ALL) COLUMNS ); 将CDC_HEARTBEAT表加入到Oracle或者ogg的任务中,确保心跳数据可以正常发送到Kafka。 如果是Oracle任务,直接执行4。 在CDL WebUI配置thirdparty-kafka(ogg)连接增加Oracle的连接信息。 配置完成后,在CDL WebUI界面创建从Oracle(ogg)抓取数据到Hudi任务并启动即可收到心跳数据。
  • 配置opengauss到Hudi任务的心跳表 在需要同步的opengauss数据库下执行以下命令创建一张心跳表,心跳表归属cdc_cdl Schema,表名为cdc_heartbeat,主键为cdl_job_id: DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat; CREATE TABLE cdc_cdl.cdc_heartbeat ( cdl_job_id int8 NOT NULL, cdl_last_heartbeat timestamp(6) ); ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id); 将该心跳表加入到DRS任务,以确保心跳表数据正常发送到DRS Kafka。 在CDL WebUI界面配置opengauss的thirdparty-kafka连接时增加opengauss的连接信息,如果opengauss部署为一主多备模式,需在“host”填写所有的IP。 配置完成之后,在CDL WebUI界面创建从thirdparty-kafka抓取数据到Hudi的任务并启动即可收到心跳数据。
  • 前提条件 已获取待连接数据库对应的驱动Jar包。 仅数据源MySQL、Oracle(MRS 3.3.0及之后版本支持)需要上传相应的驱动,驱动对应的版本号如表1所示,且驱动需要在MySQL或Oracle官网下载。 表1 MySQL、Oracle数据源支持的驱动 数据源 支持的驱动包 MySQL mysql-connector-java-8.0.24.jar Oracle(MRS 3.3.0及之后版本支持) ojdbc8-12.2.0.1.jar 此处Oracle仅作为ThirdKafka数据源使用。 开启Kerberos认证的集群需已参考CDL用户权限管理创建具有CDL管理操作权限的用户。