云服务器内容精选

  • 代码改造点 引入多进程启动机制:初始化进程 引入几个变量:tcp协议,rank进程序号,worldsize开启的进程数量 分发数据:DataLoader中多了一个Sampler参数,避免不同进程数据重复 模型分发:DistributedDataParallel(model) 模型保存:在序号为0的进程下保存模型 import torch class Net(torch.nn.Module): pass model = Net().cuda() ### DistributedDataParallel Begin ### model = torch.nn.parallel.DistributedDataParallel(Net().cuda()) ### DistributedDataParallel End ###
  • 训练流程简述 相比于DP,DDP能够启动多进程进行运算,从而大幅度提升计算资源的利用率。可以基于torch.distributed实现真正的分布式计算,具体的原理此处不再赘述。大致的流程如下: 初始化进程组。 创建分布式并行模型,每个进程都会有相同的模型和参数。 创建数据分发Sampler,使每个进程加载一个mini batch中不同部分的数据。 网络中相邻参数分桶,一般为神经网络模型中需要进行参数更新的每一层网络。 每个进程前向传播并各自计算梯度。 模型某一层的参数得到梯度后会马上进行通讯并进行梯度平均。 各GPU更新模型参数。 具体流程图如下: 图1 多机多卡数据并行训练
  • 相关章节 单机多卡数据并行-DataParallel(DP):介绍单机多卡数据并行分布式训练原理和代码改造点。 多机多卡数据并行-DistributedDataParallel(DDP):介绍多机多卡数据并行分布式训练原理和代码改造点。 分布式调测适配及代码示例:提供了分布式训练调测具体的代码适配操作过程和代码示例。 分布式训练完整代码示例:针对Resnet18在cifar10数据集上的分类任务,给出了分布式训练改造(DDP)的完整代码示例,供用户学习参考。 基于开发环境使用SDK调测训练作业:介绍如何在ModelArts的开发环境中,使用SDK调测单机和多机分布式训练作业。
  • 约束限制 总览页面打开的CodeLab不支持此项功能,但是如果用户在AI Hub中打开了可用的案例,会自动跳转到CodeLab中,此时是可以使用这项功能的。 如果切换了Notebook的规格,那么只能在Notebook进行单机调测,不能进行分布式调测,也不能提交远程训练任务。 当前仅支持PyTorch和MindSpore AI框架,如果MindSpore要进行多机分布式训练调试,则每台机器上都必须有8张卡。 本文档提供的调测代码中涉及到的OBS路径,请用户替换为自己的实际OBS路径。 本文档提供的调测代码是以PyTorch为例编写的,不同的AI框架之间,整体流程是完全相同的,只需要修改个别的参数即可。
  • 创建训练作业 本案例创建训练作业时,需要配置如下参数。 表1 创建训练作业的配置说明 参数名称 说明 “创建方式” 选择“自定义算法”。 “启动方式” 选择“自定义”。 “镜像” 选择用于训练的自定义镜像。 “代码目录” 执行本次训练作业所需的代码目录。本文示例的代码目录为“obs://test-modelarts/ascend/code/”。 “启动命令” 代码目录中的Python启动脚本。本文示例的启动命令为“bash ${MA_JOB_DIR}/code/run_torch_ddp_npu.sh”。启动脚本的完整代码请参见代码示例。
  • 训练代码 以下代码中以“### 分布式改造,... ###”注释的代码即为多节点分布式训练需要适配的代码改造点。 不对示例代码进行任何修改,适配数据路径后即可在ModelArts上完成多节点分布式训练。 注释掉分布式代码改造点,即可完成单节点单卡训练。完整代码见分布式训练完整代码示例。 导入依赖包 import datetime import inspect import os import pickle import random import argparse import numpy as np import torch import torch.distributed as dist from torch import nn, optim from torch.utils.data import TensorDataset, DataLoader from torch.utils.data.distributed import DistributedSampler from sklearn.metrics import accuracy_score 定义加载数据的方法和随机数,由于加载数据部分代码较多,此处省略 def setup_seed(seed): torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) np.random.seed(seed) random.seed(seed) torch.backends.cudnn.deterministic = True def get_data(path): pass 定义网络结构 class Block(nn.Module): def __init__(self, in_channels, out_channels, stride=1): super().__init__() self.residual_function = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False), nn.BatchNorm2d(out_channels), nn.ReLU(inplace=True), nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(out_channels) ) self.shortcut = nn.Sequential() if stride != 1 or in_channels != out_channels: self.shortcut = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(out_channels) ) def forward(self, x): out = self.residual_function(x) + self.shortcut(x) return nn.ReLU(inplace=True)(out) class ResNet(nn.Module): def __init__(self, block, num_classes=10): super().__init__() self.conv1 = nn.Sequential( nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(64), nn.ReLU(inplace=True)) self.conv2 = self.make_layer(block, 64, 64, 2, 1) self.conv3 = self.make_layer(block, 64, 128, 2, 2) self.conv4 = self.make_layer(block, 128, 256, 2, 2) self.conv5 = self.make_layer(block, 256, 512, 2, 2) self.avg_pool = nn.AdaptiveAvgPool2d((1, 1)) self.dense_layer = nn.Linear(512, num_classes) def make_layer(self, block, in_channels, out_channels, num_blocks, stride): strides = [stride] + [1] * (num_blocks - 1) layers = [] for stride in strides: layers.append(block(in_channels, out_channels, stride)) in_channels = out_channels return nn.Sequential(*layers) def forward(self, x): out = self.conv1(x) out = self.conv2(out) out = self.conv3(out) out = self.conv4(out) out = self.conv5(out) out = self.avg_pool(out) out = out.view(out.size(0), -1) out = self.dense_layer(out) return out 进行训练和验证 def main(): file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename) seed = datetime.datetime.now().year setup_seed(seed) parser = argparse.ArgumentParser(description='Pytorch distribute training', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--enable_gpu', default='true') parser.add_argument('--lr', default='0.01', help='learning rate') parser.add_argument('--epochs', default='100', help='training iteration') parser.add_argument('--init_method', default=None, help='tcp_port') parser.add_argument('--rank', type=int, default=0, help='index of current task') parser.add_argument('--world_size', type=int, default=1, help='total number of tasks') parser.add_argument('--custom_data', default='false') parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir')) parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir')) args, unknown = parser.parse_known_args() args.enable_gpu = args.enable_gpu == 'true' args.custom_data = args.custom_data == 'true' args.lr = float(args.lr) args.epochs = int(args.epochs) if args.custom_data: print('[warning] you are training on custom random dataset, ' 'validation accuracy may range from 0.4 to 0.6.') ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ### dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank) ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ### tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data) batch_per_gpu = 128 gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1 batch = batch_per_gpu * gpus_per_node tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False) ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ### tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank) tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True) ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ### val_loader = DataLoader(val_set, batch_size=batch, shuffle=False) lr = args.lr * gpus_per_node max_epoch = args.epochs model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block) ### 分布式改造,构建DDP分布式模型 ### model = nn.parallel.DistributedDataParallel(model) ### 分布式改造,构建DDP分布式模型 ### optimizer = optim.Adam(model.parameters(), lr=lr) loss_func = torch.nn.CrossEntropyLoss() os.makedirs(args.output_dir, exist_ok=True) for epoch in range(1, max_epoch + 1): model.train() train_loss = 0 ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ### tr_sampler.set_epoch(epoch) ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ### for step, (tr_x, tr_y) in enumerate(tr_loader): if args.enable_gpu: tr_x, tr_y = tr_x.cuda(), tr_y.cuda() out = model(tr_x) loss = loss_func(out, tr_y) optimizer.zero_grad() loss.backward() optimizer.step() train_loss += loss.item() print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader))) val_loss = 0 pred_record = [] real_record = [] model.eval() with torch.no_grad(): for step, (val_x, val_y) in enumerate(val_loader): if args.enable_gpu: val_x, val_y = val_x.cuda(), val_y.cuda() out = model(val_x) pred_record += list(np.argmax(out.cpu().numpy(), axis=1)) real_record += list(val_y.cpu().numpy()) val_loss += loss_func(out, val_y).item() val_accu = accuracy_score(real_record, pred_record) print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n') if args.rank == 0: # save ckpt every epoch torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth')) if __name__ == '__main__': main() 结果对比 分别以单机单卡和两节点16卡两种资源类型完成100epoch的cifar-10数据集训练,训练时长和测试集准确率如下。 表1 训练结果对比 资源类型 单机单卡 两节点16卡 耗时 60分钟 20分钟 准确率 80+ 80+
  • 数据集 cifar10数据集 在Notebook中,无法直接使用默认版本的torchvision获取数据集,因此示例代码中提供了三种训练数据加载方式。 cifar-10数据集下载链接,单击“CIFAR-10 python version”。 尝试基于torchvision获取cifar10数据集。 基于数据链接下载数据并解压,放置在指定目录下,训练集和测试集的大小分别为(50000,3,32,32)和(10000,3,32,32)。 考虑到下载cifar10数据集较慢,基于torch生成类似cifar10的随机数据集,训练集和测试集的大小分别为(5000,3,32,32)和(1000,3,32,32),标签仍为10类,指定custom_data = 'true'后可直接进行训练任务,无需加载数据。