华为云用户手册

  • 迁移说明 SFS Turbo默认只能被云上同一个VPC内的E CS /CCE访问,用户可通过云专线/VPN/对等连接等方式打通网络,实现多种访问方式。 云下或其他云访问: 云专线/VPN 云上同区域同一账号不同VPC: VPC对等连接 云上同区域跨账号访问: VPC对等连接 不同区域访问: 云连接 迁移数据分为两种网络条件,通过可访问公网的ECS直接挂载迁移。 通过mount方式挂载访问SFS Turbo,将本地NAS存储中的数据迁移至SFS Turbo。 通过云专线迁移 无法打通网络直接挂载,可以通过公网访问华为云ECS绑定的EIP,也可进行传输迁移。 使用公网迁移 父主题: 数据迁移
  • 操作步骤 以root账号登录弹性云服务器。 给非root的普通用户添加root权限。 执行chmod 777 /etc/sudoers命令修改sudoers文件权限为可编辑权限。 使用which命令查看mount和umount命令的路径。 图1 查看命令路径 执行vi /etc/sudoers命令编辑sudoers文件。 在root账号下添加普通用户账号,下图以添加普通用户Mike为例。 图2 添加用户 编辑完成后,单击“Esc”,并输入:wq,保存文件并退出。 执行chmod 440 /etc/sudoers命令恢复sudoers文件权限为只读权限。 切换到普通用户Mike登录弹性云服务器。 执行如下命令挂载文件系统。挂载参数参见表1。 sudo mount -t nfs -o vers=3,timeo=600,noresvport,nolock 挂载地址 本地路径 表1 参数说明 参数 说明 挂载地址 SFS容量型文件系统的格式为:文件系统 域名 :/路径,例如:example.com:/share-xxx。SFS Turbo文件系统的格式为:文件系统IP:/,例如192.168.0.0:/。 说明: x是数字或字母。 由于挂载地址名称较长,需要拉宽该栏以便完整显示。 本地路径 云服务器上用于挂载文件系统的本地路径,例如“/local_path”。 挂载完成后,执行如下命令,查看已挂载的文件系统。 mount -l 如果回显包含如下类似信息,说明挂载成功。 example.com:/share-xxx on /local_path type nfs (rw,vers=3,timeo=600,nolock,addr=)
  • 设置配额 登录管理控制台,选择“弹性文件服务”。 左侧导航栏选择“通用文件系统”,跳转到通用文件系统控制台。 在文件系统列表页,单击指定文件系统右侧的“配额管理”进入配额管理页面。 在配额管理页面,单击“设置配额”,系统弹出如图2所示对话框。 图1 配额管理页面 图2 设置配额 设置文件系统配额。 容量配额:必填,输入容量配额,必须大于0且不可低于已用容量。单位是GB。 文件数限制:选填,输入文件数限制,必须大于0且不可低于当前文件数。单位是个。 单击“确定”,完成文件系统配额设置。在配额管理页面可以看到配额详情。 图3 配额详情
  • 创建加密文件系统 当您需要使用文件系统加密功能时,创建SFS容量型文件系统需要授权文件系统访问KMS。如果您拥有“Security Administrator”权限,则可直接授权。如果权限不足,需要联系系统管理员获取安全管理员权限,然后再重新操作。 如果创建SFS Turbo文件系统时,则不需要授权。 可以新创建加密或者不加密的文件系统,无法更改已有文件系统的加密属性。 创建加密文件系统的具体操作请参见创建文件系统。
  • 场景介绍 媒体处理 包括媒体素材的上传、下载、编目、节目转码和数据归档等工作,涉及音视频数据的存储、调用和管理,根据其业务特性对共享的文件系统有如下要求: 媒体素材的视频码率高,文件规模大,要求文件系统容量大且易于扩展。 音视频的采集、编辑、合成等应用要求文件系统无抖动、低时延。 多用户同时进行编辑制作,要求文件系统提供稳定易用的数据共享。 视频渲染、特效加工需要频繁处理小文件,要求文件系统具有较高的数据读写性能。 弹性文件服务是基于文件系统的共享存储服务,具有高速数据共享,动态分级存储,按需平滑扩展,支持在线扩容等特点,能充分满足媒体处理中用户对存储容量,吞吐量,IOPS(每秒读写次数)和各种工作负荷下低时延的需求。 某卫视频道栏目组外拍大量音视频素材,现需要将这组素材编辑处理成为即将播出的节目,节目的编辑处理将由多个编辑工作站协作完成。为实现多个编辑工作站访问到同一素材文件,栏目组选用了弹性文件服务。首先将同一文件系统挂载到栏目组的作为上载工作站和编辑工作站的云服务器上,再将素材文件由上载工作站上传到挂载的文件系统,最终实现多个编辑工作站直接对挂载文件系统中的素材进行编辑。
  • 网段类型 网段类型配置可以采用CIDR格式网段配置方式。 CIDR格式地址使用可变长度子网掩码来表示 IP 地址中网络地址位和主机地址位之间的比例。 CIDR IP地址在普通IP地址的基础上附加了一个后缀值,这个后缀值就是网络地址前缀位数。例如,192.1.1.0/24 是一个 IPv4 CIDR地址,其中前 24 位(即 192.1.1)是网络地址。 任何前24位与192.1.1.0相同的ip都适用于这一条鉴权规则,即192.1.1.1 与 192.1.1.1/32 表达的含义相同。
  • 配置参考 登录弹性文件服务管理控制台。 在左侧导航栏,选择“SFS容量型”。在页面右上角单击“创建文件系统”。 在创建文件系统页面,根据界面提示配置参数。 配置完成后,单击“立即创建”,完成文件系统创建。 Linux系统ECS挂载操作请参见挂载NFS文件系统到云服务器(Linux);Windows系统ECS挂载操作请参见挂载NFS文件系统到云服务器(Windows)和挂载CIFS文件系统到云服务器(Windows)。 登录上载工作站将需要上传的素材文件上传到挂载的文件系统。 登录编辑工作站,从挂载的文件系统中获取到素材文件进行编辑。
  • 权限类型 权限分为两部分:读写权限和用户权限。 表1 读写权限 权限 描述 读写 用户拥有读写权限。 只读 用户拥有只读权限。 表2 用户权限 权限 描述 root用户不匿名(no_root_squash) 包括root用户在内的任何用户访问时权限都不会降为nobody。 所有用户匿名(all_squash) 所有的用户访问时权限降为nobody。 root用户匿名(root_squash) root用户访问时权限降为nobody。
  • 添加授权操作步骤 本章节介绍如何通过控制台完成添加授权进行权限管理。 登录弹性文件服务管理控制台。 在通用文件系统列表中,找到待添加授权的通用文件系统并单击目标通用文件系统名称,进入通用文件系统详情界面。 在“权限管理”页签,单击“添加授权”。 图1 添加授权 在“添加授权”弹窗内,参考表3完成授权的添加。 表3 参数说明 参数 说明 VPC 添加的VPC,例如:vpc-30e0。如无VPC,可选择新建VPC。 读写权限 支持选择读写权限和只读权限。默认为“读写”。 用户权限 支持选择root用户不匿名(no_root_squash)、root用户匿名(root_squash)或者所有用户匿名(all_squash)。 root用户不匿名(no_root_squash):允许使用root用户访问通用文件系统。 root用户匿名(root_squash):root用户身份访问时,将映射nobody为用户,允许用户访问通用文件系统。 所有用户匿名(all_squash):无论以何种用户身份访问,均映射为nobody用户,允许用户访问、修改、删除通用文件系统。 授权地址条目 支持选择所有IP地址和指定IP地址。默认为“所有IP地址”。 输入的IPv4地址/地址段必须合法,且不能为除0.0.0.0/0以外之前0开头的IP地址或地址段,其中当设置为0.0.0.0/0时表示VPC内的任意IP。同时,不能为127以及224~255开头的IP地址或地址段,例如127.0.0.1,224.0.0.1,255.255.255.255,因为以224-239开头的IP地址或地址段是属于D类地址,用于组播;以240-255开头的IP地址或地址段属于E类地址,用于研究。使用非合法的IP或IP地址段可能会导致添加访问规则失败或者添加的访问规则无法生效。 如果要表示一个地址段,如192.168.1.0-192.168.1.255的地址段应使用掩码形式:192.168.1.0/24,不支持192.168.1.0-255等其他地址段表示形式。掩码位数的取值为0到31的整数,且只有为0.0.0.0/0时掩码位数可取0,其他情况均不合法。 网段类型请参见网段类型。 说明: 填写指定IP地址时,您可以在IP地址组内添加多个不同格式的IP地址,每个IP地址输入完成后,按回车键换行。 完成授权添加后,单击权限管理列表的授权地址条目数,可查看或查询授权地址条目信息。 确认授权信息,单击“确定”。
  • 代码样例 下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中: 样例:使用二级索引查找数据 public void testScanDataByIndex() { LOG .info("Entering testScanDataByIndex."); Table table = null; ResultScanner scanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, "Li Gang".getBytes()); Scan scan = new Scan(); scan.setFilter(filter); scanner = table.getScanner(scan); LOG.info("Scan indexed data."); for (Result result : scanner) { for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data by index successfully."); } catch (IOException e) { LOG.error("Scan data by index failed."); } finally { if (scanner != null) { // Close the scanner object. scanner.close(); } try { if (table != null) { table.close(); } } catch (IOException e) { LOG.error("Close table failed."); } } LOG.info("Exiting testScanDataByIndex."); }
  • 功能介绍 针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。 HIndex支持的Filter类型为“SingleColumnValueFilter”,“SingleColumnValueExcludeFilter”以及“SingleColumnValuePartitionFilter”。 HIndex支持的Comparator为“BinaryComparator”,“BitComparator”,“LongComparator”,“DecimalComparator”,“DoubleComparator”,“FloatComparator”,“IntComparator”,“NullComparator”。 二级索引的使用规则如下: 针对某一列或者多列创建了单索引的场景下: 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1) 针对多个列创建的联合索引场景下: 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1)) 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testSelect方法中。 /** * Select Data */ public void testSelect() { LOG.info("Entering testSelect."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Query String querySQL = "SELECT * FROM TEST WHERE id = ?"; Connection conn = null; PreparedStatement preStat = null; Statement stat = null; ResultSet result = null; try { // Create Connection conn = DriverManager.getConnection(url, props); // Create Statement stat = conn.createStatement(); // Create PrepareStatement preStat = conn.prepareStatement(querySQL); // Execute query preStat.setInt(1, 1); result = preStat.executeQuery(); // Get result while (result.next()) { int id = result.getInt("id"); String name = result.getString(1); System.out.println("id: " + id); System.out.println("name: " + name); } LOG.info("Select successfully."); } catch (Exception e) { LOG.error("Select failed.", e); } finally { if (null != result) { try { result.close(); } catch (Exception e2) { LOG.error("Result close failed.", e2); } } if (null != stat) { try { stat.close(); } catch (Exception e2) { LOG.error("Stat close failed.", e2); } } if (null != conn) { try { conn.close(); } catch (Exception e2) { LOG.error("Connection close failed.", e2); } } } LOG.info("Exiting testSelect."); }
  • 回答 bulkload是通过启动MapReduce任务直接生成HFile文件,再将HFile文件注册到HBase,因此错误的使用bulkload会因为启动MapReduce任务而占用更多的集群内存和CPU资源,也可能会生成大量很小的HFile文件频繁的触发Compaction,导致查询速度急剧下降。 错误的使用put,会造成数据加载慢,当分配给RegionServer内存不足时会造成RegionServer内存溢出从而导致进程退出。 下面给出bulkload和put适合的场景: bulkload适合的场景: 大量数据一次性加载到HBase。 对数据加载到HBase可靠性要求不高,不需要生成WAL文件。 使用put加载大量数据到HBase速度变慢,且查询速度变慢时。 加载到HBase新生成的单个HFile文件大小接近HDFS block大小。 put适合的场景: 每次加载到单个Region的数据大小小于HDFS block大小的一半。 数据需要实时加载。 加载数据过程不会造成用户查询速度急剧下降。
  • 回答 由于浏览器所在的计算机IP地址未加到Web访问白名单导致。用户可以通过修改客户端的配置文件“conf/flink-conf.yaml”来解决问题。 确认配置项“jobmanager.web.ssl.enabled”的值是否是“false”,若不是,请修改为“false”。 确认配置项“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”中是否已经添加浏览器所在的计算机IP地址。如果没有添加,可以通过这两项配置项进行添加。例如: jobmanager.web.access-control-allow-origin: 浏览器所在的计算机IP地址jobmanager.web.allow-access-address: 浏览器所在的计算机IP地址
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testModifyTable方法中 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("education"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. TableDescriptor htd = admin.getDescriptor(tableName); // Check whether the column family is specified before modification. if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor. TableDescriptor tableBuilder = TableDescriptorBuilder.newBuilder(htd) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build()).build(); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName);//注[1] // Submit a modifyTable request. admin.modifyTable(tableBuilder); // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting testModifyTable."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testSingleColumnValueFilter方法中。 public void testSingleColumnValueFilter() { LOG.info("Entering testSingleColumnValueFilter."); Table table = null; ResultScanner rScanner = null; try { table = conn.getTable(tableName); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the filter criteria. SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Xu Bing")); scan.setFilter(filter); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Single column value filter successfully."); } catch (IOException e) { LOG.error("Single column value filter failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testSingleColumnValueFilter."); }
  • 注意事项 当前二级索引不支持使用SubstringComparator类定义的对象作为Filter的比较器。 例如,如下示例中的用法当前不支持: Scan scan = new Scan();filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier),CompareOperator.EQUAL, new SubstringComparator(substring)));scan.setFilter(filterList);
  • 样例工程运行依赖包参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: 表1 样例工程运行依赖包 样例工程 依赖包 依赖包获取地址 DataStream程序 异步Checkpoint机制程序 flink-dist_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 使用Flink Jar提交SQL作业程序 FlinkServer REST API程序 flink-dist_*.jar flink-table_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 向Kafka生产并消费数据程序 kafka-clients-*.jar flink-connector-kafka_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 pipeline程序 flink-connector-netty_*.jar flink-dist_*.jar flink-connector-netty_*.jar可在二次开发样例代码编译后产生的lib文件夹下获取。 flink-dist_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Stream SQL Join程序 kafka-clients-*.jar flink-connector-kafka_*.jar flink-dist_*.jar flink-table_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Flink读写HBase程序 flink-connector-hbase*.jar flink-dist_*.jar flink-table_*.jar hbase-clients-*.jar flink-connector-hbase_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 hbase-clients-*.jar由HBase组件发布提供,可在HBase组件客户端或者服务端安装路径下的lib目录下获取。 Flink读写Hudi程序 hbase-unsafe-*.jar 可在二次开发样例代码编译后产生的lib文件夹下获取。
  • Hive应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Hive应用程序开发流程 表1 Hive应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Hive的基本概念。 常用概念 准备开发和运行环境 Hive的应用程序支持使用Java、Python两种语言进行开发。推荐使用IntelliJ IDEA工具,请根据指导完成不同语言的开发环境配置。 准备Hive应用开发环境 根据场景开发工程 提供了Java、Python两种不同语言的样例工程,还提供了从建表、数据加载到数据查询的样例工程。 开发Hive应用 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 调测Hive应用 父主题: Hive开发指南(普通模式)
  • Python3样例工程的命令行形式运行 赋予“python3-examples”文件夹中脚本的可执行权限。在命令行终端执行以下命令: chmod +x python3-examples -R。 在python3-examples/pyCLI_nosec.py中的host的值修改为安装HiveServer的节点的业务平面IP,port的值修改为Hive提供Thrift服务的端口(hive.server2.thrift.port),默认值为“10000”。 执行以下命令运行Python3客户端: cd python3-examples python pyCLI_nosec.py 在命令行终端查看样例代码中的HQL所查询出的结果。例如: [['default', '']] [{'comment': 'from deserializer', 'columnName': 'tab_name', 'type': 'STRING_TYPE'}] ['xx']
  • HetuEngine应用开发流程介绍 开发流程中各阶段的说明如图1所示: 图1 HetuEngine应用程序开发流程 表1 HetuEngine应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HetuEngine的基本概念,了解场景需求等。 HetuEngine基本概念 准备开发和运行环境 HetuEngine的应用程序支持使用任何语言调用JDBC接口进行开发,当前样例主要是java语言。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。HetuEngine的运行环境即客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 HetuEngine提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个HetuEngine工程。 导入并配置HetuEngine样例工程 根据场景开发工程 提供了Java语言的样例工程,包括连接HetuEngine、SQL语句执行、结果解析,断开连接等全流程的样例工程。 开发HetuEngine应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HetuEngine应用 查看程序运行结果 程序运行结果会根据结果解析部分的实现显示到期望显示的地方。 调测HetuEngine应用 父主题: HetuEngine开发指南(普通模式)
  • 配置样例代码 在开发环境IntelliJ IDEA中,单击“src/springboot/hbase-examples/src/main/resources”目录下的“springclient.properties”文件,按需修改如下表1中提供的参数: 表1 配置说明表 配置名称 含义 conf.path HBase配置文件所在目录,即“hbase-example\src\main\resources\conf”。
  • HetuEngine基本概念 HSBroker:HetuEngine的服务代理,用作用户租户管理校验,HetuEngine访问URL的获取等。 Coordinator:HetuEngine服务的资源协调者,负责SQL解析和优化等事务。 Worker:负责执行任务和处理数据。 Connector:HetuEngine访问数据库的接口,HetuEngine通过Connector的驱动连接数据源,读取数据源元数据和对数据进行增删改查等操作。 Catalog:HetuEngine中一个catalog配置文件对应一个数据源,一个数据源可以有多个不同catalog配置,可以通过数据源的properties文件进行配置。 Schema:对应数据库的Schema名称。 Table:对应数据库的表名。
  • HetuEngine连接方式说明 表1 HetuEngine连接方式说明 连接方式 是否支持用户名密码认证方式 是否支持Keytab认证方式 是否支持客户端跨网段访问 使用前提 HSFabric 是 是 是 确保业务侧和HetuServer服务端HSFabric所在业务节点网络互通 适用于双平面的网络场景 只需对外开放HSFabric固定的IP,端口 支持范围: MRS 3.1.3及之后版本 HSBroker 是 否 否 确保业务侧和HetuServer服务端HSBroker、Coordinator(随机分布在Yarn NodeManger)所在业务节点网络互通 需对外开放Coordinator的IP,端口 支持范围:MRS 3.1.0及之后版本
  • 代码样例 以下代码片段在“hbase-zk-example\src\main\java\com\huawei\hadoop\hbase\example”包的“TestZKSample”类中,用户主要需要关注“login”和“connectApacheZK”这两个方法。 private static void login(String keytabFile, String principal) throws IOException { conf = HBaseConfiguration.create(); //In Windows environment String confDirPath = TestZKSample.class.getClassLoader().getResource("").getPath() + File.separator;[1] //In Linux environment //String confDirPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; // Set zoo.cfg for hbase to connect to fi zookeeper. conf.set("hbase.client.zookeeper.config.path", confDirPath + "zoo.cfg"); if (User.isHBaseSecurityEnabled(conf)) { // jaas.conf file, it is included in the client pakcage file System.setProperty("java.security.auth.login.config", confDirPath + "jaas.conf"); // set the kerberos server info,point to the kerberosclient System.setProperty("java.security.krb5.conf", confDirPath + "krb5.conf"); // set the keytab file name conf.set("username.client.keytab.file", confDirPath + keytabFile); // set the user's principal try { conf.set("username.client.kerberos.principal", principal); User.login(conf, "username.client.keytab.file", "username.client.kerberos.principal", InetAddress.getLocalHost().getCanonicalHostName()); } catch (IOException e) { throw new IOException("Login failed.", e); } } } private void connectApacheZK() throws IOException, org.apache.zookeeper.KeeperException { try { // Create apache zookeeper connection. ZooKeeper digestZk = new ZooKeeper("127.0.0.1:2181", 60000, null); LOG.info("digest directory:{}", digestZk.getChildren("/", null)); LOG.info("Successfully connect to apache zookeeper."); } catch (InterruptedException e) { LOG.error("Found error when connect apache zookeeper ", e); } }
  • 通过Python API的方式提交Flink SQL作业到Yarn上代码样例 下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。 import loggingimport sysimport osfrom pyflink.table import (EnvironmentSettings, TableEnvironment)def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_textdef exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute()if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql() 表2 使用Python提交SQL作业参数说明 参数 说明 示例 file_path “datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql SQL示例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20)) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json');create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20)) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1');INSERT INTO kafka_sinkSELECT *FROM datagen_source;
  • 通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例 下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。 import osimport loggingimport sysfrom pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchemafrom pyflink.common.typeinfo import Typesfrom pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumerfrom pyflink.datastream import StreamExecutionEnvironmentfrom pyflink.table import TableEnvironment, EnvironmentSettingsdef read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_textdef exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute()def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test")if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka() 表1 使用Python提交普通作业参数说明 参数 说明 示例 bootstrap.servers Kafka的Broker实例业务IP和端口。 192.168.12.25:21005 specific_jars “客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar file_path “insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql SQL示例: create table kafka_sink_table ( age int, name varchar(10)) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json');create TABLE datagen_source_table ( age int, name varchar(10)) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1');INSERT INTO kafka_sink_tableSELECT *FROM datagen_source_table;
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据 数据包含两个属性:分别是Int和String类型 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。
  • 简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • Flink基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
共100000条