华为云用户手册

  • 配置场景 当Yarn配置“yarn.log-aggregation-enable”为“true”时,就开启了container日志聚合功能。日志聚合功能是指:当应用在Yarn上执行完成后,NodeManager将本节点中所有container的日志聚合到HDFS中,并删除本地日志。详情请参见配置Container日志聚合功能。 然而,开启container日志聚合功能之后,其日志聚合至HDFS目录中,只能通过获取HDFS文件来查看日志。开源Spark和Yarn服务不支持通过WebUI查看聚合后的日志。 因此,Spark在此基础上进行了功能增强。如图1所示,在HistoryServer页面添加“AggregatedLogs”页签,可以通过“logs”链接查看聚合的日志。 图1 聚合日志显示页面
  • 配置描述 为了使WebUI页面显示日志,需要将聚合日志进行解析和展现。Spark是通过Hadoop的JobHistoryServer来解析聚合日志的,所以您可以通过“spark.jobhistory.address”参数,指定JobHistoryServer页面地址,即可完成解析和展现。 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 此功能依赖Hadoop中的JobHistoryServer服务,所以使用聚合日志之前需要保证JobHistoryServer服务已经运行正常。 如果参数值为空,“AggregatedLogs”页签仍然存在,但是无法通过logs链接查看日志。 只有当App已经running,HDFS上已经有该App的事件日志文件时才能查看到聚合的container日志。 正在运行的任务的日志,用户可以通过“Executors”页面的日志链接进行查看,任务结束后日志会汇聚到HDFS上,“Executors”页面的日志链接就会失效,此时用户可以通过“AggregatedLogs”页面的logs链接查看聚合日志。 表1 参数说明 参数 描述 默认值 spark.jobhistory.address JobHistoryServer页面的地址,格式:http(s)://ip:port/jobhistory。例如,将参数值设置为“https://10.92.115.1:26014/jobhistory”。 默认值为空,表示不能从WebUI查看container聚合日志。 修改参数后,需重启服务使得配置生效。 -
  • 查看消费组 进入KafkaUI,请参考访问KafkaUI。 单击“Consumers”,进入消费组详情页面,可以查看当前集群内的所有ConsumerGroups,并可以查看各个ConsumerGroups Coordinator所在节点IP,在页面右上角,用户可以输入ConsumerGroup来搜索指定的ConsumerGroup信息。 在Consumer Summary一栏,可查看当前集群已存在的消费组,单击消费组名称,可查看该消费组所消费过的Topic,消费过的Topic有两种状态:“pending”和“running”,分别表示“曾经消费过但现在未消费”和“现在正在消费”,在弹框右上角,可以输入Topic名来进行过滤。 单击Topic名称,进入Consumer Offsets页面,可查看Topic消费详情。
  • 参数说明 表1 算子参数说明 参数 含义 类型 是否必填 默认值 换行符 用户根据数据实际情况,填写字符串作为换行符。支持任何字符串。默认使用操作系统的换行符。 string 否 \n 分割长度单位 长度单位,可选择“char”字符或“byte”字节。 enum 是 char 输入字段 配置输入字段相关信息: 固定长度:设置字段长度,第2个字段起点从第1个字段终点开始,以此类推。 字段名:配置输入字段名。 类型:配置字段类型。 数据格式:字段类型为“DATE”或“TIME”或“TIMESTAMP”时,需指定特定时间格式,其他字段类型指定无效。时间格式如:“yyyyMMdd HH:mm:ss”。 长度:配置字段长度,字段值实际长度太长则按配置的长度截取,“类型”为“CHAR”时实际长度不足则空格补齐,“类型”为“VARCHAR”时实际长度不足则不补齐。 map 是 无
  • 问题 运行一个Spark Streaming任务,确认有数据输入后,发现没有任何处理的结果。打开Web界面查看Spark Job执行情况,发现如下图所示:有两个Job一直在等待运行,但一直无法成功运行。 图1 Active Jobs 继续查看已经完成的Job,发现也只有两个,说明Spark Streaming都没有触发数据计算的任务(Spark Streaming默认有两个尝试运行的Job,就是图中两个) 图2 Completed Jobs
  • 回答 经过定位发现,导致这个问题的原因是:Spark Streaming的计算核数少于Receiver的个数,导致部分Receiver启动以后,系统已经没有资源去运行计算任务,导致第一个任务一直在等待,后续任务一直在排队。从现象上看,就是如问题中的图1中所示,会有两个任务一直在等待。 因此,当Web出现两个任务一直在等待的情况,首先检查Spark的核数是否大于Receiver的个数。 Receiver在Spark Streaming中是一个常驻的Spark Job,Receiver对于Spark是一个普通的任务,但它的生命周期和Spark Streaming任务相同,并且占用一个核的计算资源。 在调试和测试等经常使用默认配置的场景下,要时刻注意核数与Receiver个数的关系。
  • 回答 造成该现象的原因是NodeManager重启。使用ExternalShuffle的时候,Spark将借用NodeManager传输Shuffle数据,因此NodeManager的内存将成为瓶颈。 在当前版本的 FusionInsight 中,NodeManager的默认内存只有1G,在数据量比较大(1T以上)的Spark任务下,内存严重不足,消息响应缓慢,导致FusionInsight健康检查认为NodeManager进程退出,强制重启NodeManager,导致上述问题产生。 解决方式: 调整NodeManager的内存,数据量比较大(1T以上)的情况下,NodeManager的内存至少在4G以上。
  • 常用Sink配置 HDFS Sink HDFS Sink将数据写入HDFS。常用配置如表9所示: 图9 HDFS Sink 表9 HDFS Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type hdfs 类型,需配置为“hdfs”。仅可在“properties.properties”文件中配置。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 hdfs.path - HDFS路径。 hdfs.inUseSuffix .tmp 正在写入的HDFS文件后缀。 hdfs.rollInterval 30 按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollSize 1024 按大小滚动文件,单位:字节,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollCount 10 按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.idleTimeout 0 自动关闭空闲文件超时时间,单位:秒。 hdfs.batchSize 1000 每次写入HDFS的Event个数。 hdfs.kerberosPrincipal - 认证HDFS的Kerberos用户名,未启用Kerberos认证集群不配置。 hdfs.kerberosKeytab - 认证HDFS的Kerberos keytab路径,未启用Kerberos认证集群不配置 hdfs.fileCloseByEndEvent true 收到最后一个Event时是否关闭文件。 hdfs.batchCallTimeout - 每次写入HDFS超时控制时间,单位:毫秒。 当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。 说明: “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致数据写入HDFS失败。 serializer.appendNewline true 将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。 Avro Sink Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如表10所示: 图10 Avro Sink 表10 Avro Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“avro”。仅可在“properties.properties”文件中配置。 hostname - 绑定关联的主机名或IP地址。 port - 监测端口。 batch-size 1000 批次发送的Event个数。 ssl false 是否使用SSL加密。 truststore-type JKS Java信任库类型。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 keystore-type JKS 密钥存储类型。 keystore - 密钥存储文件。 keystore-password - 密钥存储密码 HBase Sink HBase Sink将数据写入到HBase中。常用配置如表11所示: 图11 HBase Sink 表11 HBase Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“hbase”。仅可在“properties.properties”文件中配置。 table - HBase表名称。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 columnFamily - HBase列族名称。 batchSize 1000 每次写入HBase的Event个数。 kerberosPrincipal - 认证HBase的Kerberos用户名,未启用Kerberos认证集群不配置。 kerberosKeytab - 认证HBase的Kerberos keytab路径,未启用Kerberos认证集群不配置。 Kafka Sink Kafka Sink将数据写入到Kafka中。常用配置如表12所示: 图12 Kafka Sink 表12 Kafka Sink常用配置 参数 默认值 描述 channel - 与之相连的Channel。仅可在“properties.properties”文件中配置。 type - 类型,需配置为“org.apache.flume.sink.kafka.KafkaSink”。 仅可在“properties.properties”文件中配置。 kafka.bootstrap.servers - Kafkabrokers列表,多个用英文逗号分隔。 monTime 0(不开启) 线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。 kafka.topic default-flume-topic 数据写入的topic。 flumeBatchSize 1000 每次写入Kafka的Event个数。 kafka.security.protocol SASL_PLAINTEXT Kafka安全协议,未启用Kerberos认证集群下须配置为“PLAINTEXT”。 kafka.kerberos.domain.name - Kafka Domain名称。安全集群必填。仅可在“properties.properties”文件中配置。 Other Kafka Producer Properties - 其他Kafka配置,可以接受任意Kafka支持的生产参数配置,配置需要加前缀“.kafka”。 仅可在“properties.properties”文件中配置。
  • 常用Channel配置 Memory Channel Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如表6所示: 图6 Memory Channel 表6 Memory Channel常用配置 参数 默认值 描述 type - 类型,需配置为“memory”。仅可在“properties.properties”文件中配置。 capacity 10000 缓存在Channel中的最大Event数。 transactionCapacity 1000 每次存取的最大Event数。 channelfullcount 10 Channel full次数,达到该次数后发送告警。 File Channel File Channel使用本地磁盘作为缓存区,Events存放在设置的“dataDirs”配置项文件夹中。常用配置如表7所示: 图7 File Channel 表7 File Channel常用配置 参数 默认值 描述 type - 类型,需配置为“file”。仅可在“properties.properties”文件中配置。 checkpointDir ${BIGDATA_DATA_HOME}/flume/checkpoint 检查点存放路径。 dataDirs ${BIGDATA_DATA_HOME}/flume/data 数据缓存路径,设置多个路径可提升性能,中间用逗号分开。 maxFileSize 2146435071 单个缓存文件的最大值,单位:字节。 minimumRequiredSpace 524288000 缓冲区空闲空间最小值,单位:字节。 capacity 1000000 缓存在Channel中的最大Event数。 transactionCapacity 10000 每次存取的最大Event数。 channelfullcount 10 Channel full次数,达到该次数后发送告警。 Kafka Channel Kafka Channel使用kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。常用配置如表 10 Kafka Channel 常用配置所示: 图8 Kafka Channel 表8 Kafka Channel常用配置 参数 默认值 描述 type - 类型,需配置为 “org.apache.flume.channel.kafka.KafkaChannel”.。 仅可在“properties.properties”文件中配置。 kafka.bootstrap.servers - kafka broker列表。 kafka.topic flume-channel Channel用来缓存数据的topic。 kafka.consumer.group.id flume Kafka消费者组ID。 parseAsFlumeEvent true 是否解析为Flume event。 migrateZookeeperOffsets true 当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。 kafka.consumer.auto.offset.reset latest 当没有offset记录时,从指定的位置消费数据。 kafka.producer.security.protocol SASL_PLAINTEXT Kafka生产者安全协议。 kafka.consumer.security.protocol SASL_PLAINTEXT Kafka消费者安全协议。
  • 描述 向表中插入新的数据行。 如果指定了列名列表,那么这些列名列表必须与query语句产生列列表名完全匹配。表中不在列名列表中的每一列,其值会设置为null。 如果没有指定列名列表,则query语句产生的列必须与将要插入的列完全匹配。 使用insert into时,会往表中追加数据,而使用insert overwrite时,如果表属性“auto.purge”被设置为“true”,直接删除原表数据,再写入新的数据。 如果对象表是分区表时,insert overwrite会删除对应分区的数据而非所有数据。 insert into后面的table关键字为可选,以兼容hive语法。
  • 示例 创建fruit和fruit_copy表: create table fruit (name varchar,price double); create table fruit_copy (name varchar,price double); 向fruit表中插入一行数据: insert into fruit values('LIchee',32); -- 兼容写法示例,带上table关键字 insert into table fruit values('Cherry',88); 向fruit表中插入多行数据: insert into fruit values('banana',10),('peach',6),('lemon',12),('apple',7); 将fruit表中的数据行加载到fruit_copy表中,执行后表中有5条记录: insert into fruit_copy select * from fruit; 先清空fruit_copy表,再将fruit中的数据加载到表中,执行之后表中有2条记录: insert overwrite fruit_copy select * from fruit limit 2; 对于varchar类型,仅当目标表定义的列字段长度大于源表的实际字段长度时,才可以使用INSERT... SELECT...的形式从源表中查数据并且插入到目标表: create table varchar50(c1 varchar(50)); insert into varchar50 values('hetuEngine'); create table varchar100(c1 varchar(100)); insert into varchar100 select * from varchar50; 分区表使用insert overwrite语句时,只会清理插入值所在分区的数据,而不是整个表: --创建表 create table test_part (id int, alias varchar) partitioned by (dept_id int, status varchar); insert into test_part partition(dept_id=10, status='good') values (1, 'xyz'), (2, 'abc'); select * from test_part order by id; id | alias | dept_id | status ----|-------|---------|-------- 1 | xyz | 10 | good 2 | abc | 10 | good (2 rows) --清理分区partition(dept_id=25, status='overwrite'),并插入一条数据 insert overwrite test_part (id, alias, dept_id, status) values (3, 'uvw', 25, 'overwrite'); select * from test_part ; id | alias | dept_id | status ----|-------|---------|----------- 1 | xyz | 10 | good 2 | abc | 10 | good 3 | uvw | 25 | overwrite --清理分区partition(dept_id=10, status='good'),并插入一条数据 insert overwrite test_part (id, alias, dept_id, status) values (4, 'new', 10, 'good'); select * from test_part ordr; id | alias | dept_id | status ----|-------|---------|----------- 3 | uvw | 25 | overwrite 4 | new | 10 | good (2 rows) --分区表插入数据 create table test_p_1(name string, age int) partitioned by (provice string, city string); create table test_p_2(name string, age int) partitioned by (provice string, city string); -- 填充数据到test_p_1 insert into test_p_1 partition (provice = 'hebei', city= 'baoding') values ('xiaobei',15),( 'xiaoming',22); -- 根据test_p_1 插入数据到test_p_2 -- 方式一 from test_p_1 insert into table test_p_2 partition (provice = 'hebei', city= 'baoding') select name,age; -- 方式二 insert into test_p_2 partition(provice = 'hebei', city= 'baoding') select name,age from test_p_1;
  • 语法 INSERT { INTO | OVERWRITE } [TABLE] table_name [(column_list)] [ PARTITION (partition_clause)] {select_statement | VALUES (value [, value ...]) [, (value [, value ...]) ...] } FROM from_statement INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement FROM from_statement INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement
  • 前提条件 已安装客户端,请参见。例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。 各组件业务用户由 MRS 集群管理员根据业务需要创建,具体操作请参见。安全模式下,“机机”用户需要下载keytab文件,具体操作请参见。“人机”用户第一次登录时需修改密码。 服务端默认开启了SSL,需参考使用IoTDB客户端章节生成“truststore.jks”证书,并复制到“客户端安装目录/IoTDB/iotdb/conf”目录下。
  • 配置从Postgresql抓取数据到Hudi任务的心跳表 在需要同步的Postgresql数据库下执行以下命令创建一张心跳表,心跳表归属cdc_cdl Schema,表名为cdc_heartbeat,主键为cdl_job_id: DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat; CREATE TABLE cdc_cdl.cdc_heartbeat ( cdl_job_id int8 NOT NULL, cdl_last_heartbeat timestamp(6) ); ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id); 心跳表创建完成后,在CDL WebUI界面创建从Postgresql抓取数据到Hudi的同步任务并启动即可收到心跳数据。
  • 数据判齐消息字段含义 表1 数据判齐消息字段 字段名 描述 cdl_job_name 本批次数据所属同步任务名称 target_table_schema 本批次数据写入Schema名称 target_table_name 本批次数据写入Hudi表名称 target_table_path 本批次数据保存的Hudi表路径 total_num 本批次数据总数 cdl_original_heartbeat 本批次数据中包含的心跳数据的最大时间, 如果本批次不包含心跳数据则值为空 cdl_last_heartbeat 本批次数据中包含的心跳数据的最小时间,如果本批次不包含心跳数据则取“event_time_min”的值 insert_num 本批次数据insert事件总数 update_num 本批次数据update事件总数 delete_num 本批次数据delete事件总数 event_time_min 本批次数据源端最小事务提交时间 event_time_max 本批次数据源端最大事务提交时间 event_time_avg 本批次数据源端平均事务提交时间 kafka_timestamp_min 本批次数据发送到Kafka的最小时间 kafka_timestamp_max 本批次数据发送到Kafka的最大时间 begin_time 本批次数据开始写入Hudi的时间 end_time 本批次数据写入Hudi的结束时间 cdc_partitioned_time 心跳表的时间分区字段 cdc_last_update_date 该条判齐记录写入时间
  • 配置从Oracle(ogg)抓取数据到Hudi任务的心跳表 在需要同步数据的Oracle数据库中执行以下命令创建一张心跳表,心跳表归属于CDC_CDL Schema,表名为CDC_HEARTBEAT,主键为CDL_JOB_ID: CREATE TABLE "CDC_CDL"."CDC_HEARTBEAT" ( "CDL_JOB_ID" VARCHAR(22) PRIMARY KEY, "CDL_LAST_HEARTBEAT" TIMESTAMP, SUPPLEMENTAL LOG DATA (ALL) COLUMNS ); 将CDC_HEARTBEAT表加入到Oracle或者ogg的任务中,确保心跳数据可以正常发送到Kafka。 如果是Oracle任务,直接执行4。 在CDL WebUI配置thirdparty-kafka(ogg)连接增加Oracle的连接信息。 配置完成后,在CDL WebUI界面创建从Oracle(ogg)抓取数据到Hudi任务并启动即可收到心跳数据。
  • 配置opengauss到Hudi任务的心跳表 在需要同步的opengauss数据库下执行以下命令创建一张心跳表,心跳表归属cdc_cdl Schema,表名为cdc_heartbeat,主键为cdl_job_id: DROP TABLE IF EXISTS cdc_cdl.cdc_heartbeat; CREATE TABLE cdc_cdl.cdc_heartbeat ( cdl_job_id int8 NOT NULL, cdl_last_heartbeat timestamp(6) ); ALTER TABLE cdc_cdl.cdc_heartbeat ADD CONSTRAINT cdc_heartbeat_pkey PRIMARY KEY (cdl_job_id); 将该心跳表加入到DRS任务,以确保心跳表数据正常发送到DRS Kafka。 在CDL WebUI界面配置opengauss的thirdparty-kafka连接时增加opengauss的连接信息,如果opengauss部署为一主多备模式,需在“host”填写所有的IP。 配置完成之后,在CDL WebUI界面创建从thirdparty-kafka抓取数据到Hudi的任务并启动即可收到心跳数据。
  • 操作场景 心跳和数据判齐功能用于统计CDL同步任务的全链路信息, 包括从数据库管理系统RDBMS到Kafka的数据耗时、从Kafka消费数据写入到Hudi的数据耗时和数据条数等一系列信息,并将其写入到特定的Topic(cdl_snapshot_topic)中,用户可自行消费Topic中的数据并写入到某个特定Hudi表作数据判齐使用。心跳判齐数据不仅可以用来判断心跳时间之前的数据已经同步到 数据湖 ,还可以根据事务时间,写Kafka的时间,数据开始入湖时间和数据入湖结束时间来判断数据时延问题。 同时对于PgSQL任务,配置心跳表可以定期向前推进PgSQL中Slot记录的LSN的信息,避免由于某个任务配置了某部分变化很小的表导致数据库日志积压。
  • 前提条件 创建或获取该任务中创建Loader作业的业务用户和密码。 获取FTP服务器使用的用户和密码,且该用户具备FTP服务器上源文件的读取权限。如果源文件在导入后文件名要增加后缀,则该用户还需具备源文件的写入权限。 检查磁盘空间,确保没有出现告警且余量满足导入、导出数据的大小。 使用Loader从FTP服务器导入数据时,确保FTP服务器输入路径目录名、输入路径的子目录名及子文件名不能包含特殊字符/\"':;,中的任意字符。 如果设置的作业需要使用指定YARN队列功能,该用户需要已授权有相关YARN队列的权限。 设置任务的用户需要获取该任务的执行权限,并获取该任务对应的连接的使用权限。
  • 配置ClickHouse默认用户密码 使用root用户登录ClickHouse安装节点,切换到omm用户,进入“$BIGDATA_HOME/FusionInsight_ClickHouse_*/install/FusionInsight-ClickHouse-*/clickhouse/clickhouse_change_password”目录。 su - omm cd $BIGDATA_HOME/FusionInsight_ClickHouse_*/install/FusionInsight-ClickHouse-*/clickhouse/clickhouse_change_password 执行如下命令修改default或clickhouse用户密码: ./change_password.sh 如下所示:以clickhouse用户为例,按照提示输入clickhouse和密码,等待密码修改完成。 密码复杂度要求: 密码长度限制是8~64位。 至少包含一个小写字母、一个大写字母、一个数字和一个特殊字符,支持的特殊字符包含-%;[]{}@_。 查看密码修改结果: 登录到ClickHouse Server节点的,查看“${BIGDATA_HOME}/FusionInsight_ClickHouse_*/*_ClickHouseServer/etc/users.xml”文件中参数“password_sha256_hex”的值,即为存储修改后的密码。 cd ${BIGDATA_HOME}/FusionInsight_ClickHouse_*/*_ClickHouseServer/etc/ vi users.xml 如下所示:用password_sha256_hex来存储修改后的密码。配置文件中包含认证密码信息可能存在安全风险,建议当前场景执行完毕后删除相关配置文件或加强安全管理。
  • 迁移整体流程 迁移整体流程和步骤参考如下: 图2 迁移流程图 表1 迁移流程说明 阶段 流程说明 步骤1:源集群和目标集群网络打通 将源ClickHouse集群和目标ClickHouse集群的网络需要打通,保证两个集群ClickHouse实例节点网络可以互通。 步骤2:在目标集群配置文件中增加源集群的ZooKeeper信息 通过在目标集群的ClickHouse配置文件中添加源集群的ZooKeeper信息,将源集群中的ZooKeeper作为迁移过程中的辅助ZooKeeper。 步骤3:迁移源ClickHouse集群下数据库和表的元数据信息到目标集群 执行元数据迁移脚本,将源集群中的ClickHouse数据库和表的数据库名、表名、表结构等元数据信息迁移到目标集群。 步骤4:迁移源ClickHouse集群下数据库和表数据到目标集群 执行数据迁移脚本,将源集群中的ClickHouse数据库和表的数据迁移至目标集群。
  • 操作场景 场景一:随着MRS ClickHouse业务数量的增长,原有集群的存储和计算资源已不满足业务需求,需要对集群进行拆分,将部分用户业务及数据库数据迁移到新建集群中。 场景二:MRS ClickHouse集群后端主机所在机房需要搬迁,需要将ClickHouse集群整体迁移到另外一个机房的新集群当中。 为了解决上述场景下对搬迁能力的要求,MRS提供了ClickHouse集群数据一键式工具搬迁能力,将源集群中的ClickHouse数据库、表对象DDL、业务数据迁移到新建集群中。
  • 迁移方案原理介绍 Replicated*MergeTree引擎的复制表迁移: ClickHouse利用ZooKeeper将同一分片下不同副本的Replicated*MergeTree引擎表数据自动进行同步,本迁移方案利用该特性进行数据迁移。大致逻辑步骤如下: 首先,在目标集群的配置文件中添加源集群的ZooKeeper信息作为辅助ZooKeeper。其次,再在目标集群中创建和源集群相同ZooKeeper路径不同副本并且表结构和源集群一致的临时表。临时表创建完成源集群中的数据将会自动同步到临时表。最后,等待源集群数据同步到目标集群的临时表完成后,将目标集群中的临时表数据复制到正式表即可。 图1 Replicated*MergeTree引擎表迁移架构图 分布式表迁移: 分布式表不涉及表数据,只涉及表的元数据信息,迁移过程中会将源集群ClickHouse分布式表的元数据信息导出,然后将元数据信息修改为目标集群的ZooKeeper路径和副本,根据修改后的元数据信息在目标集群新建表即可。 非复制表和物化视图迁移: 针对非复制表和物化视图采用调用remote函数方式进行数据迁移。 上述迁移的操作步骤通过迁移工具脚本做了封装处理,只需修改相关配置文件执行迁移脚本即可完成一键式迁移操作,具体可以参考操作步骤说明。
  • 前提条件 创建或获取该任务中创建Loader作业的业务用户和密码。 确保用户已授权访问作业执行时操作的HDFS/OBS目录和数据。 确保用户已授权访问作业执行时操作的HBase表或phoenix表。 检查磁盘空间,确保没有出现告警且余量满足导入、导出数据的大小。 使用Loader从HDFS/OBS导入数据时,确保HDFS/OBS输入路径目录名、输入路径的子目录名及子文件名不能包含特殊字符/\"':;,中的任意字符。 如果设置的作业需要使用指定YARN队列功能,该用户需要已授权有相关YARN队列的权限。 设置任务的用户需要获取该任务的执行权限,并获取该任务对应的连接的使用权限。
  • 参数说明 表1 算子参数说明 参数 含义 类型 是否必填 默认值 Spark数据库 SparkSQL的数据库名称。 String 否 default Spark表名 配置SparkSQL表名。 仅支持一个SparkSQL表。 String 是 无 分区过滤器 配置分区过滤器可以导出指定分区数据,默认为空,导出整个表数据。 例如导出分区字段locale的值为“CN”或“US”的表数据,输入如下: locale = "CN" or locale = "US" String 否 - Spark输入字段 配置SparkSQL输入信息: 列名:配置SparkSQL列名。 字段名:配置输入字段名。 类型:配置字段类型。 长度:配置字段长度,字段值实际长度太长则按配置的长度截取,“类型”为“CHAR”时实际长度不足则空格补齐,“类型”为“VARCHAR”时实际长度不足则不补齐。 map 是 -
  • 样例 以SPARK导出到sqlserver2014数据库为例。 在sqlserver2014上创建一张空表“test_1”用于存储SparkSQL数据。执行以下语句: create table test_1 (id int, name text, value text); 配置“Spark输入”算子,生成三个字段A、B和C: 设置了数据连接器后,单击“自动识别”,系统将自动读取数据库中的字段,可根据需要选择添加,然后根据业务场景手动进行完善或者修正即可,无需逐一手动添加。 此操作会覆盖表格内已有数据。 通过“表输出”算子,将A、B和C输出到“test_1”表中: select * from test_1;
  • 回答 打开FusionInsight Manager页面,看到Yarn服务的业务IP地址为192网段。 从Yarn的日志看到,Yarn读取的Spark Web UI地址为http://10.120.169.53:23011,是10网段的IP地址。由于192网段的IP和10网段的IP不能互通,所以导致访问Spark Web UI界面失败。 修改方案: 登录10.120.169.53客户端机器,修改/etc/hosts文件,将10.120.169.53更改为相对应的192网段的IP地址。再重新运行Spark应用,这时就可以打开Spark Web UI界面。
  • 索引状态介绍 索引状态反映了索引当前的使用情况,全局二级索引支持以下五种状态: ACTIVE:索引正常,可以正常读写。 UNUSABLE:索引被禁用,索引数据会正常写入,查询时无法使用这个索引。 INACTIVE:索引异常,索引数据与数据表不一致,跳过生成这个索引的索引数据,查询数据时无法使用这个索引。 BUILDING:索引数据正常批量生成,索引数据生成工具执行结束会自动转换到ACTIVE状态,此状态下可以正常读写。 DROPPING:索引正在被删除,跳过生成这个索引的索引数据,查询数据时无法使用这个索引。 基于工具的索引状态修改,支持图1所示的状态转换。 图1 索引状态转换图
  • 配置场景 分布式缓存在两种情况下非常有用。 滚动升级 在升级过程中,应用程序必须保持文字内容(jar文件或配置文件)不变。而这些内容并非基于当前版本的YARN,而是要基于其提交时的版本。这是一个具有挑战性的问题。一般情况下,应用程序(例如MapReduce、Hive、Tez等)需要进行完整的本地安装,将库安装至所有的集群机器(客户端及服务器端机器)中。当集群内开始进行滚动升级或降级时,本地安装的库的版本必然会在应用运行过程时发生改变。在滚动升级过程中,首先只会对少数NodeManager进行升级,这些NodeManager会获得新版本的软件。这导致了行为的不一致,并可能发生运行时错误。 同时存在多个YARN版本 集群管理员可能会在一个集群内运行使用多个版本YARN及Hadoop jars的任务。这在当前很难实现,因为jars已被本地化且只有一个版本。 MapReduce应用框架可以通过分布式缓存进行部署,且无需依赖安装中复制的静态版本。因此,可以在HDFS中存放多版本的Hadoop,并通过配置“mapred-site.xml”文件指定任务默认使用的版本。只需设置适当的配置属性,用户就可以运行不同版本的MapReduce,而无需使用部署在集群中的版本。 图1 具有多个版本NodeManagers及Applications的集群
  • 前提条件 获取一个拥有管理员权限的用户,例如“admin”。 请参考创建Hive角色,在Manager界面创建一个角色,例如“hrole”,不需要设置Hive权限,设置提交Hql命令到Yarn执行的权限。 在Manager界面创建两个使用Hive的“人机”用户并加入“hive”组,例如“huser1”和“huser2”。“huser2”需绑定“hrole”。使用“huser1”创建一个数据库“hdb”,并在此数据库中创建表“htable”。
共100000条