云服务器内容精选
-
在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
-
RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
-
在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
-
Hive JDBC驱动的加载 客户端程序以JDBC的形式连接HiveServer时,需要首先加载Hive的JDBC驱动类org.apache.hive.jdbc.HiveDriver。 故在客户端程序的开始,必须先使用当前类加载器加载该驱动类。 如果classpath下没有相应的jar包,则客户端程序抛出Class Not Found异常并退出。 如下: Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
-
关闭数据库连接 客户端程序在执行完HQL之后,注意关闭数据库连接,以免内存泄露,同时这是一个良好的编程习惯。 需要关闭JDK的两个对象statement和connection。 如下: finally { if (null != statement) { statement.close(); } // 关闭JDBC连接 if (null != connection) { connection.close(); } }
-
使用WebHCat的REST接口以Streaming方式提交MR任务的前置条件 本接口需要依赖hadoop的streaming包,在以Streaming方式提交MR任务给WebHCat前,需要将“hadoop-streaming-2.7.0.jar”包上传到HDFS的指定路径下:“hdfs:///apps/templeton/hadoop-streaming-2.7.0.jar”。首先登录到安装有客户端和Hive服务的节点上,以客户端安装路径为“/opt/client”为例: source /opt/client/bigdata_env 使用kinit登录人机用户或者机机用户。 hdfs dfs -put ${BIGDATA_HOME}/FusionInsight_HD_8.1.0.1/FusionInsight-Hadoop-*/hadoop/share/hadoop/tools/lib/hadoop-streaming-*.jar /apps/templeton/ 其中/apps/templeton/需要根据不同的实例进行修改,默认实例使用/apps/templeton/,Hive1实例使用/apps1/templeton/,以此类推。
-
客户端配置参数需要与服务端保持一致 当集群的Hive、YARN、HDFS服务端配置参数发生变化时,客户端程序对应的参数会被改变,用户需要重新审视在配置参数变更之前提交到HiveServer的配置参数是否和服务端配置参数一致,如果不一致,需要用户在客户端重新调整并提交到HiveServer。例如下面的示例中,如果修改了集群中的YARN配置参数时,Hive客户端、示例程序都需要审视并修改之前已经提交到HiveServer的配置参数: 初始状态: 集群YARN的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 客户端的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 集群YARN修改后,参数配置如下: mapreduce.reduce.java.opts=-Xmx1024M 如果此时客户端程序不做调整修改,则还是以客户端参数有效,会导致reducer内存不足而使MR运行失败。
-
HQL语法规则之判空 判断字段是否为“空”,即没有值,使用“is null”;判断不为空,即有值,使用“is not null”。 要注意的是,在HQL中String类型的字段若是空字符串, 即长度为0,那么对它进行IS NULL的判断结果是False。此时应该使用“col = '' ”来判断空字符串;使用“col != '' ”来判断非空字符串。 正确示例: select * from default.tbl_src where id is null; select * from default.tbl_src where id is not null; select * from default.tbl_src where name = ''; select * from default.tbl_src where name != ''; 错误示例: select * from default.tbl_src where id = null; select * from default.tbl_src where id != null; select * from default.tbl_src where name is null; select * from default.tbl_src where name is not null; 注:表tbl_src的id字段为Int类型,name字段为String类型。
-
获取数据库连接 使用JDK的驱动管理类java.sql.DriverManager来获取一个Hive的数据库连接。 Hive的数据库URL为url="jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.hadoop.com@HADOOP.COM;user.principal=hive/hadoop.hadoop.com;user.keytab=conf/hive.keytab"; 以上已经经过安全认证,所以Hive数据库的用户名和密码为null或者空。 如下: // 建立连接 connection = DriverManager.getConnection(url, "", "");
-
执行HQL 执行HQL,注意HQL不能以";"结尾。 正确示例: String sql = "SELECT COUNT(*) FROM employees_info"; Connection connection = DriverManager.getConnection(url, "", ""); PreparedStatement statement = connection.prepareStatement(sql); resultSet = statement.executeQuery(); 错误示例: String sql = "SELECT COUNT(*) FROM employees_info;"; Connection connection = DriverManager.getConnection(url, "", ""); PreparedStatement statement = connection.prepareStatement(sql); resultSet = statement.executeQuery();
-
多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; }
-
Flink常见参数说明 表1 Flink常见参数说明 参数名称 参数描述 建议值 说明 -c 指定主类名。 根据实际填写 必填 -yjm JobManager进程内存,默认值:2GB。 根据实际填写 选填 -ytm TaskManager进程内存,默认值:4GB。 根据实际填写 选填 -ynm Flink Yarn作业名称。 根据实际填写 必填 -ys TaskManager中slot个数。 2 选填 execution.checkpointing.interval checkpoint触发间隔(毫秒),通过-yD添加,单位毫秒。 60000 必填 execution.checkpointing.timeout checkpoint超时时长,通过-yD添加,默认值:30min。 30min 必填 execution.checkpointing.tolerable-failed-checkpoints checkpoint失败容忍次数总和,通过-yD添加。 1000 选填 state.checkpoints.num-retained checkpoint保留个数,通过-yD添加。 5 选填 state.backend 状态后端使用rocksdb,通过-yD添加。 rocksdb 默认开启 state.backend.incremental 开启rocksdb增量状态后端,通过-yD添加。 TRUE 必填 state.backend.rocksdb.block.blocksize 写状态后端的数据块大小,通过-yD添加。 512KB 必填 state.backend.rocksdb.block.cache-size 整个状态后端的block cache大小,通过-yD添加。 1024MB 必填 taskmanager.memory.jvm-overhead.max 用于JVM其他开销的本地内存的最大值,例如栈空间、垃圾回收空间等,通过-yD添加。 10g 选填 taskmanager.memory.jvm-overhead.fraction 用于JVM其他开销的本地内存占tm内存的比例,例如栈空间、垃圾回收空间等,通过-yD添加。 0.2 选填 parallelism.default 作业并行度,例如join算子,通过-yD添加,默认值:1。 根据实际填写 选填 table.exec.state.ttl Flink状态TTL(join ttl),通过-yD添加,默认值:0。 根据实际填写 必填 heartbeat.timeout jm与tm之间心跳超时时间,通过-yD添加。 1800000 必填 akka.ask.timeout akka通信超时时间,通过-yD添加。 240s 必填 taskmanager.memory.segment-size 内存管理和网络栈使用的内存缓冲块字节数大小,默认值: 32768 (32KB),通过-yD添加。 64kb 选填 taskmanager.network.memory.max-buffers-per-channel 每个channel最大能持有多少buffers,如果segment有很多空闲,可以适当调大该值,否则channel会因为拿不到segment而blocking,通过-yD添加。 100 选填 taskmanager.network.memory.buffers-per-channel 每个channel独享的buffer数,通过-yD添加。 10 选填 taskmanager.network.memory.floating-buffers-per-gate 每个channel浮动buffer数,通过-yD添加。 2000 选填 taskmanager.network.netty.server.numThreads 每个taskmanager中netty服务端线程数,通过-yD添加。 20 选填 taskmanager.network.netty.client.numThreads 每个taskmanager中netty客户端线程数,通过-yD添加。 20 选填 state.backend.rocksdb.files.open 最大打开文件数目,-1意味着没有限制,通过-yD添加。 -1 选填 state.backend.rocksdb.compaction.level.use-dynamic-size 参数允许Rocksdb对每层数据存储的数据量阈值进行动态调整,通过-yD添加。 TRUE 选填 state.backend.rocksdb.levels.num Rocksdb允许存储compaction数据层数,通过-yD添加。 10 选填 state.backend.rocksdb.compaction.style compaction算法,通过-yD添加。 FIFO 选填 state.backend.rocksdb.verify.checksum 关闭数据读取时数据check,通过-yD添加。 FALSE 选填 state.backend.rocksdb.thread.num 后台负责flush和compaction的最大并发线程数,通过-yD添加。 4 选填 state.backend.rocksdb.writebuffer.count memtable的最大数量,通过-yD添加。 5 选填 state.backend.rocksdb.writebuffer.number-to-merge 在flush发生之前被合并的memtable最小数量,通过-yD添加。 3 选填 state.backend.rocksdb.background.compaction.max 负责compaction最大线程数,通过-yD添加。 10 选填 state.backend.rocksdb.flush.max rocksdb flush线程数,通过-yD添加。 1 选填 父主题: Flink应用开发规范
-
HDFS需要开启DataNode数据存储路径 DataNode默认存储路径配置为:${BIGDATA_DATA_HOME}/hadoop/dataN/dn/datadir(N≥1),N为数据存放的目录个数。 例如:${BIGDATA_DATA_HOME}/hadoop/data1/dn/datadir、${BIGDATA_DATA_HOME}/hadoop/data2/dn/datadir 设置后,数据会存储到节点上每个挂载磁盘的对应目录下面。
-
HDFS创建文件 通过"FileSystem.mkdirs(Path f)"可在HDFS上创建文件夹,其中f为文件夹的完整路径。 正确示例: public class CreateDir { public static void main(String[] args) throws Exception{ Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path dfs=new Path("/TestDir"); hdfs.mkdirs(dfs); } }
-
多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } 多次重复登录会导致后建立的会话对象覆盖掉之前登录建立的,将会导致之前建立的会话无法被维护监控,最终导致会话超期后部分功能不可用。
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格