云服务器内容精选

  • 修订记录 发布日期 修订记录 2024-05-10 弹性资源池概述,新增弹性资源池扩缩容约束限制。 2024-04-17 修改导出作业SQL作业结果,介绍导出作业结果到指定作业桶的方法。 2024-04-03 新增SQL防御,介绍SQL防御规则配置方法。 2024-03-30 新增 DLI 委托权限设置,介绍DLI委托权限设置操作指导。 创建Spark作业,补充说明Spark3.3.x及以上版本仅支持选择OBS路径下的程序包。 2024-03-15 新增 标签管理、队列标签管理、弹性资源池标签管理、库表管理标签管理等标签配置相关内容,新增标签策略使用约束限制。 2024-03-08 新增 添加路由,补充关于路由IP的网段说明。 队列属性设置、弹性资源池添加队列补充关于队列的最小CUs小于16CUs时,队列属性相关约束限制说明。 2024-01-30 新增 SQL作业管理,补充说明DLI仅支持从作业桶读取查询结果。 2024-01-17 新增 内置依赖包,新增3.1.1版本依赖包信息。 2024-01-09 修改 数据湖探索 监控指标说明及查看指导,新增弹性资源池相关监控指标。 2023-12-11 修改 全局变量,删除敏感变量相关信息,DLI不再支持新建敏感变量。 2023-12-07 修改 内置依赖包,删除AI镜像安装依赖包信息。 2023-11-14 优化 SQL作业管理、Spark作业管理概述删除导出日志功能的相关描述,SQL作业和Spark作业不再支持导出日志功能。 2023-11-07 优化 跨源认证概述,修改跨源认证的简介,推荐用户使用跨源认证。 增强型跨源连接概述,补充说明跨源场景推荐使用跨源认证存储跨源凭证。 经典型跨源连接(废弃,不推荐使用),补充说明不推荐使用经典型跨源连接。 2023-11-02 优化 创建Spark作业,补充关于资源规格的说明,解释说明driver对队列资源的占用。 2023-08-28 新增 分配队列至项目。 队列属性设置。 弹性资源池添加队列、弹性资源池关联队列补充说明弹性资源池支持仅添加同一企业项目的队列资源。 下线“创建Flink SQL边缘作业”、“边缘鉴权码管理”。 2023-08-14 新增 SQL作业管理,SQL作业详情支持查看“CPU累计使用量”和“输出字节”。 2023-08-02 新增 全局配置,修改全局变量中关于敏感变量的说明。 新增全局变量权限管理。 修改 SQL编辑器,修改SQL作业编辑窗口说明内容。 SQL作业管理,修改作业管理参数表的内容。 使用场景一:创建弹性资源池并运行作业,修改SQL队列类型的相关说明。 弹性资源池添加队列,修改弹性资源池添加队列基础配置关于执行引擎的说明。 2023-07-31 新增 下载基础镜像,新增Flink 1.12镜像下载地址。 2023-07-25 修改 队列弹性扩缩容新增约束限制:16CUs队列不支持扩容。 2023-06-30 优化 增强型跨源连接(推荐)。 跨源认证。 2023-06-16 新增 下载基础镜像,新增Spark 3.1.1镜像下载地址。 2023-05-05 新增 开启Flink作业动态扩缩容。 设置作业优先级。 2023-04-17 修改 队列标签管理,修改其中的标签值和键的说明内容。 2023-03-03 新增Spark参数(--conf)配置跨源作业的依赖模块说明表。 2023-03-02 新增库表管理标签管理章节。 2023-02-28 优化弹性资源池队列管理章节。 2023-02-23 修改 下线调试Flink作业相关内容。 操作Flink作业,修改运行时配置中的“异常自动重启”参数描述,Flink SQL作业和Flink OpenSource SQL作业不支持配置从Checkpoint恢复。 2023-01-10 手册结构调整,新增配额管理。 2022-11-04 弹性资源池CU设置,补充CU设置的常见问题。 2022-09-20 使用 自定义镜像 ,修改自定义镜像使用约束,当前仅支持Spark自定义镜像。 2022-08-22 数据库权限管理,补充“显示所有表”、“显示数据库”的权限说明。 Flink作业权限管理,补充查看作业详情的说明。 2022-08-19 云审计 服务支持的DLI操作列表说明,修改DLI审计事件名称。 2022-08-12 内置依赖包,删除Python 3.7.5 依赖包相关内容。 2022-07-19 新增弹性资源池关联队列。 新增(推荐)创建Flink OpenSource SQL作业。 使用场景二:配置弹性资源池队列扩缩容策略,补充弹性资源池扩缩容约束限制。 弹性资源池CU设置,补充弹性资源池调整CU的约束限制。
  • 约束与限制 16CUs队列不支持扩容和缩容。 64CUs队列不支持缩容。 目前只支持计费模式为“按需/CU时”和“按需/专属资源模式”的队列进行弹性扩缩容。 如果在“弹性扩缩容”页面提示“Status of queue xxx is assigning, which is not available”,表示需要等待队列资源分配完毕才可进行扩缩容。 队列资源扩容时,可能会由于物理资源不足导致队列资源无法扩容到设定的目标大小。 队列资源缩容时,系统不保证将队列资源完全缩容到设定的目标大小。通常队列资源缩容时,系统会先检查资源使用情况,判断是否存在缩容空间,如果现有资源无法按照最小缩容步长执行缩容任务,则队列可能缩容不成功,或缩容一部分规格的情况。 因资源规格不同可能有不同的缩容步长,通常是16CUs、32CUs、48CUs、64CUs等。 示例:队列大小为48CUs,执行作业占用了18CUs,剩余30CUs不满足该32CUs步长缩容的要求,如果执行缩容任务,则缩容失败。
  • 约束与限制 不支持切换弹性资源池的计费模式。 弹性资源池不支持切换区域。 按需计费的弹性资源池默认勾选专属资源模式,自创建起按自然小时收费。 Flink 1.10及其以上版本的作业支持在弹性资源池运行。 弹性资源池网段设置后不支持更改。 弹性资源池关联队列: 仅支持关联按需计费模式的队列(包括专属队列)。 队列和弹性资源池状态正常,资源未被冻结。 当前仅支持包年包月计费模式的弹性资源池进行规格变更。 仅支持查看30天以内的弹性资源池扩缩容历史。 弹性资源池不支持访问公网。 如有访问公网诉求请您参考配置DLI 队列与公网网络联通。 弹性资源池CU设置、弹性资源池中添加/删除队列、修改弹性资源池中队列的扩缩容策略、系统自动触发弹性资源池扩缩容时都会引起弹性资源池CU的变化,部分情况下系统无法保证按计划扩容/缩容至目标CUs: 弹性资源池扩容时,可能会由于物理资源不足导致弹性资源池无法扩容到设定的目标大小。 弹性资源池缩容时,系统不保证将队列资源完全缩容到设定的目标大小。 在执行缩容任务时,系统会先检查资源使用情况,判断是否存在缩容空间,如果现有资源无法按照最小缩容步长执行缩容任务,则弹性资源池可能缩容不成功,或缩容一部分规格的情况。 因资源规格不同可能有不同的缩容步长,通常是16CUs、32CUs、48CUs、64CUs等。 示例:弹性资源池规格为192CUs,资源池中的队列执行作业占用了68CUs,计划缩容至64CUs。 执行缩容任务时,系统判断剩余124CUs,按64CUs的缩容步长执行缩容任务,剩余60CUs资源无法继续缩容,因此弹性资源池执行缩容任务后规格为128CUs。
  • 弹性资源池架构和优势 弹性资源池后端采用CCE集群的架构,支持异构,对资源进行统一的管理和调度。 图3 弹性资源池架构图 弹性资源池的优势主要体现在以下几个方面: 统一资源管理 统一管理内部多集群和调度作业,规模可以到百万核级别。 多AZ部署,支持跨AZ高可用。(暂未实现,后续版本支持) 租户资源隔离 不同队列之间资源隔离,减少队列之间的相互影响。 分时按需弹性 秒级动态扩容,从容应对流量洪峰和资源诉求。 支持分时设置队列优先级和配额,提高资源利用率。 作业级资源隔离(暂未实现,后续版本支持) 支持独立Spark实例运行SQL作业,减少作业间相互影响。 自动弹性(暂未实现,后续版本支持) 基于队列负载和优先级实时自动更新队列配额。 弹性资源池解决方案主要解决了以下问题和挑战。 维度 原有队列,无弹性资源池时 弹性资源池 扩容时长 手工扩容时间长,扩容时长在分钟级别 不需要手工干预,秒级动态扩容。 资源利用率 不同队列之间资源不能共享。 例如:队列1当前还剩余10CU资源,队列2当前负载高需要扩容时,队列2不能使用队列1中的资源,只能单独对队列2进行扩容。 添加到同一个弹性资源池的多个队列,CU资源可以共享,达到资源的合理利用。 配置跨源时,必须为每个队列分配不重合的网段,占用大量VPC网段。 多队列通过弹性资源池统一进行网段划分,减少跨源配置的复杂度。 资源调配 多个队列同时扩容时不能设置优先级,在资源不够时,会导致部分队列扩容申请失败。 您可以根据当前业务波峰和波谷时间段,设置各队列在弹性资源池中的优先级,保证资源的合理调配。 弹性资源池主要包括如下基本操作: 创建弹性资源池 弹性资源池权限管理 弹性资源池添加队列 弹性资源池关联队列 弹性资源池队列管理 弹性资源池CU设置 弹性资源池规格变更 弹性资源池标签管理
  • 使用场景 场景一:固定资源造成资源浪费和资源不足的场景 在每天的不同时段,作业任务对资源的请求量也会发生变化,如果采用固定资源规格则会导致资源浪费或者资源不足的问题。例如,如下图图1示例可以看出: 大约在凌晨4点到7点这个数据段,ETL作业任务结束后没有其他作业,因为资源固定一直占用,导致严重的资源浪费。 上午9点到12点以及下午14点16点的两个时段,ETL报表和作业查询的请求量很高,因为当前固定资源不够,导致作业任务排队,任务一直排队。 图1 固定资源场景 场景二:资源相互隔离,没有共享,造成资源浪费的场景 某公司下有两个部门,两个部门的不同作业运行在DLI的两个队列上。部门A在上午8点到12点业务比较空闲,资源有剩余,部门B在这个时间段业务请求量大,原有资源规格满足不了,需要扩容时,请求不了部门A的队列资源,造成资源浪费。 图2 资源隔离造成的资源浪费 弹性资源池通过“分时按需弹性”功能,支持按照不同时间段对资源进行动态的扩缩容,保证资源的利用率和应对资源洪峰等诉求。 弹性资源池对后端资源统一进行管理和调度,多个队列绑定弹性资源池后,资源池内资源共享,资源利用率高,解决了场景二的问题。
  • 弹性资源池产品规格 DLI提供以下规格的计算资源,如表1所示。 表1 弹性资源池规格 类型 规格 约束限制 适用场景 基础版 16-64CUs规格 不支持高可靠与高可用。 不支持设置队列属性和作业优先级。 不支持对接Notebook实例。 更多弹性资源池约束限制参考约束与限制。 适用于对资源消耗不高、对资源高可靠性和高可用性要求不高的测试场景。 标准版 64CUs及以上规格 更多弹性资源池约束限制参考约束与限制。 具备强大的计算能力、高可用性、及灵活的资源管理能力,适用于大规模计算任务场景和有长期资源规划需求的业务场景。
  • DLI系统委托中没有我需要的委托怎么办? 当您在使用DLI过程中所需的委托没有包含在DLI系统委托dli_management_agency时,您需要在 IAM 页面创建相关委托,并在作业配置中添加新建的委托信息。 常见的需要自建委托的场景: DLI表生命周期清理数据及Lakehouse表数据清理所需的数据清理委托。需用户自行在IAM创建名为dli_data_clean_agency的DLI云服务委托并授权。该委托需新建后自定义权限,但委托名称固定为dli_data_clean_agency。 DLI Flink作业访问和使用OBS、日志转储(包括桶授权)、开启checkpoint、作业导入导出等,需要获得访问和使用OBS( 对象存储服务 )的Tenant Administrator权限。 DLI Flink作业所需的AKSK存储在 数据加密 服务DEW中,如需允许DLI在执行作业时访问DEW数据,需要新建委托将DEW数据操作权限委托给DLI,允许DLI服务以您的身份访问DEW服务。 允许DLI在执行作业时访问DLI Catalog元数据,需要新建委托将DLI Catelog数据操作权限委托给DLI,允许DLI服务以您的身份访问DLI Catalog元数据。 DLI Flink作业所需的云数据存储在LakeFormation中,如需允许DLI在执行作业时访问Catalog获取元数据,需要新建委托将Catelog数据操作权限委托给DLI,允许DLI服务以您的身份访问Catalog元数据。 新建委托时,请注意委托名称不可与系统默认委托重复,即不可以是dli_admin_agency、dli_management_agency、dli_data_clean_agency。 更多自定义委托的操作请参考自定义DLI委托权限和常见场景的委托权限策略。
  • 约束限制 DLI服务授权需要区分项目,请在每个需要新委托的项目分别执行更新委托操作,即切换至对应项目后,再按照本节的操作更新委托权限。 dli_management_agency包含的三个权限中: IAM ReadOnlyAccess授权范围是全局服务资源,授权范围不区分区域: 任意区域在更新DLI委托时选择了该权限,那么所有区域的项目都将生效。 任意项目在更新委托时未勾选该权限,代表回收该权限,那么所有区域的项目都将回收该权限,即所有项目无法正常获取IAM用户相关信息。 更新时请务必勾选IAM ReadOnlyAccess,因IAM ReadOnlyAccess授权范围是全局服务资源,如果去勾选该权限后更新委托,则所有区域和项目的IAM ReadOnlyAccess都将失效。 DLI Datasource Connections Agency Access、DLI Notification Agency Access授权范围是指定区域项目资源: 仅在勾选该权限且更新DLI委托权限后的项目生效。未勾选该权限的项目不具备跨源场景所需权限、和 SMN 发送通知消息的权限。 示例1和示例2给出了同一个区域的不同项目更新DLI委托带来的委托权限差异。
  • DLI委托 在使用DLI前,为了确保正常使用DLI的功能,建议先进行DLI委托权限设置。 DLI默认提供以下类型的委托:dli_admin_agency、dli_management_agency、dli_data_clean_agency(名称固定,权限需自定义)。其他场景需用户自定义委托。委托的详细说明请参考表1。 DLI为了满足细粒度的委托权限需求,升级了系统委托,将原有的dli_admin_agency升级为dli_management_agency,新的委托包含跨源操作、 消息通知 、用户授权操作所需的权限。更新委托权限。 使用Flink 1.15和Spark 3.3.1(Spark通用队列场景)及以上版本的引擎执行作业时,需完成以下配置操作: 需用户自行在IAM页面创建相关委托,并在作业配置中添加新建的委托信息。具体操作请参考自定义DLI委托权限。 常见新建委托场景:允许DLI读写OBS数据、日志转储、Flink checkopoint;允许DLI在访问DEW获取数据访问凭证、允许访问Catalog获取元数据等场景。 委托名称不可与系统默认委托重复,即不可以是dli_admin_agency、dli_management_agency、dli_data_clean_agency。 引擎版本低于Flink1.15,执行作业时默认使用dli_admin_agency;引擎版本低于Spark 3.3.1,执行作业时使用用户认证信息(AKSK、SecurityToken)。 即引擎版本低于Flink1.15和Spark 3.3.1版本的作业不受更新委托权限的影响,无需自定义委托。 为兼容存量的作业委托权限需求,dli_admin_agency仍为您保留在IAM委托中。 服务授权需要主账号或者用户组admin中的子账号进行操作。 请勿删除系统默认创建的委托。 表1 DLI委托 权限名 类型 权限说明 dli_admin_agency 系统默认委托 该委托已废弃,不推荐使用,请尽快更新委托升级至dli_management_agency。 更新委托请参考更新委托权限。 dli_management_agency 系统默认委托 DLI系统委托,用于将操作权限委托给DLI服务,让DLI服务以您的身份使用其他云服务,代替您进行一些资源运维工作。该委托包含跨源操作、消息通知、用户授权操作所需的权限。详细委托包含的权限请参考表2 dli_data_clean_agency 系统默认委托,需用户自行授权 数据清理委托,表生命周期清理数据及lakehouse表数据清理使用。需用户自行在IAM创建名为dli_data_clean_agency的DLI云服务委托并授权。 该委托需新建后自定义权限,但委托名称固定为dli_data_clean_agency。 委托的权限策略示例请参考常见场景的委托权限策略。 其他自定义委托 自定义委托 使用Flink 1.15和Spark 3.3及以上版本的引擎执行作业时,请自行在IAM页面创建相关委托,并在作业配置中添加新建的委托信息。自定义DLI委托权限 常见新建委托场景:允许DLI读写OBS将日志转储、允许DLI在访问DEW获取数据访问凭证、允许访问Catalog获取元数据等场景。 委托名称不可与系统默认委托重复,即不可以是dli_admin_agency、dli_management_agency、dli_data_clean_agency。 委托的权限策略示例请参考常见场景的委托权限策略。 表2 dli_management_agency委托包含的权限 权限名 权限说明 IAM ReadOnlyAccess DLI对未登录过DLI的用户进行授权时,需获取IAM用户相关信息。因此需要IAM ReadOnlyAccess权限。 DLI Datasource Connections Agency Access 访问和使用VPC、子网、路由、对等连接的权限 DLI Notification Agency Access 作业执行失败需要通过SMN发送通知消息的权限
  • 什么是委托? 华为云各服务之间存在业务交互关系,一些云服务需要与其他云服务协同工作,需要您创建云服务委托,将操作权限委托给DLI服务,让DLI服务以您的身份使用其他云服务,代替您进行一些资源运维工作。 例如:在DLI新建Flink作业所需的AKSK存储在数据加密服务DEW中,如需允许DLI在执行作业时访问DEW数据,需要提供IAM委托将DEW数据操作权限委托给DLI,允许DLI服务以您的身份访问DEW服务。 图1 DLI云服务委托
  • DLI 作业类型 DLI 提供了三种作业类型: SQL作业:SQL作业为用户提供标准的SQL,兼容Spark SQL、Presto SQL(基于Presto),通过可视化界面API、JDBC、ODBC、Beeline等多种接入方式对云上异构数据源进行查询分析,兼容 CS V、JSON、Parquet、Carbon、ORC等主流数据格式。 Flink作业:Flink作业是运行在公有云上的实时流式大数据分析服务作业,全托管的方式用户无需感知计算集群,只需聚焦于Stream SQL业务,即时执行作业,完全兼容Apache Flink API。 Spark作业:Spark作业可为用户提供全托管式的Spark计算服务。用户可通过可视化界面和RESTful API提交作业,支持提交Spark Core、DataSet、Streaming、MLlib、GraphX等Spark全栈作业。
  • 约束与限制 自定义委托名称不可与系统默认委托重复,即不可以是dli_admin_agency、dli_management_agency、dli_data_clean_agency。 允许DLI按表生命周期清理数据的委托名称必须为dli_data_clean_agency。 仅Flink 1.15和Spark 3.3.1(Spark通用队列场景)及以上版本的引擎执行作业支持配置自定义委托。 更新委托权限后,系统将升级您的dli_admin_agency为dli_management_agency,新的委托包含跨源操作、消息通知、用户授权操作所需的权限。除此之外的其他委托权限需求都需要用户自定义委托。了解dli_management_agency请参考DLI委托概述。 常见新建委托场景:允许DLI读写OBS数据、日志转储、Flink checkopoint;允许DLI在访问DEW获取数据访问凭证、允许访问Catalog获取元数据等场景。以上场景的委托权限请参考常见场景的委托权限策略。
  • 步骤1:在IAM控制台创建云服务委托并授权 登录管理控制台。 单击右上方登录的用户名,在下拉列表中选择“ 统一身份认证 ”。 在左侧导航栏中,单击“委托”。 在“委托”页面,单击“创建委托”。 在“创建委托”页面,设置如下参数: 委托名称:按需填写,例如“dli_obs_agency_access”。 委托类型:选择“云服务”。 云服务:(“委托类型”选择“云服务”时出现此参数项。)在下拉列表中选择“DLI"。 持续时间:选择“永久”。 描述:非必选,可以填写“拥有OBS OperateAccess权限的委托”。 图2 创建委托 配置完委托的基本信息后,单击“下一步”。 授予当前委托所需的权限策略,单击“新建策略”。 配置策略信息。 输入策略名称,本例:dli-obs-agency。 选择“JSON视图”。 在策略内容中粘贴自定义策略。 本例权限包含访问和使用OBS的权限,适用于以下场景:DLI Flink作业下载OBS对象、OBS/DWS数据源(外表)、日志转储、使用savepoint、开启Checkpoint。DLI Spark作业下载OBS对象、读写OBS外表。 更多Flink作业常见委托权限配置请参考常见场景的委托权限策略。 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "obs:bucket:GetBucketPolicy", "obs:bucket:GetLifecycleConfiguration", "obs:bucket:GetBucketLocation", "obs:bucket:ListBucketMultipartUploads", "obs:bucket:GetBucketLogging", "obs:object:GetObjectVersion", "obs:bucket:GetBucketStorage", "obs:bucket:GetBucketVersioning", "obs:object:GetObject", "obs:object:GetObjectVersionAcl", "obs:object:DeleteObject", "obs:object:ListMultipartUploadParts", "obs:bucket:HeadBucket", "obs:bucket:GetBucketAcl", "obs:bucket:GetBucketStoragePolicy", "obs:object:AbortMultipartUpload", "obs:object:DeleteObjectVersion", "obs:object:GetObjectAcl", "obs:bucket:ListBucketVersions", "obs:bucket:ListBucket", "obs:object:PutObject" ], "Resource": [ "OBS:*:*:bucket:bucketName",//请替换bucketName为对应的桶名称 "OBS:*:*:object:*" ] }, { "Effect": "Allow", "Action": [ "obs:bucket:ListAllMyBuckets" ] } ] } 按需输入策略描述。 新建策略完成后,单击“下一步”,返回委托授权页面。 选择步骤8新建的自定义策略。 图3 选择自定义策略 单击“下一步”,选择委托的授权范围。 了解更多授权操作说明请参考创建用户组并授权。 所有资源:授权后,IAM用户可以根据权限使用账号中所有资源,包括企业项目、区域项目和全局服务资源。 全局服务资源:全局服务部署时不区分区域,访问全局级服务,不需要切换区域,全局服务不支持基于区域项目授权。如对象存储服务(OBS)、内容分发网络(CDN)等。授权后,用户根据权限使用全局服务的资源。 指定区域项目资源:授权后,IAM用户根据权限使用所选区域项目中的资源,未选择的区域项目中的资源,该IAM用户将无权访问。 指定企业项目资源:授权后,IAM用户根据权限使用所选企业项目中的资源。如企业项目A包含资源B,资源B部署在北京四和上海二,IAM用户所在用户组关联企业项目A后,北京四和上海二的资源B用户都可访问,不在企业项目A内的其他资源,该IAM用户将无权访问。 本例自定义策略中是OBS权限,因此选择全局服务资源。如果使用的是DLI权限,推荐选择“指定区域项目资源”。 单击“确定”,完成授权。 授权后需等待15-30分钟才可生效。
  • DLI自定义委托场景 表1 DLI自定义委托场景 场景 委托名称 适用场景 权限策略 允许DLI按表生命周期清理数据 dli_data_clean_agency 数据清理委托,表生命周期清理数据、Lakehouse表数据清理使用。 该委托需新建后自定义权限,但委托名称固定为dli_data_clean_agency。 数据清理委托权限配置 允许DLI读写OBS将日志转储 自定义 DLI Flink作业下载OBS对象、OBS/DWS数据源(外表)、日志转储、使用savepoint、开启checkpoint,DLI Spark作业下载OBS对象、读写OBS外表。 访问和使用OBS的权限策略 允许DLI在访问DEW获取数据访问凭证 自定义 DLI 作业使用DEW-C SMS 凭证管理能力。 使用DEW加密功能的权限 允许访问DLI Catalog元数据 自定义 DLI 访问DLI元数据。 访问DLI Catalog元数据的权限 允许访问LakeFormation Catalog元数据 自定义 DLI 访问LakeFormation元数据。 访问LakeFormation Catalog元数据的权限
  • 示例 该示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表联接以扩充流。 使用spark sql 创建 hive obs 外表,并插入数据。 CREATE TABLE if not exists dimension_hive_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING ) STORED AS PARQUET LOCATION 'obs://demo/spark.db/dimension_hive_table' PARTITIONED BY ( create_time STRING ); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。 如下脚本中的加粗参数请根据实际环境修改。 CREATE CATA LOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; CREATE TABLE if not exists ordersSource ( product_id STRING, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'TOPIC', 'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table if not exists print ( product_id STRING, user_name string, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING, create_time STRING ) with ( 'connector' = 'print' ); insert into print select orders.product_id, orders.user_name, dim.product_name, dim.unit_price, dim.pv_count, dim.like_count, dim.comment_count, dim.update_time, dim.update_user, dim.create_time from ordersSource orders left join dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 m') */ for system_time as of orders.proctime as dim on orders.product_id = dim.product_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"} 查看print结果表数据。 +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]