云服务器内容精选

  • 代码样例 import os, torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: print("cc is None.") exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 组成键值列表 key_list = ["123", "66"] # 仅支持npu设备的tensor tensor1 = torch.ones(2, device="npu:1") tensor2 = torch.ones(6, device="npu:1") len1 = tensor1.numel() * tensor1.element_size() len2 = tensor2.numel() * tensor2.element_size() val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]] # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 try: cc_result = cc.load(option, key_list, val_list) except EmsException as e: print(f"failed to load, {e}.") exit(2) print(f"succeed to load key num {cc_result.success}.") print(cc_result)
  • 返回结果说明 表4 返回结果 类型 说明 CcResult 参数解释: ContextCaching访问内存池的执行结果。 表5 CcResult 参数名称 参数类型 描述 success int 参数解释: 请求的批量key读写连续成功的个数。 取值范围: 0~请求key的个数。 默认取值: 无 total string 参数解释: 请求的批量key总个数。 取值范围: 请求列表中批量key的个数。 默认取值: 无
  • 请求参数说明 表1 请求参数列表 参数名称 参数类型 是否必选 描述 option CcKvOption 必选 参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无 key_list List[str] 必选 参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 value_list List[List[KvBufferWrapper]] 必选 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 表2 CcKvOption 参数名称 参数类型 是否必选 描述 write_rcache bool 可选 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 取值范围: True或者False。 默认取值: True read_local_only bool 可选 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 取值范围: True或者False。 默认取值: False timeout int 可选 参数解释: 请求超时时间,单位为毫秒。 取值范围: 大于0。 默认取值: 5000 表3 KvBufferWrapper 参数名称 参数类型 是否必选 描述 data_ptr int 必选 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 length int 必选 参数解释: 加速卡计算产生的KVCache连续显存长度。 取值范围: 大于0。
  • 代码样例 import os, torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: print("cc is None.") exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 组成键值列表 key_list = ["123", "66"] # 仅支持npu设备的tensor tensor1 = torch.ones(2, device="npu:1") tensor2 = torch.ones(6, device="npu:1") len1 = tensor1.numel() * tensor1.element_size() len2 = tensor2.numel() * tensor2.element_size() val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]] # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 try: cc_result = cc.save(option, key_list, val_list) except EmsException as e: print(f"failed to save, {e}.") exit(2) print(cc_result)
  • 返回结果说明 表4 返回结果 类型 说明 CcResult 参数解释: ContextCaching访问内存池的执行结果。 表5 CcResult 参数名称 参数类型 描述 success int 参数解释: 请求的批量key读写连续成功的个数。 取值范围: 0~请求key的个数。 默认取值: 无 total string 参数解释: 请求的批量key总个数。 取值范围: 请求列表中批量key的个数。 默认取值: 无
  • 请求参数说明 表1 请求参数列表 参数名称 参数类型 是否必选 描述 option CcKvOption 必选 参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无。 key_list List[string] 必选 参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无。 value_list List[List[KvBufferWrapper]] 必选 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 表2 CcKvOption 参数名称 参数类型 是否必选 描述 write_rcache bool 可选 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True read_local_only bool 可选 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 取值范围: True或者False。 默认取值: False timeout int 可选 参数解释: 请求超时时间,单位为毫秒。 取值范围: 大于0。 默认取值: 5000 表3 KvBufferWrapper 参数名称 参数类型 是否必选 描述 data_ptr int 必选 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 length int 必选 参数解释: 加速卡计算产生的KVCache连续显存长度。 取值范围: 大于0。
  • 代码样例 以下为获取ContextCaching对象示例,用于后续执行该对象内部的方法。 from ems import Ems, EmsConfig, EmsException, CcConfig # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) cc = Ems.get_cc() if cc is None: print(f"cc is not initialized.")
  • 代码样例 本示例用于获取异常详细描述信息 。 import os import torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: print("cc is None.") exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 组成键值列表 key_list = ["hello_world"] # 仅支持npu设备的tensor data = torch.ones(1, 4, 2, device="npu:1") data_len = data.numel() * data.element_size() val_list = [[KvBufferWrapper(data.data_ptr, data_len)]] # 多次异步下发异步load请求,增加load并发能力。 try: result = cc.load(option, key_list, val_list) except EmsException as e: print(f"failed to load, message: {e.message()}.")
  • 代码样例 默认不使用密钥(access_id/access_key)访问内存池: # 引入模块 import os from ems import Ems, EmsConfig, EmsException, CcConfig # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") 多租户场景,指的是同一套推理服务器集群中,仅有一套EMS集群,通过租户隔离不同业务/租户在推理中产生的KVCache数据。需要引入额外的鉴权组件服务,提前申请access_id/access_key后,通过永久访问密钥(access_id/access_key)初始化EMS SDK代码如下: # 引入模块 import os from ems import Ems, EmsConfig, EmsException # 通过环境变量获取access_id及access_key access_id = os.getenv("access_id", None) access_key = os.getenv("access_key", None) # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(access_id=access_id, access_key=access_key, cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") 一个进程中只会初始化一次Ems,以及Context Caching对象。 当前默认支持无密钥方式访问EMS内存池,如果需要在多租场景使用密钥,请联系EMS技术支持。
  • 构造函数参数描述 参数名称 参数类型 是否必选 描述 config 表1 必选 参数解释: EMS存储SDK初始化配置。 约束限制: 不能为None 默认取值: 无 表1 EmsConfig 参数名称 参数类型 是否必选 描述 access_id string 可选 参数解释: 业务访问内存池身份凭证,由用户自行指定并保证唯一性,在需要进行业务多租隔离场景使用。 约束限制: access_id需要保证全局唯一。 access_id创建规则: 1~512个字符,支持数字、小写字母、“.”、“-”、“_”。 默认取值: None,即为空。 access_key string 可选 参数解释: SDK访问内存池资源的密钥,与身份凭证access_id配合使用。多租场景需要在部署EMS服务时搭建鉴权服务,并通过控制面Restful接口预先申请。 约束限制: access_key创建规则:32~256个字符,支持数字、大小写字母、“+”、“/”、“=”。 默认取值: None,即为空。 cc_config 表2 CcConfig 可选 参数解释: 用于初始化Context Caching的配置。 默认取值: None,表示不使用Context Caching功能。 表2 CcConfig 参数名称 参数类型 是否必选 描述 rank_id int 是 参数解释: 当前计算进程,使用加速卡的全局rank ID。 约束限制: rank ID实例内唯一。 rank_id取值范围:[0,当前实例rank个数) device_id int 是 参数解释: 当前计算进程,使用加速卡的本地节点rank ID。 约束限制: rank ID节点内唯一。 rank ID取值范围:[0,当前实例在本地节点内rank个数) model_id string 是 参数解释: 唯一标识当前实例使用的推理模型。 约束限制: model_id创建规则:1~512个字符,支持数字、小写字母、“.”、“-”、“_”。 需要保证全局唯一。 tp_world_size int 否 参数解释: 实例TP并行度。 约束限制: tp_world_size 取值范围: [1,当前实例rank个数) 默认取值: 1。 pp_world_size int 否 参数解释: 实例PP并行度。 约束限制: pp_world_size 取值范围: [1,当前实例rank个数) 默认取值: 1。 如果初始化参数校验失败或者因与EMS内存池连接失败,可以参考异常处理示例捕获异常,避免阻塞推理服务启动;同时根据异常信息,初步分析原因并联系EMS工程师。
  • 返回结果说明 表4 返回结果 类型 说明 CcFuture 参数解释: 返回异步执行Future句柄。 表5 CcFuture 方法名称 参数 返回结果 描述 result 无 CcResult 参数解释: 无。 表6 CcResult 参数名称 参数类型 描述 success int 参数解释: 请求的批量key读写连续成功的个数。 取值范围: 0~请求key的个数。 默认取值: 无 total string 参数解释: 请求的批量key总个数。 取值范围: 请求列表中批量key的个数。 默认取值: 无
  • 代码样例 本示例用于批量多次下发异步保存KVCache的请求,并获取每个请求的最终执行结果。 import os import torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: print("cc is None.") exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 组成键值列表 key_list = ["123", "66"] key_list2 = ["hello_world"] # 仅支持npu设备的tensor tensor1 = torch.ones(2, device="npu:1") tensor2 = torch.ones(6, device="npu:1") tensor3 = torch.ones(1, 4, 2, device="npu:1") len1 = tensor1.numel() * tensor1.element_size() len2 = tensor2.numel() * tensor2.element_size() val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]] len3 = tensor2.numel() * tensor3.element_size() val_list2 = [[KvBufferWrapper(tensor1.data_ptr, len3)]] future_list = [] # 多次异步下发异步save请求,增加save并发能力。 try: for idx in range(2): future_list.append(cc.async_save(option, key_list, val_list)) future_list.append(cc.async_save(option, key_list2, val_list2)) except EmsException as e: print(f"failed to save, {e}.") exit(2) try: for future in future_list: result = future.result() print(f"rsult:{result}") except EmsException as e: print(f"failed to get result, {e}.")
  • 请求参数说明 表1 请求参数列表 参数名称 参数类型 是否必选 描述 option CcKvOption 必选 参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无 key_list List[string] 必选 参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 value_list List[List[KvBufferWrapper]] 必选 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 表2 CcKvOption 参数名称 参数类型 是否必选 描述 write_rcache bool 可选 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True read_local_only bool 可选 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 取值范围: True或者False。 默认取值: False timeout int 可选 参数解释: 请求超时时间,单位为毫秒。 取值范围: 大于0。 默认取值: 5000 表3 KvBufferWrapper 参数名称 参数类型 是否必选 描述 data_ptr int 必选 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 length int 必选 参数解释: 加速卡计算产生的KVCache连续显存长度。 取值范围: 大于0。
  • 准备环境 在推理/训练容器内执行命令npu-smio info查看NPU卡信息,确保成功挂载NPU加速卡,同时安装CANN软件包。安装CANN软件包请参考:昇腾CANN软件安装。 确保将宿主机EMS服务端容器共享的unix domain socket目录"/mnt/paas/kubernetes/kubelet/ems",通过增加负载配置文件hostPath项,将目录映射到推理/训练容器目录:"/dev/shm/ems";同时推理/训练容器内,运行服务的用户能够读写该文件夹及其文件。 从Python官网下载并安装合适的Python版本。 推荐使用的Python 3.x版本:3.9、3.10版本。 从PyCharm官网下载并安装最新社区版本。 运行命令 pip install pycryptodome==3.10.1,安装加密库。 从pip官网下载并安装最新社区版本
  • 读写Context Caching 本示例通过初始化并获取Context Caching配置,保存和加载显存数据。 # 引入模块 import os, torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 保存一个tensor data = torch.ones(6, device="npu:1") length = data.numel() * data.element_size() key_list = ["hello"] val_list = [[KvBufferWrapper(data.data_ptr, length)]] try: cc_result = cc.save(option, key_list, val_list) except EmsException as e: print(f"failed to save, {e}.") exit(1) # 读取保存的tensor数据到新的tensor, 保证跟之前保存的的tensor一样的shape和dtype data = torch.zero(6, device="npu:1") val_list = [[KvBufferWrapper(data.data_ptr, len)]] try: cc_result = cc.save(option, key_list, val_list) except EmsException as e: print(f"failed to save, {e}.") exit(1)