云服务器内容精选

  • 自定义UDTF使用指导 在Spark 3.3.1中,PySpark目前没有直接提供类似Scala那样的UDTF(用户自定义表生成函数)的API。如果你需要实现UDTF效果(即一行数据转换成多行数据),通常可以使用如下替代方案: 利用UDF返回列表,再使用DataFrame的explode函数将数组拆分成多行。 使用RDD转换中的flatMap来实现一行生成多行数据的效果。 以一行数据转换成多行数据场景为例: 利用UDF返回Array,再使用explode拆分。 用Python函数编写一个UDF,该函数将输入数据(或一行中的某个字段)转化为列表(或数组)。然后使用DataFrame内置的explode函数将数组拆分成多行,从而达到UDTF的效果。 示例: from pyspark.sql import SparkSession from pyspark.sql.functions import udf, explode from pyspark.sql.types import ArrayType, StringType sparkSession = SparkSession.builder.appName("SimulateUDTF").getOrCreate() data = [("a,b,c",), ("d,e",)] df = spark.createDataFrame(data, ["csv_col"]) def split_string(s): if s: return s.split(",") return [] split_udf = udf(split_string, ArrayType(StringType())) df_with_array = df.withColumn("split_col", split_udf("csv_col")) df_exploded = df_with_array.withColumn("word", explode("split_col")) df_exploded.show() sparkSession.stop() 运行结果: +-------+---------+----+ |csv_col|split_col|word| +-------+---------+----+ | a,b,c|[a, b, c]| a| | a,b,c|[a, b, c]| b| | a,b,c|[a, b, c]| c| | d,e| [d, e]| d| | d,e| [d, e]| e| +-------+---------+----+ 利用RDD的flatMap实现。 对于更灵活地情况,您可以直接使用DataFrame的RDD接口,通过flatMap操作来实现转换。 样例(以将输入行拆分为多行为例): from pyspark.sql import SparkSession sparkSession = SparkSession.builder.appName("SimulateUDTFWithFlatMap").getOrCreate() data = [("a,b,c",), ("d,e",)] df = spark.createDataFrame(data, ["csv_col"]) rdd = df.rdd.flatMap(lambda row: [(row.csv_col, token) for token in row.csv_col.split(",")]) new_df = rdd.toDF(["csv_col", "word"]) new_df.show() sparkSession.stop() 运行结果: +-------+----+ |csv_col|word| +-------+----+ | a,b,c| a| | a,b,c| b| | a,b,c| c| | d,e| d| | d,e| e| +-------+----+
  • 自定义UDF使用指导 可通过pyspark.sql.functions.udf函数创建自定义udf,pyspark.sql.functions.udf介绍如下: pyspark.sql.functions.udf(f: Union[Callable[[…], Any], DataTypeOrString, None] = None, returnType: DataTypeOrString = StringType()) → Union[UserDefinedFunctionLike, Callable[[Callable[[…], Any]], UserDefinedFunctionLike]] 表1 参数说明 参数 说明 f python函数。 returnType 用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。 用户定义函数默认被认为是确定性的。由于优化,可以消除重复调用,甚至可能调用该函数比查询中出现的次数更多。如果你的函数不是确定性的,可以调用用户定义函数的asNondeterministic。例如: from pyspark.sql.types import IntegerType import random random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() 用户定义函数不支持条件表达式或布尔表达式中的短路,最终只能在内部执行。如果函数在特殊行上失败,则解决方法是将条件合并到函数中。 用户定义函数在调用方不接受关键字参数。 示例: from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType sparkSession = SparkSession.builder.getOrCreate() slen = udf(lambda s: len(s), IntegerType()) @udf def to_upper(s): if s is not None: return s.upper() @udf(returnType=IntegerType()) def add_one(x): if x is not None: return x + 1 df = sparkSession.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show() sparkSession.stop() 运行结果: +----------+--------------+------------+ |slen(name)|to_upper(name)|add_one(age)| +----------+--------------+------------+ | 8| JOHN DOE| 22| +----------+--------------+------------+
  • 修订记录 发布日期 修订记录 2025-05-21 新增专属技术服务经理的服务方式是什么?、购买企业级支持计划包含专属技术服务经理和驻场服务吗?、提供现场服务吗?章节。更新支持计划如何计费?章节 2024-11-25 更新我的问题多久可以获得回复?、支持计划是否支持退订,如何退订?、我分别购买两个半年的支持计划产品,能否享受包年权益?章节。 2021-11-11 更新支持计划支持哪些Region?章节。 2019-01-31 第一次正式发布。 父主题: 常见问题
  • MindSpore Lite问题定位指南 在MindSpore Lite使用中遇到问题时,例如模型转换失败、训练后量化转换失败、模型推理失败、模型推理精度不理想、模型推理性能不理想、使用Visual Studio报错、使用Xcode构建APP报错等,您可以先查看日志信息进行定位分析。 多数场景下的问题可以通过日志报错信息直接定位。如果日志的信息不能定位问题,您可以通过设置环境变量调整日志等级,打印更多调试日志。 关于如何对MindSpore Lite遇到的问题进行定位与解决,请参见MindSpore Lite官网提供的问题定位指南。 父主题: 常见问题
  • 下单后如何获取License文件,需要提供哪些信息? 当您购买的“通用SDK”基础版数据建模引擎订单付款成功后,需要通过提交工单的方式申请License文件。在此之前,您需要提前准备如下相关信息: SDK所属应用的应用ID(License文件不支持跨应用使用)。 已支付订单中购买的用户数量。 已支付订单中订阅的时长。 SDK部署机器的信息。 如果SDK部署机器为本地通用服务器,则需要准备如下信息: Windows系统:提供机器MAC地址。 Linux系统:提供机器的UUID。 如果SDK部署机器为容器,则提供容器的ESN。
  • 什么时候需要更换License文件? 当您存在如下变更场景时,需要更换License文件: 用户数变更 例如,您已购买500个用户的SDK服务,在使用过程中发现500个用户已无法满足您的业务诉求,重新购买1000个用户的SDK服务用以扩充用户数量。 更换绑定的应用 例如,您之前配置了A应用的信息,因业务调整,A应用不再使用,现在希望配置成其他应用。 机器部署范围变更 例如,您之前配置了4台部署机器,因业务调整,新增2台部署机器。 授权时长变更 例如,您申请的License文件将于2024年09月30日到期,您希望将此License文件延期至2026年09月30日。
  • 解决方法 创建应用时,“应用名称”输入唯一的应用名称,例如:weathermap_test。 单击1已创建的天气预报应用名称(例如:weathermap_test),进入“应用概览”页。 单击“环境变量”,在下拉列表选择应用组件部署环境(例如:env-test)。 单击“添加环境变量”,设置环境变量。 “变量名称”根据微服务组件采用的技术,参考下表设置。 微服务组件采用技术 变量名称 Java Chassis servicecomb_service_application Spring Cloud spring_cloud_servicecomb_discovery_appName 对应的“变量/变量引用”设置为1已创建的应用名称,例如:weathermap_test。 单击“提交”,完成应用环境变量的设置。
  • ContextCaching接口超时时间如何设置? ContextCaching的读写相关接口执行时间,跟请求并发数、每个请求的键值对数量有关系,当前单个请求超时时间默认5秒,用户可以根据SLO(Service Level Objective,服务级别目标,例如吞吐、首token时延等)、请求batch数和KVCache数据量,合理设置超时时间。例如:长序列场景要求的首token时延是5秒,超时时间建议设置为3秒。 父主题: 常见问题
  • 如何处理终端输出中文乱码问题? 针对终端输出中文乱码的问题,解决方式有如下两种: 在运行配置启动参数中加入VMOptions参数: -Dfile.encoding=gbk; 这种方法可以解决部分的乱码问题,考虑到实际项目中可能用到Scanner的交互式输入的场景比较少,可以先尝试此方法。 将输出重定向到调试控制台里面:将运行配置中Console设置项中的integrated改成internal。 父主题: CodeArts IDE for Java常见问题
  • DataArts Insight应用场景 智能数据洞察(DataArts Insight)致力于用更生动、友好的形式,即时呈现隐藏在瞬息万变且庞杂数据背后的业务洞察。无论在零售、物流、电力、水利、环保、还是交通领域,通过交互式实时 数据可视化 大屏来帮助业务人员发现、诊断业务问题。 DataArts Insight提供丰富的可视化组件,除了针对业务展示优化过的常规图表外,还有拓扑关系等异形图表供您自由搭配。 DataArts Insight无缝集成华为云 数据仓库 服务、支持本地 CS V、在线API及企业内部私有云数据;满足您各类大数据实时计算、监控的需求,充分发挥大数据计算的能力。 拖拽即可完成组件自由配置与布局、所见即所得,无需编程就能轻松搭建可视化大屏。 DataArts Insight特别针对拼接大屏端的展示做了分辨率优化,能够适配非常规的拼接分辨率。创建的大屏能够发布分享,作为您对外数据业务展示的窗口。 父主题: 常见问题
  • 回答 当用户在distcp命令中使用webhdfs://时,会发生上述异常,是由于集群所使用的HTTP策略为HTTPS,即配置在“hdfs-site.xml”(文件路径为“客户端安装目录/HDFS/hadoop/etc/hadoop”)的“dfs.http.policy”值为“HTTPS_ONLY”。所以要避免出现此异常,应使用swebhdfs://替代webhdfs://。 例如: ./hadoop distcpswebhdfs://IP:PORT/testfile hdfs://IP:PORT/testfile1
  • 问题 使用运行的Spark Streaming任务回写kafka时,kafka上接收不到回写的数据,且kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从kafka中读取数据,执行对应处理之后,然后将结果数据回写至kafka中。 例如:Spark Streaming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将kafka的阈值调大,建议在 MRS Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 配置进程参数 Flink on YARN模式下,有JobManager和TaskManager两种进程。在任务调度和运行的过程中,JobManager和TaskManager承担了很大的责任。 因而JobManager和TaskManager的参数配置对Flink应用的执行有着很大的影响意义。用户可通过如下操作对Flink集群性能做优化。 配置JobManager内存。 JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务平行度增大时,JobManager内存都需要相应增大。 您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。 在使用yarn-session命令时,添加“-jm MEM”参数设置内存。 在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。 配置TaskManager个数。 每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。 在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。 在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。 配置TaskManager Slot数。 每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。 在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。 在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。 配置TaskManager内存。 TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。 将在使用yarn-session命令时,添加“-tm MEM”参数设置内存。 将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。
  • 配置netty网络通信 Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。 以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。 “taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。 “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。 “taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。 “taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。 “taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。