华为云用户手册

  • 操作步骤 创建工作流,请参考使用Hue创建工作流。 在工作流编辑页面,选择“Java 程序”按钮,将其拖到操作区中。 在弹出的“Java program”窗口中配置“Jar name”的值,例如“/user/admin/examples/apps/java-main/lib/oozie-examples-5.1.0.jar”。配置“Main class”的值,例如“org.apache.oozie.example.DemoJavaMain”。然后单击“添加”。 单击Oozie编辑器右上角的。 保存前如果需要修改作业名称(默认为“My Workflow”),可以直接单击该名称进行修改,例如“Java-Workflow”。 保存完成后,单击,提交该作业。 作业提交后,可通过Hue界面查看作业的详细信息、日志、进度等相关内容。
  • 单表并发控制配置 表6 单表并发控制参数配置 参数 描述 默认值 hoodie.write.lock.provider 指定lock provider,不建议使用默认值,使用org.apache.hudi.hive.HiveMetastoreBasedLockProvider。 org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.hivemetastore.database Hive的database。 无 hoodie.write.lock.hivemetastore.table Hive的table name。 无 hoodie.write.lock.client.num_retries 重试次数。 10 hoodie.write.lock.client.wait_time_ms_between_retry 重试间隔。 10000 hoodie.write.lock.conflict.resolution.strategy lock provider类,必须是ConflictResolutionStrategy的子类。 org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy hoodie.write.lock.zookeeper.base_path 存放ZNodes的路径,同一张表的并发写入需配置一致。 无 hoodie.write.lock.zookeeper.lock_key ZNode的名称,建议与Hudi表名相同。 无 hoodie.write.lock.zookeeper.connection_timeout_ms ZooKeeper连接超时时间。 15000 hoodie.write.lock.zookeeper.port ZooKeeper端口号。 无 hoodie.write.lock.zookeeper.url ZooKeeper的url。 无 hoodie.write.lock.zookeeper.session_timeout_ms ZooKeeper的session过期时间。 60000
  • compaction&cleaning配置 表5 compaction&cleaning参数配置 参数 描述 默认值 hoodie.clean.automatic 是否执行自动clean。 true hoodie.cleaner.policy 要使用的清理策略。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。建议确保数据保留的时间超过最大查询执行时间。 KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained 保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的),这也直接转化为逐步提取此数据集的数量。 10 hoodie.keep.max.commits 触发归档操作的commit数阈值。 30 hoodie.keep.min.commits 归档操作保留的commit数。 20 hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。 10 hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。由于批处理中分区中插入记录的数量众多,总会出现小文件。Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。此处的大小是被视为“小文件大小”的最小文件大小。 104857600 byte hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。写出100MB的文件,至少1KB大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)。 500000 hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize,默认关闭。 true hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,Hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。 1024 hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩。 true hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数。 5 hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)。 true hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件。 false hoodie.cleaner.parallelism 如果清理变慢,请增加此值。 200 hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。默认情况下,Hudi选择具有累积最多未合并数据的日志文件。 org.apache.hudi.table.action.compact.strategy. LogFileSizeBasedCompactionStrategy hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。 500 * 1024 MB hoodie.compaction.daybased.target.partitions 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。 10 hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。 org.apache.hudi.common.model.Defaulthoodierecordpayload hoodie.schedule.compact.only.inline 在写入操作时,是否只生成压缩计划。在hoodie.compact.inline=true时有效。 false hoodie.run.compact.only.inline 通过Sql执行run compaction命令时,是否只执行压缩操作,压缩计划不存在时直接退出。 false
  • Clustering配置 本章节内容仅适用于 MRS 3.2.0及之后版本。 Clustering中有两个策略分别是hoodie.clustering.plan.strategy.class和hoodie.clustering.execution.strategy.class。一般情况下指定plan.strategy为SparkRecentDaysClusteringPlanStrategy或者SparkSizeBasedClusteringPlanStrategy时,execution.strategy不需要指定。但当plan.strategy为SparkSingleFileSortPlanStrategy时,需要指定execution.strategy为SparkSingleFileSortExecutionStrategy。 表7 Clustering参数配置 参数 描述 默认值 hoodie.clustering.inline 是否同步执行clustering。 false hoodie.clustering.inline.max.commits 触发clustering的commit数。 4 hoodie.clustering.async.enabled 是否启用异步执行clustering。 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 false hoodie.clustering.async.max.commits 异步执行时触发clustering的commit数。 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 4 hoodie.clustering.plan.strategy.target.file.max.bytes 指定clustering后每个文件大小最大值。 1024 * 1024 * 1024 byte hoodie.clustering.plan.strategy.small.file.limit 小于该大小的文件会被clustering。 300 * 1024 * 1024 byte hoodie.clustering.plan.strategy.sort.columns clustering用以排序的列。 无 hoodie.layout.optimize.strategy Clustering执行策略,可选linear、z-order、hilbert三种排序方式。 linear hoodie.layout.optimize.enable 使用z-order、hilbert时需要开启。 false hoodie.clustering.plan.strategy.class 筛选FileGroup进行clustering的策略类,默认筛选小于hoodie.clustering.plan.strategy.small.file.limit阈值的文件。 org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy hoodie.clustering.execution.strategy.class 执行clustering的策略类(RunClusteringStrategy的子类),用以定义群集计划的执行方式。 默认类们按指定的列对计划中的文件组进行排序,同时满足配置的目标文件大小。 org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.max.num.groups 设置执行clustering时最多选择多少个FileGroup,该值越大并发度越大。 30 hoodie.clustering.plan.strategy.max.bytes.per.group 设置执行clustering时每个FileGroup最多有多少数据参与clustering。 2 * 1024 * 1024 * 1024 byte
  • 存储配置 表4 存储参数配置 参数 描述 默认值 hoodie.parquet.max.file.size Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。 120 * 1024 * 1024 byte hoodie.parquet.block.size parquet页面大小,页面是parquet文件中的读取单位,在一个块内,页面被分别压缩。 120 * 1024 * 1024 byte hoodie.parquet.compression.ratio 当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。 如果bulk_insert生成的文件小于预期大小,请增加此值。 0.1 hoodie.parquet.compression.codec parquet压缩编解码方式名称,默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo] snappy hoodie.logfile.max.size LogFile的最大值。这是在将日志文件移到下一个版本之前允许的最大值。 1GB hoodie.logfile.data.block.max.size LogFile数据块的最大值。这是允许将单个数据块附加到日志文件的最大值。 这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。 256MB hoodie.logfile.to.parquet.compression.ratio 随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。 用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。 0.35
  • 写入操作配置 表1 写入操作重要配置项 参数 描述 默认值 hoodie.datasource.write.table.name 指定写入的Hudi表名。 无 hoodie.datasource.write.operation 写Hudi表指定的操作类型,当前支持upsert、delete、insert、bulk_insert等方式。 upsert:更新插入混合操作。 delete:删除操作。 insert:插入操作。 bulk_insert: 用于初始建表导入数据, 注意初始建表禁止使用upsert、insert方式。 insert_overwrite:对静态分区执行insert overwrite。 insert_overwrite_table:动态分区执行insert overwrite,该操作并不会立刻删除全表做overwrite,会逻辑上重写hudi表的元数据,无用数据后续由hudi的clean机制清理。效率比bulk_insert加overwrite高。 upsert hoodie.datasource.write.table.type 指定Hudi表类型,一旦这个表类型被指定,后续禁止修改该参数,可选值MERGE_ON_READ。 COPY_ON_WRITE hoodie.datasource.write.precombine.field 该值用于在写之前对具有相同的key的行进行合并去重。 指定为具体的表字段 hoodie.datasource.write.payload.class 在更新过程中,该类用于提供方法将要更新的记录和更新的记录做合并,该实现可插拔,如要实现自己的合并逻辑,可自行编写。 org.apache.hudi.common.model.DefaultHoodieRecordPayload hoodie.datasource.write.recordkey.field 用于指定Hudi的主键,Hudi表要求有唯一主键。 指定为具体的表字段 hoodie.datasource.write.partitionpath.field 用于指定分区键,该值配合hoodie.datasource.write.keygenerator.class使用可以满足不同的分区场景。 无 hoodie.datasource.write.hive_style_partitioning 用于指定分区方式是否和Hive保持一致,建议该值设置为true。 true hoodie.datasource.write.keygenerator.class 配合hoodie.datasource.write.partitionpath.field,hoodie.datasource.write.recordkey.field产生主键和分区方式。 说明: 写入设置KeyGenerator与表保存的参数值不一致时将提示需要保持一致。 org.apache.hudi.keygen.ComplexKeyGenerator
  • index相关配置 表3 index相关参数配置 参数 描述 默认值 hoodie.index.class 用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于hoodie.index.type配置。 "" hoodie.index.type 使用的索引类型,默认为布隆过滤器。可能的选项是[BLOOM | HBASE | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE] 。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中。 BLOOM hoodie.index.bloom.num_entries 存储在布隆过滤器中的条目数。 假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。 注意: 将此值设置的太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置的非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。 60000 hoodie.index.bloom.fpp 根据条目数允许的错误率。 用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置的很低(默认值0.000000001),在磁盘空间上进行权衡以降低误报率。 0.000000001 hoodie.bloom.index.parallelism 索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,根据输入的工作负载特征自动计算的。 0 hoodie.bloom.index.prune.by.ranges 为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。 true hoodie.bloom.index.use.caching 为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找。 true hoodie.bloom.index.use.treebased.filter 为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度。 true hoodie.bloom.index.bucketized.checking 为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差。 true hoodie.bloom.index.keys.per.bucket 仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。 此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。 10000000 hoodie.bloom.index.update.partition.path 仅在索引类型为GLOBAL_BLOOM时适用。 为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。 true hoodie.index.hbase.zkquorum 仅在索引类型为HBase时适用,必填选项。要连接的HBase ZK Quorum URL。 无 hoodie.index.hbase.zkport 仅在索引类型为HBase时适用,必填选项。要连接的HBase ZK Quorum端口。 无 hoodie.index.hbase.zknode.path 仅在索引类型为HBase时适用,必填选项。这是根znode,它将包含HBase创建及使用的所有znode。 无 hoodie.index.hbase.table 仅在索引类型为HBase时适用,必填选项。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。 无
  • 同步Hive表配置 表2 同步Hive表参数配置 参数 描述 默认值 hoodie.datasource.hive_sync.enable 是否同步Hudi表信息到Hive Metastore。 注意: 建议该值设置为true,统一使用Hive管理Hudi表。 false hoodie.datasource.hive_sync.database 要同步给Hive的数据库名。 default hoodie.datasource.hive_sync.table 要同步给Hive的表名,建议这个值和hoodie.datasource.write.table.name保证一致。 unknown hoodie.datasource.hive_sync.username 同步Hive时,指定的用户名。 hive hoodie.datasource.hive_sync.password 同步Hive时,指定的密码。 hive hoodie.datasource.hive_sync.jdbcurl 连接Hive JDBC指定的连接。 "" hoodie.datasource.hive_sync.use_jdbc 是否使用Hive JDBC方式连接Hive同步Hudi表信息。 建议该值设置为false,设置为false后JDBC连接相关配置无效。 true hoodie.datasource.hive_sync.partition_fields 用于决定Hive分区列。 "" hoodie.datasource.hive_sync.partition_extractor_class 用于提取Hudi分区列值,将其转换成Hive分区列。 org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor hoodie.datasource.hive_sync.support_timestamp 当hudi表存在timestamp类型字段时,需指定此参数为true,以实现同步timestamp类型到hive元数据中。 该值默认为false,默认将timestamp类型同步为bigInt,默认情况可能导致使用sql查询包含timestamp类型字段的hudi表出现错误。 true
  • 操作步骤 创建工作流,请参考使用Hue创建工作流。 在工作流编辑页面,选择“数据流”按钮,将其拖到操作区中。 在弹出的“Streaming”窗口中配置“Mapper”的值,例如“/bin/cat”。配置“Reducer”的值,例如“/usr/bin/wc”。然后单击“添加”。 单击“文件+”,添加运行所需的文件。 例如“/user/oozie/share/lib/mapreduce-streaming/hadoop-streaming-xxx.jar”和“/user/oozie/share/lib/mapreduce-streaming/oozie-sharelib-streaming-5.1.0.jar”。 单击右上角的配置按钮。在打开的配置界面中,单击“删除+”,添加删除目录,例如“/user/admin/examples/output-data/streaming_workflow”。 单击“属性+”,添加下列属性。 左边框填写属性名称“mapred.input.dir”,右边框填写属性值“/user/admin/examples/input-data/text”。 左边框填写属性名称“mapred.output.dir”,右边框填写属性值“/user/admin/examples/output-data/streaming_workflow”。 单击Oozie编辑器右上角的。 保存前如果需要修改作业名称(默认为“My Workflow”),可以直接单击该名称进行修改,例如“Streaming-Workflow”。 保存完成后,单击,提交该作业。 作业提交后,可通过Hue界面查看作业的详细信息、日志、进度等相关内容。
  • 在安全区中配置权限策略 使用Ranger安全区管理员用户登录Ranger管理页面。 在Ranger首页右上角的“Security Zone”选项的下拉列表中选择对应的安全区,即可切换至该安全区内的权限视图。 单击组件名称下的权限插件名称,即可进入组件安全访问策略列表页面。 各组件的策略列表中,系统默认生成的条目会自动继承至安全区内,用于保证集群内的部分系统默认用户或用户组的权限。 单击“Add New Policy”,根据业务场景规划配置相关用户或者用户组的资源访问策略。 例如在本章节样例中,在安全区内配置一条允许“test”用户访问“/testzone/test”目录的策略: 其他不同组件的完整访问策略配置样例参考: 添加CDL的Ranger访问权限策略 添加HDFS的Ranger访问权限策略 添加HBase的Ranger访问权限策略 添加Hive的Ranger访问权限策略 添加Yarn的Ranger访问权限策略 添加Spark2x的Ranger访问权限策略 添加Kafka的Ranger访问权限策略 添加HetuEngine的Ranger访问权限策略 策略添加后,需等待30秒左右,待系统生效。 安全区中定义的策略仅适用于区域中的资源,服务的资源被划分到安全区后,非安全区针对该资源的访问权限策略将不再生效。 如需配置针对当前安全区之外其他资源的访问策略,需在Ranger首页右上角的“Security Zone”选项中退出当前安全区后进行配置。
  • 添加安全区 使用Ranger管理员用户rangeradmin登录Ranger管理页面,具体操作可参考登录Ranger WebUI界面。 单击“Security Zone”,在区域列表页面中单击,添加安全区。 表1 安全区配置参数 参数名称 描述 示例 Zone Name 配置安全区的名称。 test Zone Description 配置安全区的描述信息。 - Admin Users/Admin Usergroups 配置安全区的管理用户/用户组,可在安全区中添加及修改相关资源的权限策略。 必须至少配置一个用户或用户组。 zone_admin Auditor Users/ Auditor Usergroups 添加审计用户/用户组,可在安全区中查看相关资源权限策略内容。 必须至少配置一个用户或用户组。 zone_user Select Tag Services 选择服务的标签信息。 - Select Resource Services 选择安全区内包含的服务及具体资源。 在“Select Resource Services”中选择服务后,需要在“Resource”列中添加具体的资源对象,例如HDFS服务器的文件目录、Yarn的队列、Hive的数据库及表、HBase的表及列。 /testzone 例如针对HDFS中的“/testzone”目录创建一个安全区,配置如下: 单击“Save”,等待安全区添加成功。 Ranger管理员可在“Security Zone”页面查看当前的所有安全区并单击“Edit”修改安全区的属性信息,当相关资源不需要在安全区中进行管理时,可单击“Delete”删除对应安全区。
  • Clustering架构 Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。 为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi 数据湖 文件的布局。 Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。 Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。 总体而言Clustering分为两个部分: 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。 将Clustering计划以avro元数据格式保存到时间线。 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。
  • 操作场景 Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
  • 回答 如果应用程序没有设置标签表达式,那么该应用程序上新增的container/resource将使用其所在队列默认的标签表达式。如果队列没有默认的标签表达式,则将其标签表达设置为“default label”。 当应用程序(app1)提交到队列(Q1)上时,应用程序上新增的container/resource使用队列默认的标签表达式(“label1”)。如果app1正在运行时Q1被删除,则app1被移动到“lost_and_found”队列上。由于“lost_and_found”队列没有标签表达式,其标签表达式设置为“default label”,此时app1上新增的container/resource也将其标签表达式设置为“default label”。当app1被移回正常运行的队列(例如,Q2)时,如果Q2支持调用app1中的所有标签表达式(包含“label1”和“default label”),则app1能正常运行直到结束;如果Q2仅支持调用app1中的部分标签表达式(例如,仅支持调用“default label”),那么app1在运行时,拥有“label1”标签表达式的部分任务的资源请求将无法获得资源,从而被挂起,不能正常运行。 因此当把应用程序从“lost_and_found”队列移动到其他运行正常的队列上时,需要保证目标队列能够调用该应用程序的所有标签表达式。 建议不要删除正在运行应用程序的队列。
  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
  • 回答 JD BCS erver方式使用了ShuffleService功能,Reduce阶段所有的Executor会从NodeManager中获取数据,当数据量达到一个级别(10T级别),会出现NodeManager单点瓶颈(ShuffleService服务在NodeManager进程中),就会出现某些Task获取数据超时,从而出现该问题。 因此,当数据量达到10T级别以上的Spark任务,建议用户关闭ShuffleService功能,即在“Spark-defaults.conf”配置文件中将配置项“spark.shuffle.service.enabled”配置为“false”。
  • CarbonData表操作并发语法说明 DDL和DML中的操作,执行前,需要获取对应的锁,各操作需要获取锁的情况见表1 操作获取锁一览表,√表示需要获取该锁,一个操作仅在获取到所有需要获取的锁后,才能继续执行。 任意两个操作是否可以并发执行,可以通过如下方法确定:表1两行代表两个操作,这两行没有任意一列都标记√,即不存在某一列两行全为√。 表1 操作获取锁一览表 操作 METADATA_LOCK COMPACTION_LOCK DROP_TABLE_LOCK DELETE_SEGMENT_LOCK CLEAN_FILES_LOCK ALTER_PARTITION_LOCK UPDATE_LOCK STREAMING_LOCK CONCURRENT_LOAD_LOCK SEGMENT_LOCK CREATE TABLE - - - - - - - - - - CREATE TABLE As SELECT - - - - - - - - - - DROP TABLE √ - √ - - - - √ - - ALTER TABLE COMPACTION - √ - - - - √ - - - TABLE RENAME - - - - - - - - - - ADD COLUMNS √ √ - - - - - - - - DROP COLUMNS √ √ - - - - - - - - CHANGE DATA TYPE √ √ - - - - - - - - REFRESH TABLE - - - - - - - - - - REGISTER INDEX TABLE √ - - - - - - - - - REFRESH INDEX - √ - - - - - - - - LOAD DATA/INSERT INTO - - - - - - - - √ √ UPDATE CARBON TABLE √ √ - - - - √ - - - DELETE RECORDS from CARBON TABLE √ √ - - - - √ - - - DELETE SEGMENT by ID - - - √ √ - - - - - DELETE SEGMENT by DATE - - - √ √ - - - - - SHOW SEGMENTS - - - - - - - - - - CREATE SECONDARY INDEX √ √ - √ - - - - - - SHOW SECONDARY INDEXES - - - - - - - - - - DROP SECONDARY INDEX √ - √ - - - - - - - CLEAN FILES - - - - - - - - - - SET/RESET - - - - - - - - - - Add Hive Partition - - - - - - - - - - Drop Hive Partition √ √ √ √ √ √ - - - - Drop Partition √ √ √ √ √ √ - - - - Alter table set √ √ - - - - - - - - 父主题: CarbonData语法参考
  • 基本语法 方法一:在指定的“database_name”数据库中创建一个名为“table_name ”的表。 如果建表语句中没有包含“database_name”,则默认使用客户端登录时选择的数据库作为数据库名称。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = engine_name() [PARTITION BY expr_list] [ORDER BY expr_list] ClickHouse在创建表时建议携带PARTITION BY创建表分区。因为ClickHouse数据迁移工具是基于表的分区作数据迁移,在创建表时如果不携带PARTITION BY创建表分区,则在集群内ClickHouseServer节点间数据迁移界面无法对该表进行数据迁移。 方法二:创建一个与database_name2.table_name2具有相同结构的表,同时可以对其指定不同的表引擎声明。 如果没有表引擎声明,则创建的表将与database_name2.table_name2使用相同的表引擎。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name AS [database_name2.]table_name2 [ENGINE = engine_name] 方法三:使用指定的引擎创建一个与SELECT子句的结果具有相同结构的表,并使用SELECT子句的结果填充它。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name ENGINE = engine_name AS SELECT ...
  • 前提条件 ClickHouse服务运行正常,Zookeeper服务运行正常,迁入、迁出节点的ClickHouseServer实例状态正常。 请确保迁入节点已有待迁移数据表,且确保该表是MergeTree系列引擎的分区表。 创建迁移任务前请确保所有对待迁移数据表的写入任务已停止,且任务启动后,只允许对待迁移数据表进行查询操作,禁止对该表进行写入、删除等操作,否则可能会造成迁移前后数据不一致。 迁入节点的ClickHouse数据目录有足够的空间。
  • 使用示例 --在default数据库和default_cluster集群下创建名为test表CREATE TABLE default.test ON CLUSTER default_cluster( `EventDate` DateTime, `id` UInt64)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/test', '{replica}')PARTITION BY toYYYYMM(EventDate)ORDER BY id
  • 使用示例 --查看ClickHouse集群信息select * from system.clusters;--显示当前节点设置的宏select * from system.macros;--查看数据库容量selectsum(rows) as "总行数",formatReadableSize(sum(data_uncompressed_bytes)) as "原始大小",formatReadableSize(sum(data_compressed_bytes)) as "压缩大小",round(sum(data_compressed_bytes) / sum(data_uncompressed_bytes) * 100,0) "压缩率"from system.parts;--查询test表容量。where条件根据实际情况添加修改selectsum(rows) as "总行数",formatReadableSize(sum(data_uncompressed_bytes)) as "原始大小",formatReadableSize(sum(data_compressed_bytes)) as "压缩大小",round(sum(data_compressed_bytes) / sum(data_uncompressed_bytes) * 100,0) "压缩率"from system.partswhere table in ('test')and partition like '2020-11-%'group by table;
  • 使用示例 查询表t1的表结构: desc t1;┌─name────┬─type─┬─default_type─┬─default_expression ┬─comment─┬─codec_expression─┬─ttl_expression─┐│ id │ UInt8 │ │ │ │ │ │ │ name │ UInt8 │ │ │ │ │ │ │ address │ String │ │ │ │ │ │└───────┴────┴────────┴────────── ┴───── ┴──────────┴─────────┘
  • 操作场景 用户可以通过KafkaUI查看Topic详情、修改Topic Configs、增加Topic分区个数、删除Topic,并可实时查看不同时段的生产数据条数。 安全模式下,KafkaUI对查看Topic详情操作不作鉴权处理,即任何用户都可以查询Topic信息;对于修改Topic Configs、增加Topic分区个数、删除Topic场景,需保证KafkaUI登录用户属于“kafkaadmin”用户组或者单独给用户授予对应操作权限,否则将会鉴权失败。 非安全模式下,KafkaUI对所有操作不作鉴权处理。
  • 使用HetuEngine UDF 使用客户端访问: 进入HetuEngine客户端。 执行如下命令创建HetuEngine UDF: CREATE FUNCTION example.namespace01.add_two ( num integer)RETURNS integerLANGUAGE JAVADETERMINIS TICS YMBOL "com.xxx.bigdata.hetuengine.functions.AddTwo"URI "hdfs://hacluster/udf/hetuserver/udf-test-0.0.1-SNAPSHOT.jar"; 执行如下命令使用HetuEngine UDF: select example.namespace01.add_two(2);_col0------- 4(1 row) 函数实现类中通过重载方法来区分同名的不同函数,在创建HetuEngine UDF时要指定不同的函数名称。
  • 购买ELB并配置对接ClickHouse 购买ELB并获取其私有IP地址 详细操作步骤请参考创建共享型负载均衡器。 登录“弹性负载均衡器”控制台,在“负载均衡器”界面单击“购买弹性负载均衡”。 在“购买弹性负载均衡”界面,“实例规格类型”选择“共享型”,“所属VPC”和“子网”参数需要和MRS集群保持一致,其他参数保持默认即可。 单击“立即购买”,确认配置信息,并单击“提交”。 创建完成后,在“负载均衡器”界面,选择对应的区域即可看到新建的负载均衡器。查看并获取该负载均衡器的私有IP地址。 添加ELB监听器 详细操作步骤请参考添加监听器。
  • 使用示例 --给表t1增加列test01 ALTER TABLE t1 ADD COLUMN test01 String DEFAULT 'defaultvalue';--查询修改后的表t1desc t1┌─name────┬─type─┬─default_type─┬─default_expression ┬─comment─┬─codec_expression─┬─ttl_expression─┐│ id │ UInt8 │ │ │ │ │ │ │ name │ String │ │ │ │ │ │ │ address │ String │ │ │ │ │ ││ test01 │ String │ DEFAULT │ 'defaultvalue' │ │ │ │└───────┴────┴────────┴────────── ┴───── ┴──────────┴─────────┘--修改表t1列name类型为UInt8ALTER TABLE t1 MODIFY COLUMN name UInt8;--查询修改后的表t1desc t1┌─name────┬─type─┬─default_type─┬─default_expression ┬─comment─┬─codec_expression─┬─ttl_expression─┐│ id │ UInt8 │ │ │ │ │ │ │ name │ UInt8 │ │ │ │ │ │ │ address │ String │ │ │ │ │ ││ test01 │ String │ DEFAULT │ 'defaultvalue' │ │ │ │└───────┴────┴────────┴────────── ┴───── ┴──────────┴─────────┘--删除表t1的列test01ALTER TABLE t1 DROP COLUMN test01;--查询修改后的表t1desc t1┌─name────┬─type─┬─default_type─┬─default_expression ┬─comment─┬─codec_expression─┬─ttl_expression─┐│ id │ UInt8 │ │ │ │ │ │ │ name │ UInt8 │ │ │ │ │ │ │ address │ String │ │ │ │ │ │└───────┴────┴────────┴────────── ┴───── ┴──────────┴─────────┘
  • 日志级别 IoTDB提供了如表2所示的日志级别。 日志的级别优先级从高到低分别是ERROR、WARN、INFO、DEBUG,程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表2 日志级别 级别 描述 ERROR ERROR表示系统运行的错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 参考修改集群服务配置参数,进入IoTDB服务“全部配置”页面。 在左侧导航栏选择需修改的角色所对应的日志菜单。 选择所需修改的日志级别并保存。 配置IoTDB日志级别60秒后即可生效,无需重启服务。
  • UDF java代码及SQL样例 UDF java使用样例 package com.xxx.udf;import org.apache.flink.table.functions.ScalarFunction;public class UdfClass_UDF extends ScalarFunction { public int eval(String s) { return s.length(); }} UDF SQL使用样例 CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF';CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print');INSERT INTO udfSinkSELECT a, udf(a)FROM udfSource;
  • 前提条件 创建或获取该任务中创建Loader作业的业务用户和密码。 确保用户已授权访问作业执行时操作的HDFS目录和数据。 ClickHouse相关表已创建,并确保用户已授权访问作业执行时操作该表的权限。 检查磁盘空间,确保没有出现告警且余量满足导入、导出数据的大小。 使用Loader从HDFS导入数据时,确保HDFS输入路径目录名、输入路径的子目录名及子文件名不能包含特殊字符/\"':;,中的任意字符。 如果设置的作业需要使用指定YARN队列功能,该用户需要已授权有相关YARN队列的权限。 设置任务的用户需要获取该任务的执行权限,并获取该任务对应的连接的使用权限。
  • 上传UDF至FlinkServer 准备UDF jar文件,大小不能超过200MB。 访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“UDF管理”进入UDF管理页面。 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。 填写UDF名称以及描述信息后,单击“确定”。 “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。 (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考如何创建FlinkServer作业。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全