-
代码样例 import os, torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
print("cc is None.")
exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 组成键值列表
key_list = ["123", "66"]
# 仅支持npu设备的tensor
tensor1 = torch.ones(2, device="npu:1")
tensor2 = torch.ones(6, device="npu:1")
len1 = tensor1.numel() * tensor1.element_size()
len2 = tensor2.numel() * tensor2.element_size()
val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]]
# 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。
try:
cc_result = cc.load(option, key_list, val_list)
except EmsException as e:
print(f"failed to load, {e}.")
exit(2)
print(f"succeed to load key num {cc_result.success}.")
print(cc_result)
-
返回结果说明 表4 返回结果 类型 说明 CcResult 参数解释: ContextCaching访问内存池的执行结果。 表5 CcResult 参数名称 参数类型 描述 success int 参数解释: 请求的批量key读写连续成功的个数。 取值范围: 0~请求key的个数。 默认取值: 无 total string 参数解释: 请求的批量key总个数。 取值范围: 请求列表中批量key的个数。 默认取值: 无
-
请求参数说明 表1 请求参数列表 参数名称 参数类型 是否必选 描述 option CcKvOption 必选 参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无 key_list List[str] 必选 参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 value_list List[List[KvBufferWrapper]] 必选 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 表2 CcKvOption 参数名称 参数类型 是否必选 描述 write_rcache bool 可选 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 取值范围: True或者False。 默认取值: True read_local_only bool 可选 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 取值范围: True或者False。 默认取值: False timeout int 可选 参数解释: 请求超时时间,单位为毫秒。 取值范围: 大于0。 默认取值: 5000 表3 KvBufferWrapper 参数名称 参数类型 是否必选 描述 data_ptr int 必选 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 length int 必选 参数解释: 加速卡计算产生的KVCache连续显存长度。 取值范围: 大于0。
-
代码样例 import os, torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
print("cc is None.")
exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 组成键值列表
key_list = ["123", "66"]
# 仅支持npu设备的tensor
tensor1 = torch.ones(2, device="npu:1")
tensor2 = torch.ones(6, device="npu:1")
len1 = tensor1.numel() * tensor1.element_size()
len2 = tensor2.numel() * tensor2.element_size()
val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]]
# 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。
try:
cc_result = cc.save(option, key_list, val_list)
except EmsException as e:
print(f"failed to save, {e}.")
exit(2)
print(cc_result)
-
返回结果说明 表4 返回结果 类型 说明 CcResult 参数解释: ContextCaching访问内存池的执行结果。 表5 CcResult 参数名称 参数类型 描述 success int 参数解释: 请求的批量key读写连续成功的个数。 取值范围: 0~请求key的个数。 默认取值: 无 total string 参数解释: 请求的批量key总个数。 取值范围: 请求列表中批量key的个数。 默认取值: 无
-
请求参数说明 表1 请求参数列表 参数名称 参数类型 是否必选 描述 option CcKvOption 必选 参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无。 key_list List[string] 必选 参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无。 value_list List[List[KvBufferWrapper]] 必选 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 表2 CcKvOption 参数名称 参数类型 是否必选 描述 write_rcache bool 可选 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True read_local_only bool 可选 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 取值范围: True或者False。 默认取值: False timeout int 可选 参数解释: 请求超时时间,单位为毫秒。 取值范围: 大于0。 默认取值: 5000 表3 KvBufferWrapper 参数名称 参数类型 是否必选 描述 data_ptr int 必选 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 length int 必选 参数解释: 加速卡计算产生的KVCache连续显存长度。 取值范围: 大于0。
-
代码样例 以下为获取ContextCaching对象示例,用于后续执行该对象内部的方法。 from ems import Ems, EmsConfig, EmsException, CcConfig
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
cc = Ems.get_cc()
if cc is None:
print(f"cc is not initialized.")
-
代码样例 本示例用于获取异常详细描述信息 。 import os
import torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
print("cc is None.")
exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 组成键值列表
key_list = ["hello_world"]
# 仅支持npu设备的tensor
data = torch.ones(1, 4, 2, device="npu:1")
data_len = data.numel() * data.element_size()
val_list = [[KvBufferWrapper(data.data_ptr, data_len)]]
# 多次异步下发异步load请求,增加load并发能力。
try:
result = cc.load(option, key_list, val_list)
except EmsException as e:
print(f"failed to load, message: {e.message()}.")
-
代码样例 默认不使用密钥(access_id/access_key)访问内存池: # 引入模块
import os
from ems import Ems, EmsConfig, EmsException, CcConfig
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.") 多租户场景,指的是同一套推理服务器集群中,仅有一套EMS集群,通过租户隔离不同业务/租户在推理中产生的KVCache数据。需要引入额外的鉴权组件服务,提前申请access_id/access_key后,通过永久访问密钥(access_id/access_key)初始化EMS SDK代码如下: # 引入模块
import os
from ems import Ems, EmsConfig, EmsException
# 通过环境变量获取access_id及access_key
access_id = os.getenv("access_id", None)
access_key = os.getenv("access_key", None)
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(access_id=access_id, access_key=access_key, cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.") 一个进程中只会初始化一次Ems,以及Context Caching对象。 当前默认支持无密钥方式访问EMS内存池,如果需要在多租场景使用密钥,请联系EMS技术支持。
-
构造函数参数描述 参数名称 参数类型 是否必选 描述 config 表1 必选 参数解释: EMS存储SDK初始化配置。 约束限制: 不能为None 默认取值: 无 表1 EmsConfig 参数名称 参数类型 是否必选 描述 access_id string 可选 参数解释: 业务访问内存池身份凭证,由用户自行指定并保证唯一性,在需要进行业务多租隔离场景使用。 约束限制: access_id需要保证全局唯一。 access_id创建规则: 1~512个字符,支持数字、小写字母、“.”、“-”、“_”。 默认取值: None,即为空。 access_key string 可选 参数解释: SDK访问内存池资源的密钥,与身份凭证access_id配合使用。多租场景需要在部署EMS服务时搭建鉴权服务,并通过控制面Restful接口预先申请。 约束限制: access_key创建规则:32~256个字符,支持数字、大小写字母、“+”、“/”、“=”。 默认取值: None,即为空。 cc_config 表2 CcConfig 可选 参数解释: 用于初始化Context Caching的配置。 默认取值: None,表示不使用Context Caching功能。 表2 CcConfig 参数名称 参数类型 是否必选 描述 rank_id int 是 参数解释: 当前计算进程,使用加速卡的全局rank ID。 约束限制: rank ID实例内唯一。 rank_id取值范围:[0,当前实例rank个数) device_id int 是 参数解释: 当前计算进程,使用加速卡的本地节点rank ID。 约束限制: rank ID节点内唯一。 rank ID取值范围:[0,当前实例在本地节点内rank个数) model_id string 是 参数解释: 唯一标识当前实例使用的推理模型。 约束限制: model_id创建规则:1~512个字符,支持数字、小写字母、“.”、“-”、“_”。 需要保证全局唯一。 tp_world_size int 否 参数解释: 实例TP并行度。 约束限制: tp_world_size 取值范围: [1,当前实例rank个数) 默认取值: 1。 pp_world_size int 否 参数解释: 实例PP并行度。 约束限制: pp_world_size 取值范围: [1,当前实例rank个数) 默认取值: 1。 如果初始化参数校验失败或者因与EMS内存池连接失败,可以参考异常处理示例捕获异常,避免阻塞推理服务启动;同时根据异常信息,初步分析原因并联系EMS工程师。
-
返回结果说明 表4 返回结果 类型 说明 CcFuture 参数解释: 返回异步执行Future句柄。 表5 CcFuture 方法名称 参数 返回结果 描述 result 无 CcResult 参数解释: 无。 表6 CcResult 参数名称 参数类型 描述 success int 参数解释: 请求的批量key读写连续成功的个数。 取值范围: 0~请求key的个数。 默认取值: 无 total string 参数解释: 请求的批量key总个数。 取值范围: 请求列表中批量key的个数。 默认取值: 无
-
代码样例 本示例用于批量多次下发异步保存KVCache的请求,并获取每个请求的最终执行结果。 import os
import torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
print("cc is None.")
exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 组成键值列表
key_list = ["123", "66"]
key_list2 = ["hello_world"]
# 仅支持npu设备的tensor
tensor1 = torch.ones(2, device="npu:1")
tensor2 = torch.ones(6, device="npu:1")
tensor3 = torch.ones(1, 4, 2, device="npu:1")
len1 = tensor1.numel() * tensor1.element_size()
len2 = tensor2.numel() * tensor2.element_size()
val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]]
len3 = tensor2.numel() * tensor3.element_size()
val_list2 = [[KvBufferWrapper(tensor1.data_ptr, len3)]]
future_list = []
# 多次异步下发异步save请求,增加save并发能力。
try:
for idx in range(2):
future_list.append(cc.async_save(option, key_list, val_list))
future_list.append(cc.async_save(option, key_list2, val_list2))
except EmsException as e:
print(f"failed to save, {e}.")
exit(2)
try:
for future in future_list:
result = future.result()
print(f"rsult:{result}")
except EmsException as e:
print(f"failed to get result, {e}.")
-
请求参数说明 表1 请求参数列表 参数名称 参数类型 是否必选 描述 option CcKvOption 必选 参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无 key_list List[string] 必选 参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 value_list List[List[KvBufferWrapper]] 必选 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 表2 CcKvOption 参数名称 参数类型 是否必选 描述 write_rcache bool 可选 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True read_local_only bool 可选 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 取值范围: True或者False。 默认取值: False timeout int 可选 参数解释: 请求超时时间,单位为毫秒。 取值范围: 大于0。 默认取值: 5000 表3 KvBufferWrapper 参数名称 参数类型 是否必选 描述 data_ptr int 必选 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 length int 必选 参数解释: 加速卡计算产生的KVCache连续显存长度。 取值范围: 大于0。
-
准备环境 在推理/训练容器内执行命令npu-smio info查看NPU卡信息,确保成功挂载NPU加速卡,同时安装CANN软件包。安装CANN软件包请参考:昇腾CANN软件安装。 确保将宿主机EMS服务端容器共享的unix domain socket目录"/mnt/paas/kubernetes/kubelet/ems",通过增加负载配置文件hostPath项,将目录映射到推理/训练容器目录:"/dev/shm/ems";同时推理/训练容器内,运行服务的用户能够读写该文件夹及其文件。 从Python官网下载并安装合适的Python版本。 推荐使用的Python 3.x版本:3.9、3.10版本。 从PyCharm官网下载并安装最新社区版本。 运行命令 pip install pycryptodome==3.10.1,安装加密库。 从pip官网下载并安装最新社区版本
-
读写Context Caching 本示例通过初始化并获取Context Caching配置,保存和加载显存数据。 # 引入模块
import os, torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 保存一个tensor
data = torch.ones(6, device="npu:1")
length = data.numel() * data.element_size()
key_list = ["hello"]
val_list = [[KvBufferWrapper(data.data_ptr, length)]]
try:
cc_result = cc.save(option, key_list, val_list)
except EmsException as e:
print(f"failed to save, {e}.")
exit(1)
# 读取保存的tensor数据到新的tensor, 保证跟之前保存的的tensor一样的shape和dtype
data = torch.zero(6, device="npu:1")
val_list = [[KvBufferWrapper(data.data_ptr, len)]]
try:
cc_result = cc.save(option, key_list, val_list)
except EmsException as e:
print(f"failed to save, {e}.")
exit(1)