云服务器内容精选

  • 操作步骤 假设存在如下一张表: user_data(user_group int, user_name string, update_time timestamp); 其中user_group是分区列,现在需要根据已有数据,按更新时间进行排序,刷新用户组信息。 开启Hive动态分区参数。 set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; 创建一个临时表存储去重后的数据。 CREATE TABLE temp_user_data AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER(PARTITION BY user_group ORDER BY update_time DESC) as rank FROM user_data ) tmp WHERE rank = 1; 使用临时数据作为数据源,插入目的表。 INSERT OVERWRITE TABLE user_data SELECT user_group, user_name, update_time FROM temp_user_data; 清理临时表。 DROP TABLE IF EXISTS temp_user_data;
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使该三个部件的性能都更优化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能更优,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Spark Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Spark Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Spark Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Spark Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。