PyTorch

张量

类似 Numpy 中的 ndarray, 区别在于能够利用 GPU 进行并行计算.

初始化

有多种初始化张量的方法,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import torch
import numpy as np

# 使用 python 列表
data = [[1, 2], [3, 4]]
x_data = torch.tensor(data)

# 使用 numpy 数组
np_array = np.array(data)
x_np = torch.from_numpy(np_array)

# 使用其他张量(tensor)
x_ones = torch.ones_like(x_data) # retains the properties of x_data
print(f"Ones Tensor: \n {x_ones} \n")

x_rand = torch.rand_like(x_data, dtype=torch.float) # overrides the datatype of x_data
print(f"Random Tensor: \n {x_rand} \n")
1
2
3
4
5
6
7
Ones Tensor:
tensor([[1, 1],
[1, 1]])

Random Tensor:
tensor([[0.9769, 0.6933],
[0.0915, 0.3702]])

张量属性

形状, 数据类型, 存储设备

1
2
3
4
5
tensor = torch.rand(3, 4)

print(f"Shape of tensor: {tensor.shape}")
print(f"Datatype of tensor: {tensor.dtype}")
print(f"Device tensor is stored on: {tensor.device}")
1
2
3
Shape of tensor: torch.Size([3, 4])
Datatype of tensor: torch.float32
Device tensor is stored on: cpu

张量操作

张量支持非常多的操作, 100 多种,例如转置, 索引, 切片, 数学去处, 线性代数, 随机抽样等等.

可将张量移动到 GPU 进行计算, 可以显著提高计算的速度.

1
2
3
4
# We move our tensor to the GPU if available
if torch.cuda.is_available():
tensor = tensor.to('cuda')
print(f"Device tensor is stored on: {tensor.device}")
1
Device tensor is stored on: cuda:0

类似 Numpy 的索引和切片

1
2
3
tensor = torch.ones(4, 4)
tensor[:,1] = 0 # 所有行的第1列改为 0
print(tensor)
1
2
3
4
tensor([[1., 0., 1., 1.],
[1., 0., 1., 1.],
[1., 0., 1., 1.],
[1., 0., 1., 1.]])

归并 concate

1
2
t1 = torch.cat([tensor, tensor, tensor], dim=1)
print(t1)
1
2
3
4
tensor([[1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.],
[1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.],
[1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.],
[1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.]])

逐元素乘法

1
2
3
4
5
6
7
8
# 逐元素相乘
a = torch.tensor([[1, 2], [3, 4]])
b = torch.tensor([[5, 6], [7, 8]])

c = a * b
# 结果:
# tensor([[ 5, 12],
# [21, 32]])
1
2
3
4
5
6
7
8
# 如果形状不同,则会先广播之后再相乘
a = torch.tensor([[1, 2], [3, 4]]) # shape: (2, 2)
b = torch.tensor([10, 20]) # shape: (2,) 此处形状不同,会自动广播成([10, 20],[10, 20])
c = a * b

# 结果:
# tensor([[10, 40],
# [30, 80]])

矩阵乘法

torch.matmul() 或者使用运算符 @

1
2
3
4
5
6
7
a = torch.tensor([[1, 2], [3, 4]])   # (2, 2)
b = torch.tensor([[5, 6], [7, 8]]) # (2, 2)

c = torch.matmul(a, b) # 或 a @ b
# 结果:
# tensor([[19, 22],
# [43, 50]])

计算过程:

第一行第一列:1×5 + 2×7 = 19
第一行第二列:1×6 + 2×8 = 22

就地更新

在原方法的末尾添加下划线即可

1
2
3
print(tensor, "\n")
tensor.add_(5) # add 方法变成 add_
print(tensor)
1
2
3
4
5
6
7
8
9
tensor([[1., 0., 1., 1.],
[1., 0., 1., 1.],
[1., 0., 1., 1.],
[1., 0., 1., 1.]])

tensor([[6., 5., 6., 6.],
[6., 5., 6., 6.],
[6., 5., 6., 6.],
[6., 5., 6., 6.]])

关联 Numpy

在 CPU 上的张量可以和 Numpy 共享内存, 修改其中一个会直接影响另外一个

张量转 Numpy 数组

1
2
3
4
t = torch.ones(5)
print(f"t: {t}")
n = t.numpy()
print(f"n: {n}")
1
2
t: tensor([1., 1., 1., 1., 1.])
n: [1. 1. 1. 1. 1.]

当修改张量后, numpy 数组也会跟着变化

1
2
3
t.add_(1)
print(f"t: {t}")
print(f"n: {n}")
1
2
t: tensor([2., 2., 2., 2., 2.])
n: [2. 2. 2. 2. 2.]

Numpy数组转张量

修改 numpy 数组也会让张量跟着变化

1
2
n = np.ones(5)
t = torch.from_numpy(n)
1
2
3
np.add(n, 1, out=n) # out=n 参数表示就地更新原来的数组 n
print(f"t: {t}")
print(f"n: {n}")
1
2
t: tensor([2., 2., 2., 2., 2.], dtype=torch.float64)
n: [2. 2. 2. 2. 2.]

自动梯度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import torch
from torchvision.models import resnet18, ResNet18_Weights

model = resnet18(weights=ResNet18_Weights.DEFAULT)
data = torch.rand(1, 3, 64, 64)
labels = torch.rand(1, 1000)

prediction = model(data) # 前向传播
loss = (prediction - labels).sum()
loss.backward() # 反向传播,自动计算梯度值,并保存到张量的 grad 属性中

# 初始化优化器(此处为小批量梯度下降)
optim = torch.optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)

optim.step() # 根据梯度调整权重参数的值,调整幅度与学习率有关

自动计算梯度的示例

1
2
3
4
import torch

a = torch.tensor([2., 3.], requires_grad=True)
b = torch.tensor([6., 4.], requires_grad=True)

假设:

Q=3a3b2
1
Q = 3*a**3 - b**2

假设 a、b 为权重参数,那么 Q 相对 a,b 的导数为:

Qa=9a2 Qb=2b
1
2
3
# 假设梯度向量值为 [1, 1]
external_grad = torch.tensor([1., 1.])
Q.backward(gradient=external_grad)
1
2
3
4
print(9*a**2 == a.grad)
print(-2*b == b.grad)
print(a.grad)
print(b.grad)
1
2
3
4
tensor([True, True])
tensor([True, True])
tensor([ 36.0000, 81.0000])
tensor([ -2.0000, -2.0000])

导数的本质是变化率,即当自变量 x 出现 1 个单位的变化时, 因变量 y 会出现多大的变化. 而损失值则是期待的变化幅度(变了多少个单位后将满足目标值).

计算图

当调用 loss.backward() 时, torch 会自动计算损失向量(梯度)的雅可比积, 将损失函数的梯度反向传播到所有相关的可训练权重参数, 用于后续的参数更新. 损失值与梯度的乘积即表示参数应该更新的量, 但通常会乘以一个学习率,以便实现小幅度的更新, 避免过于剧烈的震荡.

计算图是一个单向无环图(DAG), 记录着从输入的张量到输出的张量之间的每一步计算过程.

计算图是在前向传播的过程中动态构建的, 同时在调用 backward() 方法后会自动释放, 以便节省内存. 如果缓存计算图, 需要设置参数 retain_graph=true. 当重复调用 backward 时, 会报错. 因为第一次调用后会释放, 导致后续的调用找不到计算图了.

如果某个张量不需要构建计算图, 可设置参数 required_grad=False

1
2
3
4
5
6
7
8
x = torch.rand(5, 5)
y = torch.rand(5, 5)
z = torch.rand((5, 5), requires_grad=True)

a = x + y
print(f"Does `a` require gradients?: {a.requires_grad}")
b = x + z
print(f"Does `b` require gradients?: {b.requires_grad}")
1
2
Does `a` require gradients?: False
Does `b` require gradients?: True

当设置 required_grad=False 时, 因为没有构建计算图, 因此参数在训练过程中不会被更新, 相当于被冻结了. 这个功能在微调模型的场景中很有用, 因为我们通常需要冻结主干模型, 只训练最后的分类层.

1
2
3
4
5
6
7
8
9
10
11
12
13
from torch import nn, optim

model = resnet18(weights=ResNet18_Weights.DEFAULT)

# 冻结所有参数
for param in model.parameters():
param.requires_grad = False

# 将模型最后的 fc 层替换为一个新的线性层, 该线性层的参数没有冻结, 可进行微调训练
model.fc = nn.Linear(512, 10)

# 虽然模型参数都传递给了优化器, 但实际上只有最后一层的参数会构建计算图计算梯度并被优化器更新
optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)

另外还有一个全局的暂停自动梯度计算的方法是使用 torch.no_grad()

神经网络

nn.Module 类可用来构建一个神经网络中的模块(神经元), 模块由一个或多个层构成, 有一个 forward 前向传播的方法, 并返回一个计算结果 output

神经元通常包含一个非线性的激活层, 以便能够实现对复杂函数的拟合.

网络示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):

def __init__(self):
super(Net, self).__init__()
# 输入1张图片, 6 个输出通道, 使用 5x5 的卷积核
self.conv1 = nn.Conv2d(1, 6, 5)
self.conv2 = nn.Conv2d(6, 16, 5)
# 仿射变换: y = Wx + b
self.fc1 = nn.Linear(16 * 5 * 5, 120) # 5*5 from image dimension
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)

def forward(self, input):
# 卷积层 C1: 输入1张图片, 6 个输出通道, 使用 5x5 的卷积核, 使用 ReLU 激活函数
# 输入张量尺寸 (N, 6, 28, 28), N 为单批的数量(即样本数)
c1 = F.relu(self.conv1(input))
# Subsampling layer S2: 2x2 grid, purely functional,
# this layer does not have any parameter, and outputs a (N, 6, 14, 14) Tensor
s2 = F.max_pool2d(c1, (2, 2))
# Convolution layer C3: 6 input channels, 16 output channels,
# 5x5 square convolution, it uses RELU activation function, and
# outputs a (N, 16, 10, 10) Tensor
c3 = F.relu(self.conv2(s2))
# Subsampling layer S4: 2x2 grid, purely functional,
# this layer does not have any parameter, and outputs a (N, 16, 5, 5) Tensor
s4 = F.max_pool2d(c3, 2)
# Flatten operation: purely functional, outputs a (N, 400) Tensor
s4 = torch.flatten(s4, 1)
# Fully connected layer F5: (N, 400) Tensor input,
# and outputs a (N, 120) Tensor, it uses RELU activation function
f5 = F.relu(self.fc1(s4))
# Fully connected layer F6: (N, 120) Tensor input,
# and outputs a (N, 84) Tensor, it uses RELU activation function
f6 = F.relu(self.fc2(f5))
# Fully connected layer OUTPUT: (N, 84) Tensor input, and
# outputs a (N, 10) Tensor
output = self.fc3(f6)
return output


net = Net()
print(net)
1
2
3
4
5
6
7
Net(
(conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
(conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
(fc1): Linear(in_features=400, out_features=120, bias=True)
(fc2): Linear(in_features=120, out_features=84, bias=True)
(fc3): Linear(in_features=84, out_features=10, bias=True)
)

可使用 model.parameters() 方法访问所有可学习参数

1
2
3
params = list(net.parameters())
print(len(params))
print(params[0].size()) # conv1's .weight
1
2
10
torch.Size([6, 1, 5, 5])

可使用 model.zero_grad() 方法重置梯度

1
2
net.zero_grad()
out.backward(torch.randn(1, 10))

torch.nn 强制要求样本以批次为单位作为输入, 不能以单个样本为单位作为输入. 最小批次数量为 1, 因此输入张量的第 1 个维度即是单批的样本数量. 如果每批只有一个样本, 则该值为 1.

对于单个样本, 可使用 input.unsqueeze(0) 方法, 给样本添加一个批次的维度

损失函数

损失函数用于计算模型的输出和目标值之间的差异

1
2
3
4
5
6
7
output = net(input)
target = torch.randn(10) # a dummy target, for example
target = target.view(1, -1) # make it the same shape as output
criterion = nn.MSELoss()

loss = criterion(output, target)
print(loss)
1
tensor(1.2850, grad_fn=<MseLossBackward0>)

以下是损失张量的计算图

1
2
3
4
input -> conv2d -> relu -> maxpool2d -> conv2d -> relu -> maxpool2d
-> flatten -> linear -> relu -> linear -> relu -> linear
-> MSELoss
-> loss
1
2
3
print(loss.grad_fn)  # MSELoss
print(loss.grad_fn.next_functions[0][0]) # Linear
print(loss.grad_fn.next_functions[0][0].next_functions[0][0]) # ReLU
1
2
3
<MseLossBackward0 object at 0x7febcb77eb00>
<AddmmBackward0 object at 0x7febcb77f310>
<AccumulateGrad object at 0x7febcb471cf0>

反向传播

梯度值是累加的, 因此每次进行反向传播时, 需要先调用 model.zero_grad 方法重置梯度值

1
2
3
4
5
6
7
8
9
net.zero_grad()     # 清除可学习参数上的旧梯度值

print('conv1.bias.grad before backward')
print(net.conv1.bias.grad)

loss.backward()

print('conv1.bias.grad after backward')
print(net.conv1.bias.grad)
1
2
3
4
conv1.bias.grad before backward
None
conv1.bias.grad after backward
tensor([ 0.0101, -0.0044, 0.0021, 0.0144, 0.0112, 0.0002])

更新参数

对于 SGD 小批量梯度优化器, 参数的更新规则如下:

1
weight = weight - learning_rate * gradient

使用 python 代码实现如下:

1
2
3
learning_rate = 0.01
for f in net.parameters():
f.data.sub_(f.grad.data * learning_rate) # 使用 sub_ 方法实现就地更新

除了 SGD 外, 还有很多种参数更新方法, 例如: Nesterov-SGD, Adam, RMSProp 等. 可以使用 torch.optim 进行初始化, 选择合适的优化器, 实现对参数的更新

1
2
3
4
5
6
7
8
9
10
11
import torch.optim as optim

# 创建优化器
optimizer = optim.SGD(net.parameters(), lr=0.01)

# 更新过程:
optimizer.zero_grad() # 需要手动重置梯度(因为每次计算梯度值后会默认自动累积,而不是替换)
output = net(input)
loss = criterion(output, target)
loss.backward()
optimizer.step() # 调用 step() 后才会开始更新

训练分类器

数据预处理

可使用一些常见的 python 库将文本, 图片, 音频, 视频等数据转成 numpy 格式, 然后再转成 Tensor 张量. 对于视频类型的数据, torch 有一个专门的 torchvision 库可实现常见数据集的加载和预处理.

训练图片分类器

加载数据并规范化

此处以 CIFAR10 数据集进行示例

1
2
3
import torch
import torchvision
import torchvision.transforms as transforms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) # 规划化器

batch_size = 4
# 定义训练集
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
download=True, transform=transform)
# 定义数据加载器(本质上是一个迭代器)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
shuffle=True, num_workers=2)
# 测试集
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat',
'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

定义神经网络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)

def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1) # flatten all dimensions except batch
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x


net = Net()

定义损失函数和优化器

1
2
3
4
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

训练网络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for epoch in range(2):  # loop over the dataset multiple times

running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# 遍历数据
inputs, labels = data

# 重置梯度
optimizer.zero_grad()

# 前向传播 + 反向传播 + 优化
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()

# 打印统计
running_loss += loss.item()
if i % 2000 == 1999: # 每 2000 批
print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
running_loss = 0.0

print('Finished Training')
1
2
3
4
5
6
7
8
9
10
11
12
13
[1,  2000] loss: 2.229
[1, 4000] loss: 1.905
[1, 6000] loss: 1.667
[1, 8000] loss: 1.566
[1, 10000] loss: 1.507
[1, 12000] loss: 1.469
[2, 2000] loss: 1.403
[2, 4000] loss: 1.362
[2, 6000] loss: 1.339
[2, 8000] loss: 1.319
[2, 10000] loss: 1.303
[2, 12000] loss: 1.270
Finished Training

保存训练后的模型参数

1
2
PATH = './cifar_net.pth'
torch.save(net.state_dict(), PATH)

测试网络模型

1
2
3
4
5
dataiter = iter(testloader)
images, labels = next(dataiter)

net = Net()
net.load_state_dict(torch.load(PATH, weights_only=True))
1
2
3
4
5
6
outputs = net(images)

_, predicted = torch.max(outputs, 1)

print('Predicted: ', ' '.join(f'{classes[predicted[j]]:5s}'
for j in range(4)))
1
Predicted:  cat   ship  ship  ship

使用 GPU 训练

1
2
3
4
5
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# 将模型复制到 GPU 设备
net.to(device)
# 输入和标签也需要复制到 GPU
inputs, labels = data[0].to(device), data[1].to(device)

多 GPU 训练

1
2
3
4
5
6
7
# 复制模型和数据
device = torch.device("cuda:0")
model.to(device)
# tensor.to(device) 方法会返回一个指向新张量的引用,而不是就地更新
mytensor = my_tensor.to(device)
# 开启多 GPU 的并行
model = nn.DataParallel(model)

准备数据

1
2
3
4
5
6
7
8
9
10
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Parameters and DataLoaders
input_size = 5
output_size = 2

batch_size = 30
data_size = 100
1
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 随机数据集
class RandomDataset(Dataset):

def __init__(self, size, length):
self.len = length
self.data = torch.randn(length, size)

def __getitem__(self, index):
return self.data[index]

def __len__(self):
return self.len

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
batch_size=batch_size, shuffle=True)

准备模型

1
2
3
4
5
6
7
8
9
10
11
12
13
class Model(nn.Module):
# Our model

def __init__(self, input_size, output_size):
super(Model, self).__init__()
self.fc = nn.Linear(input_size, output_size)

def forward(self, input):
output = self.fc(input)
print("\tIn Model: input size", input.size(),
"output size", output.size())

return output

并行训练

检查是否有多个 GPU 可用, 如有, 开启并行训练

1
2
3
4
5
6
7
model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:
print("Let's use", torch.cuda.device_count(), "GPUs!")
# dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
model = nn.DataParallel(model) # 开启并行

model.to(device)
1
2
3
4
5
6
7
Let's use 4 GPUs!

DataParallel(
(module): Model(
(fc): Linear(in_features=5, out_features=2, bias=True)
)
)

运行模型

1
2
3
4
5
for data in rand_loader:
input = data.to(device)
output = model(input)
print("Outside: input size", input.size(),
"output_size", output.size())

以下是有 4 个 GPU 并行时的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])
In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])
In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])
In Model: input size torch.Size([6, 5]) output size torch.Size([6, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])
In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])
In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])
In Model: input size torch.Size([6, 5]) output size torch.Size([6, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([3, 5]) output size torch.Size([3, 2])
In Model: input size torch.Size([3, 5]) output size torch.Size([3, 2])
In Model: input size torch.Size([3, 5]) output size torch.Size([3, 2])
In Model: input size torch.Size([1, 5]) output size torch.Size([1, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

2 个 GPU 时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# on 2 GPUs
Let's use 2 GPUs!
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

3 个 GPU 时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Let's use 3 GPUs!
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

8 个 GPU 时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Let's use 8 GPUs!
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

DataParallel 实现的是数据并行, 即将数据拆分到不同的 GPU 上进行计算, 最后再合并计算结果.

简单示例

numpy

使用 numpy 来计算梯度的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# -*- coding: utf-8 -*-
import numpy as np
import math

# Create random input and output data
x = np.linspace(-math.pi, math.pi, 2000)
y = np.sin(x)

# 随机初始化参数值
a = np.random.randn()
b = np.random.randn()
c = np.random.randn()
d = np.random.randn()

learning_rate = 1e-6
for t in range(2000):
# 前向传播 y = a + b x + c x^2 + d x^3
y_pred = a + b * x + c * x ** 2 + d * x ** 3

# 计算损失
loss = np.square(y_pred - y).sum()
if t % 100 == 99:
print(t, loss)

# 计算参数 a, b, c, d 相对 loss 的偏层数
grad_y_pred = 2.0 * (y_pred - y) # 因为是均方差, 所以有个导数是 2.0
grad_a = grad_y_pred.sum() # 最后需要 sum 各个梯度值
grad_b = (grad_y_pred * x).sum()
grad_c = (grad_y_pred * x ** 2).sum()
grad_d = (grad_y_pred * x ** 3).sum()

# 更新参数
a -= learning_rate * grad_a
b -= learning_rate * grad_b
c -= learning_rate * grad_c
d -= learning_rate * grad_d

print(f'Result: y = {a} + {b} x + {c} x^2 + {d} x^3')

由于损失值的计算公式如下:

L=(y^y)2,y^=a+bx+cx2+dx3

因此, 损失 L 相对 a, b, c, d 的导数计算公式如下:

  • La=2(y^y)1=grad_a
  • Lb=2(y^y)x=grad_b
  • Lc=2(y^y)x2=grad_c
  • Ld=2(y^y)x3=grad_d

Tensors

相比 numpy array, Tensor 最大的好处是可以利用 GPU 实现并行计算, 能够显著提高计算速度(可能达50x)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 以下是使用 tensor 实现相同的计算
import torch
import math


dtype = torch.float
device = torch.device("cpu") # 使用 CPU 计算
# device = torch.device("cuda:0") # 使用 GPU 计算

# 输入
x = torch.linspace(-math.pi, math.pi, 2000, device=device, dtype=dtype)
y = torch.sin(x)

# Randomly initialize weights
a = torch.randn((), device=device, dtype=dtype)
b = torch.randn((), device=device, dtype=dtype)
c = torch.randn((), device=device, dtype=dtype)
d = torch.randn((), device=device, dtype=dtype)

learning_rate = 1e-6
for t in range(2000):
# 前向传播, 计算预测值 y_pred
y_pred = a + b * x + c * x ** 2 + d * x ** 3

# 计算损失
loss = (y_pred - y).pow(2).sum().item()
if t % 100 == 99:
print(t, loss)

# 计算梯度
grad_y_pred = 2.0 * (y_pred - y)
grad_a = grad_y_pred.sum()
grad_b = (grad_y_pred * x).sum()
grad_c = (grad_y_pred * x ** 2).sum()
grad_d = (grad_y_pred * x ** 3).sum()

# 更新参数
a -= learning_rate * grad_a
b -= learning_rate * grad_b
c -= learning_rate * grad_c
d -= learning_rate * grad_d


print(f'Result: y = {a.item()} + {b.item()} x + {c.item()} x^2 + {d.item()} x^3')

Autograd

以上示例是手动实现前向和反向传播, 对于只有1~2 层的神经网络来说还可以接受. 但对于深度神经网络来说, 工作量就会变得非常恐怖了. PyTorch 通过引入计算图, 来实现自动的梯度计算.

计算图的节点是输入张量, 边是计算输出张量的函数. 例如假设 x 是一个输入张量, 那么 x.grad 里面将用于存放输出的标量相对于 x 的梯度.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import torch
import math

dtype = torch.float
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")
torch.set_default_device(device)

x = torch.linspace(-1, 1, 2000, dtype=dtype)
y = torch.exp(x) # 泰勒展开式为1 + x + (1/2) x**2 + (1/3!) x**3 + ...

a = torch.randn((), dtype=dtype, requires_grad=True)
b = torch.randn((), dtype=dtype, requires_grad=True)
c = torch.randn((), dtype=dtype, requires_grad=True)
d = torch.randn((), dtype=dtype, requires_grad=True)

initial_loss = 1.
learning_rate = 1e-5
for t in range(5000):
y_pred = a + b * x + c * x ** 2 + d * x ** 3

# 损失为方差和
loss = (y_pred - y).pow(2).sum()

# 保存初始损失, 以便可以比对训练过程中的损失值下降情况
if t==0:
initial_loss=loss.item()

if t % 100 == 99:
print(f'Iteration t = {t:4d} loss(t)/loss(0) = {round(loss.item()/initial_loss, 6):10.6f} a = {a.item():10.6f} b = {b.item():10.6f} c = {c.item():10.6f} d = {d.item():10.6f}')

# 自动计算梯度
loss.backward()

# 手动更新参数, 注意:此处的 grad 不再是手动计算, 而 backward 时自动计算并存储在 grad 属性中
with torch.no_grad():
a -= learning_rate * a.grad
b -= learning_rate * b.grad
c -= learning_rate * c.grad
d -= learning_rate * d.grad

# 重置梯度
a.grad = None
b.grad = None
c.grad = None
d.grad = None

print(f'Result: y = {a.item()} + {b.item()} x + {c.item()} x^2 + {d.item()} x^3')

自定义 autograd

autograd 运算符实际由两个函数构成,一个是前向传播函数, 它根据输入计算输出. 一个是反向传播函数, 它根据输出张量相某个标量损失的梯度计算输入张量相对同一个标量损失的梯度.

通过继承 torch.autograd.Function 可以自定义 autograd 运算符, 只需实现 forward 和 backward 函数即可.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import torch
import math


class LegendrePolynomial3(torch.autograd.Function):
"""
通过继承 torch.autograd.Function 类,并实现其中的前向传播和反向传播方法来创建自定义的自动求导函数。这些方法操作的是张量
"""

@staticmethod
def forward(ctx, input):
"""
在前向传播中,会接收到一个包含输入数据的张量,并返回一个包含输出结果的张量。ctx是一个上下文对象,可用于暂存反向传播计算所需的信息。可以通过 ctx.save_for_backward方法缓存张量以供反向传播使用。其他对象可以直接作为 ctx对象的属性存储,例如 ctx.my_object = my_object
"""
ctx.save_for_backward(input)
return 0.5 * (5 * input ** 3 - 3 * input)

@staticmethod
def backward(ctx, grad_output):
"""
在反向传播中,会接收到一个包含损失相对于输出梯度的张量,然后需要计算损失相对于输入的梯度。
"""
input, = ctx.saved_tensors
return grad_output * 1.5 * (5 * input ** 2 - 1)


dtype = torch.float
device = torch.device("cpu")
# device = torch.device("cuda:0") # 使用 GPU

# 创建张量来存储输入和输出
# 默认情况下,requires_grad=False,这表示在反向传播过程中,我们不需要计算这些张量的梯度
x = torch.linspace(-math.pi, math.pi, 2000, device=device, dtype=dtype)
y = torch.sin(x)

a = torch.full((), 0.0, device=device, dtype=dtype, requires_grad=True)
b = torch.full((), -1.0, device=device, dtype=dtype, requires_grad=True)
c = torch.full((), 0.0, device=device, dtype=dtype, requires_grad=True)
d = torch.full((), 0.3, device=device, dtype=dtype, requires_grad=True)

learning_rate = 5e-6
for t in range(2000):

P3 = LegendrePolynomial3.apply
y_pred = a + b * P3(c + d * x)

loss = (y_pred - y).pow(2).sum()
if t % 100 == 99:
print(t, loss.item())

loss.backward()

with torch.no_grad():
a -= learning_rate * a.grad
b -= learning_rate * b.grad
c -= learning_rate * c.grad
d -= learning_rate * d.grad

# 重置梯度
a.grad = None
b.grad = None
c.grad = None
d.grad = None

print(f'Result: y = {a.item()} + {b.item()} * P3({c.item()} + {d.item()} x)')

nn 模块

相比 autograd 运算符, nn.Module 提供了一个更高层级的抽象, 以便能够更加便捷的构建神经网络, 它有点类似于网络中的层.

nn 库中包括一些模块(Modules) 同样是基于输入张量, 计算输出张量. 但它拥有一些内部状态, 例如包含可学习参数. nn 库还包含一些常用的损失函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-
import torch
import math

x = torch.linspace(-math.pi, math.pi, 2000)
y = torch.sin(x)

# y = x + x^2 + x^3
p = torch.tensor([1, 2, 3])
# unsqueeze 用来给张量添加新维度, -1 表示新维度添加在最末尾, 添加后, 原本的 [2000,] 变成了 [2000, 1]
xu = x.unsqueeze(-1)
xx = xu.pow(p)

# 由于 p 的形状是 (3,), xu 的形状是 (2000, 1), pow 运算会触发广播, 使得最终的形状为 (2000, 3)

# nn.Sequential 可用来顺序组合多个层
# nn.Linear 线性层使用线性函数基于输入计算输出
# nn.Flatten 层将线性层的输出还原为一维张量, 以便跟最终的 y 的形状相同
model = torch.nn.Sequential(
torch.nn.Linear(3, 1),
torch.nn.Flatten(0, 1)
)


# 使用 nn 库中 的 MSE 函数作为损失函数
loss_fn = torch.nn.MSELoss(reduction='sum')

learning_rate = 1e-6
for t in range(2000):

y_pred = model(xx)

loss = loss_fn(y_pred, y)
if t % 100 == 99:
print(t, loss.item())

# 重置梯度
model.zero_grad()

# 反向传播, 自动计算所有可学习参数的梯度
loss.backward()

# 更新参数, 如果有使用优化器, 则此时可直接调用优化器的 step 方法自动完成参数更新
with torch.no_grad():
for param in model.parameters():
param -= learning_rate * param.grad


linear_layer = model[0]

# 对于线性层, 其参数的形式为 (weight, bias)
print(f'Result: y = {linear_layer.bias.item()} + {linear_layer.weight[:, 0].item()} x + {linear_layer.weight[:, 1].item()} x^2 + {linear_layer.weight[:, 2].item()} x^3')

optim

nn 库使用 optim 类实现对参数优化算法的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-
import torch
import math

x = torch.linspace(-math.pi, math.pi, 2000)
y = torch.sin(x)

p = torch.tensor([1, 2, 3])
xx = x.unsqueeze(-1).pow(p)

model = torch.nn.Sequential(
torch.nn.Linear(3, 1),
torch.nn.Flatten(0, 1)
)
loss_fn = torch.nn.MSELoss(reduction='sum')

learning_rate = 1e-3
# 使用 RMSprop 算法作为优化器
optimizer = torch.optim.RMSprop(model.parameters(), lr=learning_rate)
for t in range(2000):
y_pred = model(xx)
loss = loss_fn(y_pred, y)
if t % 100 == 99:
print(t, loss.item())
optimizer.zero_grad()
loss.backward()

# 更新参数
optimizer.step()


linear_layer = model[0]
print(f'Result: y = {linear_layer.bias.item()} + {linear_layer.weight[:, 0].item()} x + {linear_layer.weight[:, 1].item()} x^2 + {linear_layer.weight[:, 2].item()} x^3')

自定义 Module

继承 nn.Module, 实现 forward 方法返回输出张量即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# -*- coding: utf-8 -*-
import torch
import math


class Polynomial3(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Parameter(torch.randn(()))
self.b = torch.nn.Parameter(torch.randn(()))
self.c = torch.nn.Parameter(torch.randn(()))
self.d = torch.nn.Parameter(torch.randn(()))

def forward(self, x):
return self.a + self.b * x + self.c * x ** 2 + self.d * x ** 3

def string(self):
# 可以添加一些自定义方法
return f'y = {self.a.item()} + {self.b.item()} x + {self.c.item()} x^2 + {self.d.item()} x^3'


x = torch.linspace(-math.pi, math.pi, 2000)
y = torch.sin(x)

model = Polynomial3()

# 创建损失函数和优化器
criterion = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.SGD(model.parameters(), lr=1e-6)
for t in range(2000):
y_pred = model(x)
loss = criterion(y_pred, y)
if t % 100 == 99:
print(t, loss.item())

optimizer.zero_grad()
loss.backward()
optimizer.step()

print(f'Result: {model.string()}')

控制流+权重共享

在前向传播中引入条件分支, 这会导致不同的计算图路径. 由于 Pytorch 会动态构建计算图, 因此能够很好的支持这种条件分支.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# -*- coding: utf-8 -*-
import random
import torch
import math


class DynamicNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Parameter(torch.randn(()))
self.b = torch.nn.Parameter(torch.randn(()))
self.c = torch.nn.Parameter(torch.randn(()))
self.d = torch.nn.Parameter(torch.randn(()))
self.e = torch.nn.Parameter(torch.randn(()))

def forward(self, x):
y = self.a + self.b * x + self.c * x ** 2 + self.d * x ** 3
for exp in range(4, random.randint(4, 6)):
y = y + self.e * x ** exp # exp 的取值范围为 4 或 5, 随机的
return y

def string(self):
return f'y = {self.a.item()} + {self.b.item()} x + {self.c.item()} x^2 + {self.d.item()} x^3 + {self.e.item()} x^4 ? + {self.e.item()} x^5 ?'


x = torch.linspace(-math.pi, math.pi, 2000)
y = torch.sin(x)

model = DynamicNet()

criterion = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.SGD(model.parameters(), lr=1e-8, momentum=0.9)
for t in range(30000):
y_pred = model(x)
loss = criterion(y_pred, y)
if t % 2000 == 1999:
print(t, loss.item())
optimizer.zero_grad()
loss.backward()
optimizer.step()

print(f'Result: {model.string()}')

梯度

requires_grad

假设存在以下示例网络:

ypred=ReLU(xW+b) L=MSE(ypred,y)
1
2
3
4
5
6
7
8
9
10
# tensor setup
x = torch.ones(1, 3) # input with shape: (1, 3)
W = torch.ones(3, 2, requires_grad=True) # weights with shape: (3, 2)
b = torch.ones(1, 2, requires_grad=True) # bias with shape: (1, 2)
y = torch.ones(1, 2) # output with shape: (1, 2)

# forward pass
z = (x @ W) + b # pre-activation with shape: (1, 2)
y_pred = F.relu(z) # activation with shape: (1, 2)
loss = F.mse_loss(y_pred, y) # scalar loss
y=fk(fk1(f1(x))) yx=fkfk1fk1fk2f1x

前向传播之后生成的计算图:

如果一个节点不是由带 requires_grad=True 的张量计算出来的, 那么它就是一个叶子节点, 否则就是非叶子节点. 对于非叶子节点, 其 requires_grad=True 必然是 True, 不然反向传播就无法成功了. 对于叶子节点, 则在初始化时, 需要显式的设置参数 requires_grad=True (默认是 False, 如果初始化时没有设置, 可通过调用 requires_grad_() 方法后补)

retain_grad

在反向传播时,非叶子节点的梯度是会计算的, 但是不会保存在非叶子节点(张量)的 .grad 属性上, 因此它无法使用 .grad 直接访问. 除非显式的设置参数 retain_grad=True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
z = (x @ W) + b
y_pred = F.relu(z)
loss = F.mse_loss(y_pred, y)

# 显式要求保存这几个张量的梯度
z.retain_grad()
y_pred.retain_grad()
loss.retain_grad()

# 重置(避免累积)
W.grad = None
b.grad = None

loss.backward()

print(f"{W.grad=}")
print(f"{b.grad=}")
print(f"{z.grad=}")
print(f"{y_pred.grad=}")
print(f"{loss.grad=}")
1
2
3
4
5
6
7
W.grad=tensor([[3., 3.],
[3., 3.],
[3., 3.]])
b.grad=tensor([[3., 3.]])
z.grad=tensor([[3., 3.]])
y_pred.grad=tensor([[3., 3.]])
loss.grad=tensor(1.)

对于叶子节点, 如果 requires_grad=True, 那么默认是保存梯度的. 此时再额外调用 retain_grad() 属于冗余的操作. 但如果叶子节点的 requires_grad=False 的话, 那么调用 retain_grad() 方法会报错, 因为该节点的梯度不会被计算, 因此自然也无法保存. 所以说, retain_grad() 只在非叶子节点的场景中使用.

主存到显存

主存是分块的, 当 OS 尝试从某个主存块中读取数据却找不到时, 会触发触常, 然后 OS 会从磁盘加载数据到内存中. 同理, 此时如果主存已满, 则 OS 会将部分数据移动到磁盘上, 以便主存能够腾出空间容纳新的数据.

分块的目的是为了能够更充分的利用内存空间, 但是代价是数据的物理内存地址可能不连续. 此时将无法充分发挥 DMA 的效率, 导致将数据复制到 GPU 前需要多一次整理的动作(将数据先复制到缓冲区, 再从缓冲区复制到 GPU), 带来了额外的开锁.

数据在内存中也可以显式设置为固定类型(页锁定, pin_memory), 对于页锁定内在, 它不会被交换到磁盘.

当 OS 加载一个文件时, 它并不会一下子将所有数据都加载到内存中, 而是等到使用时, 如果内存中找不到, 才会触发数据的加载. 当尝试将数据从主存复制到 GPU 的显存时, 由于数据有可能不在主存中, 而是在磁盘中. 为避免等待 IO, 这个复制过程通常是异步的, non_blocking=True

从页锁定的主存中复制数据会更快, 因为数据已经提前加载到内存中了. 同时由于物理地址连续, 可以直接使用 DMA 读取.

数据从主存复制到显存时, CUDA 会自动处理好数据同步问题, 不会出现要读取数据时, 数据却不存在导致报错的情况. 反之, 如果是将数据从显存拷贝到主存, 则需要手工同步一下, 才能保证读取数据前, 数据已经拷贝完成.

梯度可视化

方便排查处理训练过程中出现的梯度消失或梯度爆炸问题.

示例: 设计两个简单的模型, 一个使用 BatchNorm, 一个没有使用. 并跟踪梯度变化, 以便观察 BatchNorm 能否缓解梯度消失问题.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def fc_layer(in_size, out_size, norm_layer):
"""Return a stack of linear->norm->sigmoid layers"""
return nn.Sequential(nn.Linear(in_size, out_size), norm_layer(out_size), nn.Sigmoid())

class Net(nn.Module):
"""Define a network that has num_layers of linear->norm->sigmoid transformations"""
def __init__(self, in_size=28*28, hidden_size=128,
out_size=10, num_layers=3, batchnorm=False):
super().__init__()
if batchnorm is False:
norm_layer = nn.Identity
else:
norm_layer = nn.BatchNorm1d

layers = []
layers.append(fc_layer(in_size, hidden_size, norm_layer))

for i in range(num_layers-1):
layers.append(fc_layer(hidden_size, hidden_size, norm_layer))

layers.append(nn.Linear(hidden_size, out_size))

self.layers = nn.Sequential(*layers)

def forward(self, x):
x = torch.flatten(x, 1)
return self.layers(x)

初始化数据

1
2
3
4
5
6
7
8
9
10
11
12
13
# set up dummy data
x = torch.randn(10, 28, 28)
y = torch.randint(10, (10, ))

# init model
model_bn = Net(batchnorm=True, num_layers=3)
model_nobn = Net(batchnorm=False, num_layers=3)

model_bn.train()
model_nobn.train()

optimizer_bn = optim.SGD(model_bn.parameters(), lr=0.01, momentum=0.9)
optimizer_nobn = optim.SGD(model_nobn.parameters(), lr=0.01, momentum=0.9)
1
2
print(model_bn.layers[0])
print(model_nobn.layers[0])
1
2
3
4
5
6
7
8
9
10
Sequential(
(0): Linear(in_features=784, out_features=128, bias=True)
(1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(2): Sigmoid()
)
Sequential(
(0): Linear(in_features=784, out_features=128, bias=True)
(1): Identity()
(2): Sigmoid()
)

注册钩子

为了能够读取中间状态的数据, 相比使用 retain_grad(), 更推荐的方法是注册反向传播钩子(backward pass hook).

定义钩子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def hook_forward(module_name, grads, hook_backward):
def hook(module, args, output):
# 在前向传播过程中, 为每个输出张量注册反向钩子
output.register_hook(hook_backward(module_name, grads))
return hook

def hook_backward(module_name, grads):
def hook(grad):
# 反向传播钩子, 用来收集梯度
grads.append((module_name, grad))
return hook

def get_all_layers(model, hook_forward, hook_backward):
layers = dict()
grads = []
for name, layer in model.named_modules():
# 仅处理叶子模块(没有子模块)
if any(layer.children()) is False:
layers[layer] = name
layer.register_forward_hook(hook_forward(name, grads, hook_backward))
return layers, grads

layers_bn, grads_bn = get_all_layers(model_bn, hook_forward, hook_backward)
layers_nobn, grads_nobn = get_all_layers(model_nobn, hook_forward, hook_backward)

训练并可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
epochs = 10

for epoch in range(epochs):

# 每轮训练前先清空, 避免累积
grads_bn.clear()
grads_nobn.clear()

optimizer_bn.zero_grad()
optimizer_nobn.zero_grad()

y_pred_bn = model_bn(x)
y_pred_nobn = model_nobn(x)

loss_bn = F.cross_entropy(y_pred_bn, y)
loss_nobn = F.cross_entropy(y_pred_nobn, y)

loss_bn.backward()
loss_nobn.backward()

optimizer_bn.step()
optimizer_nobn.step()
1
2
3
4
5
6
7
8
9
10
11
12
13
def get_grads(grads):
layer_idx = []
avg_grads = []
for idx, (name, grad) in enumerate(grads):
if grad is not None:
avg_grad = grad.abs().mean()
avg_grads.append(avg_grad)
# idx is backwards since we appended in backward pass
layer_idx.append(len(grads) - 1 - idx)
return layer_idx, avg_grads

layer_idx_bn, avg_grads_bn = get_grads(grads_bn)
layer_idx_nobn, avg_grads_nobn = get_grads(grads_nobn)
1
2
3
4
5
6
7
8
9
fig, ax = plt.subplots()
ax.plot(layer_idx_bn, avg_grads_bn, label="With BatchNorm", marker="o")
ax.plot(layer_idx_nobn, avg_grads_nobn, label="Without BatchNorm", marker="x")
ax.set_xlabel("Layer depth")
ax.set_ylabel("Average gradient")
ax.set_title("Gradient flow")
ax.grid(True)
ax.legend()
plt.show()

分布式训练

DDP

数据并行, 每张显卡部署相同的模型和优化器. 每张显卡训练的数据不同, 因此在更新参数前, 需要先同步数据, 之后再统一更新参数, 确保更新后的模型参数是每张显卡上面是一样的.

DistributedSampler 可用来确保每张显卡获得的数据不重复

数据的同步是可以并行的, 例如有 4 张显卡, 每次同步 1/4 到另外一张显卡上. 通过使用环状同步, 可以实现同步的并行.

相较早期更简单的 DataParallel, DDP 的性能更好一些, 二者的对比如下:

特性 DataParallel DistributedDataParallel
开销 更大;模型在每次前向传播时都会被复制并销毁 模型仅复制一次
并行支持 仅支持单机并行 支持扩展到多台机器
性能 速度较慢;在单个进程中使用多线程,会遇到全局解释器锁(GIL)竞争 速度更快(无 GIL 竞争),因为它使用多进程

以下是单 GPU 训练时的代码脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset


class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
save_every: int,
) -> None:
self.gpu_id = gpu_id
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every

def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)

def _save_checkpoint(self, epoch):
ckp = self.model.state_dict()
PATH = "checkpoint.pt"
torch.save(ckp, PATH)
print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

def train(self, max_epochs: int):
for epoch in range(max_epochs):
self._run_epoch(epoch)
if epoch % self.save_every == 0:
self._save_checkpoint(epoch)


def load_train_objs():
train_set = MyTrainDataset(2048) # 虚拟一份数据集
model = torch.nn.Linear(20, 1) # 用一个线性层模拟待训练模型
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=True
)


def main(device, total_epochs, save_every, batch_size):
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, device, save_every)
trainer.train(total_epochs)


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()

device = 0 # shorthand for cuda:0
main(device, args.total_epochs, args.save_every, args.batch_size)

以下是切换到 DDP 模式进行训练的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset

# 引入一些新的库
import torch.multiprocessing as mp # 管理多个进程的不同 GPU
from torch.utils.data.distributed import DistributedSampler # 按 GPU 切分数据
from torch.nn.parallel import DistributedDataParallel as DDP # 数据并行模式
from torch.distributed import init_process_group, destroy_process_group # 进程组管理
import os

# 设置 DDP 的参数
def ddp_setup(rank, world_size):
"""
Args:
rank: 每个进程的识别代号
world_size: 进程总数
"""
os.environ["MASTER_ADDR"] = "localhost" # 因为是一机多卡, 所以 IP 地址是自己
os.environ["MASTER_PORT"] = "12355" # 监听的端口
torch.cuda.set_device(rank)
# 使用 nccl 库负责 CUDA GPU 间的通讯(XPU 用 xccl, CPU 用 gloo)
# init_process_group 负责初始化分布式训练的进程组
init_process_group(backend="nccl", rank=rank, world_size=world_size)

class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
save_every: int,
) -> None:
self.gpu_id = gpu_id
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.model = DDP(model, device_ids=[gpu_id]) # 用 DDP 对 model 进行封装

def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch) # 调用 set_epoch 模拟 shuffle
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)

def _save_checkpoint(self, epoch):
ckp = self.model.module.state_dict()
PATH = "checkpoint.pt"
torch.save(ckp, PATH)
print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

def train(self, max_epochs: int):
for epoch in range(max_epochs):
self._run_epoch(epoch)
# 仅让一个 GPU 负责保存模型参数即可, 毕竟所有 GPU 上的模型是相同的, 没必要重复保存
if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_checkpoint(epoch)


def load_train_objs():
train_set = MyTrainDataset(2048)
model = torch.nn.Linear(20, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False, # 关闭 shuffle
sampler=DistributedSampler(dataset) # 使用 DistributedSampler 切分训练数据
)

# 增加了 rank 和 world_size 参数
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
ddp_setup(rank, world_size) # 初始化 DDP
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, rank, save_every) # 用 rank 替换 device
trainer.train(total_epochs)
destroy_process_group() # 训练结束后销毁进程


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()

world_size = torch.cuda.device_count() # 获取可用的 GPU 数量
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size) # 孵化多个进程

异常处理

由于涉及多节点多进程多个 GPU, 训练过程中出现异常的概率会大幅上升. 为避免出现中断导致从头开始训练, 需要引入一个从中断处继续训练的机制. 可使用 pytorch torchrun 提供的 snapshot 功能, 为每轮训练保存快照, 这样当出现中断时, 就可以从快照中快速恢复.

虽然每轮或每几轮会保存 checkpoint 检查点, 但是检查点只包括模型的参数, 没有保存中断前的整个状态, 例如epoch 轮次等. 因此需要在 snapshot 中保存更丰富的信息以便恢复中断时的状态

使用 torchrun 后的几个变化:

  • 无须手动设置 rank, world_size 等环境变量, torchrun 会自动计算
  • 无须手动使用 mp.spawn 孵化多个进程, torchrun 会自动创建多个进程(单节点也可以适用);

当某个进程出现异常导致训练中断后, torchrun 会中断其他所有进程. 再次运行 torchrun 会从中断处继续训练, 损失多少取决于快照保存的时间间隔.

在训练过程中, 如果添加或删除节点, torchrun 会自动在所有节点上重启所有进程, 无需手动干预

引入 snapshot 后, 代码结构大致变成下面这个样子:

1
2
3
4
5
6
7
8
9
10
11
def main():
load_snapshot(snapshot_path) # 运行时优先加载快照(如有)
initialize()
train()

def train():
for batch in iter(dataset):
train_step(batch)

if should_checkpoint: # 在训练过程中设置保存快照的条件
save_snapshot(snapshot_path)

以下是引入 torchrun 和 snapshot 后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

# torchrun 会自动设置 RANK, 因此无需再手动配置 rank 和 world_size
def ddp_setup():
torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) # 使用 torchrun 设置的 RANK 环境变量
init_process_group(backend="nccl")

class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
save_every: int,
snapshot_path: str, # 快照保存路径
) -> None:
self.gpu_id = int(os.environ["LOCAL_RANK"]) # 使用 torchrun 设置的 RANK 环境变量
self.model = model.to(self.gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.epochs_run = 0
self.snapshot_path = snapshot_path
if os.path.exists(snapshot_path): # 如有快照, 优先加载, 而非从头开始
print("Loading snapshot")
self._load_snapshot(snapshot_path)

self.model = DDP(self.model, device_ids=[self.gpu_id])

# 加载快照
def _load_snapshot(self, snapshot_path):
loc = f"cuda:{self.gpu_id}"
snapshot = torch.load(snapshot_path, map_location=loc)
# 读取快照中的数据
self.model.load_state_dict(snapshot["MODEL_STATE"])
self.epochs_run = snapshot["EPOCHS_RUN"]
print(f"Resuming training from snapshot at Epoch {self.epochs_run}")

def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)

# 保存快照, 取代 checkpoint 保存
def _save_snapshot(self, epoch):
# 保存 model.module.state_dict 和 epoch 两个值, 原 checkpoint 只保存了前者
snapshot = {
"MODEL_STATE": self.model.module.state_dict(),
"EPOCHS_RUN": epoch,
}
torch.save(snapshot, self.snapshot_path)
print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

def train(self, max_epochs: int):
# 从 epochs_run 处开始, 不再从 0 开始
for epoch in range(self.epochs_run, max_epochs):
self._run_epoch(epoch)
# 按 save_every 保存快照(仅在 GPU:0 上面保存, 之前是保存 checkpoint)
if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_snapshot(epoch)


def load_train_objs():
train_set = MyTrainDataset(2048) # load your dataset
model = torch.nn.Linear(20, 1) # load your model
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False,
sampler=DistributedSampler(dataset)
)


def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
ddp_setup()
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
trainer.train(total_epochs)
destroy_process_group()


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()
# 直接调用 main 函数, 由 torchrun 进行调度, 不再需要手动 mp.spawn
main(args.save_every, args.total_epochs, args.batch_size)
1
torchrun --standalone --nproc_per_node=4 multigpu_torchrun.py 50 10

多节点训练

当使用多个节点时, 因为快照只保存在其中一台机器(节点)上, 因此当出现异常导致所有进程重启时, 所有节点都需要从该节点获取快照, 因此有两点很重要, 一是该节点的带宽尽量大一些; 二是确保 TCP 能够连通(注意检查防火墙设置)

由于多节点需要通过网络进行通讯, 其带宽远小于相同节点多个 GPU 之间的通讯带宽, 因此 1 机多卡的训练速度要远大于多机单卡;

有两种方法进行多节点的训练:

  • 在每个节点上面运行相同参数的 torchrun 命令;
  • 创建一个集群, 然后使用一个管理器统一进行调度(例如 slurm);

在单节点场景中, gpu_id 来自 torchrun 设置的 LOCAL_RANK 环境变量. 在多点节场景中, 还需要使用 torchrun 设置的另外一个全局 RANK 环境变量, 以便唯一识别每个 GPU 进程.

当出现异常导致 torchrun 重启所有进程时, 每个进程获得的 LOCAL_RANK 和 RANK 变量可能和之前的不同.

torchrun 支持异构扩展, 即每个节点的 GPU 数量可以不同, 例如有些节点配备 4 个 GPU, 有些配备 2 个GPU

调试时, 可设置环境变量 NCCL_DEBUG=INFO, 这样可以查看详细的日志.

以下是使用多节点训练时的代码变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os


def ddp_setup():
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
init_process_group(backend="nccl")

class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
save_every: int,
snapshot_path: str,
) -> None:
self.local_rank = int(os.environ["LOCAL_RANK"]) # 原 gpu_id 属性替换为 local_rank
self.global_rank = int(os.environ["RANK"]) # 此处使用 RANK 来唯一识别 GPU 进程
self.model = model.to(self.local_rank)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.epochs_run = 0
self.snapshot_path = snapshot_path
if os.path.exists(snapshot_path):
print("Loading snapshot")
self._load_snapshot(snapshot_path)

self.model = DDP(self.model, device_ids=[self.local_rank])

def _load_snapshot(self, snapshot_path):
loc = f"cuda:{self.local_rank}"
snapshot = torch.load(snapshot_path, map_location=loc)
self.model.load_state_dict(snapshot["MODEL_STATE"])
self.epochs_run = snapshot["EPOCHS_RUN"]
print(f"Resuming training from snapshot at Epoch {self.epochs_run}")

def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") # 打印日志时, 使用 global_rank 进行区分不同的 GPU 进程
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.local_rank)
targets = targets.to(self.local_rank)
self._run_batch(source, targets)

def _save_snapshot(self, epoch):
snapshot = {
"MODEL_STATE": self.model.module.state_dict(),
"EPOCHS_RUN": epoch,
}
torch.save(snapshot, self.snapshot_path)
print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

def train(self, max_epochs: int):
for epoch in range(self.epochs_run, max_epochs):
self._run_epoch(epoch)
if self.global_rank == 0 and epoch % self.save_every == 0:
self._save_snapshot(epoch)


def load_train_objs():
train_set = MyTrainDataset(2048) # load your dataset
model = torch.nn.Linear(20, 1) # load your model
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False,
sampler=DistributedSampler(dataset)
)


def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
ddp_setup()
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
trainer.train(total_epochs)
destroy_process_group()


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()

main(args.save_every, args.total_epochs, args.batch_size)

方案一: 在每台节点上手动单独运行 torchrun 命令

1
2
3
4
5
6
7
8
# 节点1
torchrun \
--proc_per_node=4 \ # 这里的数字取决于每个节点上的 GPU 数量, 在不同的节点上的 GPU 数量可能不同
--nnodes=2 \ # 节点总数
--node_rank=0 \ # 当前节点编号
--rdzv_id=456 \ # renderzvous 会合节点ID, 各节点相同
--rdzv_endpoint=172.31.43.139:29603 \ # 会合节点的IP:端口, 各节点相同
multinode_torchrun.py 50 10
1
2
3
4
5
6
7
8
# 节点2
torchrun \
--proc_per_node=2 \ # 当前节点只有 2 个 GPU
--nnodes=2 \ # 节点总数
--node_rank=1 \ # 当前节点编号
--rdzv_id=456 \ # renderzvous 会合节点ID, 各节点相同
--rdzv_endpoint=172.31.43.139:29603 \ # 会合节点的IP:端口, 各节点相同
multinode_torchrun.py 50 10

方案二: 创建集群, 使用调度器

不同云服务商有不同的创建集群的方法

创建 slurm 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/bin/bash

#SBATCH --job-name=multinode-example
#SBATCH --nodes=4
#SBATCH --ntasks=4
#SBATCH --gpus-per-task=1
#SBATCH --cpus-per-task=4 # 多个 CPU 可以让数据加载变快一些

nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

echo Node IP: $head_node_ip
export LOGLEVEL=INFO

srun torchrun \
--nnodes 4 \
--nproc_per_node 1 \
--rdzv_id $RANDOM \
--rdzv_backend c10d \
--rdzv_endpoint $head_node_ip:29500 \
/shared/examples/multinode_torchrun.py 50 10

运行脚本开始训练

1
sbatch slurm/sbatch_run.sh

可使用 squeue 命令查看任务队列

使用 DDP 模式单机多卡或多机多卡训练 minGPT 的示例

DDP 的实现原理: 给模型的参数注册钩子, 在反向传播时, 将梯度同步到所有进程, 以便所有进程对应的模型的参数梯度值达成最终一致性, 这样各个进程在更新参数后的模型状态能够保持一致.

处理速度差异

不同的进程的计算进度通常有先有后, 在数据同步时, 不同进程之间需要相互等待, 确保所有分片的数据完成同步后, 再进行下一步的计算. 因此需要设置一个足够大的 timeout 值, 避免有些进程在等待的过程中出现超时.

torch.distributed 库中有个 barrier() 方法可用来设置集合点, 确保所有进程到达该集合点后, 再进入下一步. 这样可以避免 A 进程还未保存模型数据, B 进程已经先加载数据并往下计算.

结合模型并行

Model Parallelism, 将模型的不同层放在不同的 GPU 上面, 不同 GPU 组成流水线;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1): # 参数是两个 GPU
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0) # 线性层1 在 dev0
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1) # 线性层2 在 dev1

def forward(self, x):
x = x.to(self.dev0) # 复制输入到 dev0
x = self.relu(self.net1(x))
x = x.to(self.dev1) # 复制上一层的输出到 dev1
return self.net2(x)

模型并行有几种不同类型:

  • Tensor Paralelism, 张量并行
  • Pipeline Paralelism, 流水线并行
  • Expert Parallelism, 专家并行

Uneven Inputs

不同的 GPU 进程可能收到的 batch 大小不同, 出现不均衡的输入, 会导致不同的进程的迭代次数不一样. 此时某些进程可能提前完成计算, 但其他进程可能还在训练中. 如果让完成计算的进程退出, 会导致后续同步数据时, 其他进程一直处理等待的状态, 最终造成死锁.

为避免这个问题 torch.distributed.algorithms.join 库中引入了 Join 上下文管理器. 当某些进程提前完成数据的计算后, 会进入 join 模式. 在这个模式下, 该进程不会提前退出, 而是会假装仍在工作, 会继续参与数据同步的工作, 只是它发的梯度数据是零, 同时在 step 时也不会更新模型的参数. 直到所有进程都完成计算后, 整个上下文才会终止.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.distributed.algorithms.join import Join
from torch.nn.parallel import DistributedDataParallel as DDP

BACKEND = "nccl"
WORLD_SIZE = 2
NUM_INPUTS = 5

def worker(rank):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
# Rank 1 比 rank 0 多一个输入, 模拟输入不平衡的场景
inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

num_inputs = 0
with Join([model]): # 使用 Join 管理上下文
for input in inputs:
num_inputs += 1
loss = model(input).sum()
loss.backward()

print(f"Rank {rank} has exhausted all {num_inputs} of its inputs!")

def main():
mp.spawn(worker, nprocs=WORLD_SIZE, join=True)

if __name__ == "__main__":
main()

Join 还可以同时管理多个类, 例如 ZeroRedundancyOptimizer, 示例如下:

ZeroRedundancyOptimizer 用来给优化器状态进行分片, 这样优化器的数据也可以像梯度一样是分布式的, 好处是可以减少单个 GPU 的内存占用, 等实际需要使用时, 多个 GPU 之间再进行同步即可. 缺点是需要付出一点数据同步的时间. 以时间换空间, 是否使用要看用哪种优化器, 有些优化器的状态占内存较多, 例如 Adam. 有些则没有;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from torch.distributed.optim import ZeroRedundancyOptimizer as ZeRO
from torch.optim import Adam

def worker(rank):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
optim = ZeRO(model.parameters(), Adam, lr=0.01) # 使用 ZeroRedundancyOptimizer
# Rank 1 比 rank 0 多一个输入, 模拟输入不平衡的场景
inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

num_inputs = 0
with Join([model, optim]): # 优化器和模型一起 Join
for input in inputs:
num_inputs += 1
loss = model(input).sum()
loss.backward()
optim.step()

print(f"Rank {rank} has exhausted all {num_inputs} of its inputs!")

Join 支持传入一些参数

1
2
3
4
# 传入 divide_by_initial_world_size 参数
with Join([model, optim], divide_by_initial_world_size=False):
for input in inputs:
...

FSDP2

Fully Sharded Data Parallel, 全分片数据并行. DDP 只是训练数据分片, FSDP 则更进一步, 连模型参数, 优化器状态都进行分片, 这样做的好处是可以减少单个 GPU 的内存容量要求.

由于 FSDP 将模型的参数也分片了, 因此相比 DDP, 它不仅要同步梯度, 还多了一个同步参数的动作. 即每个 GPU 需要先all-gather 动态的收集其他 GPU 上面的参数, 基于完整的模型参数计算出结果后, 再释放掉内存, 只保留自己所负责的那部分参数. 所以从结果上来看, 每个 GPU 上面的模型参数好像是完整的, 但实际上这种完整是一种临时拼凑的状态,是一种逻辑上的完整,而不是物理上的完整. 计算结束后, 实际保存的仍然只是一个参数分片.

FSDP 或许可视为一种更加激进的分片策略, 通过极致的以时间换空间, 进一步压榨 GPU 显存

模型初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from torch.distributed.fsdp import fully_shard, FSDPModule

model = Transformer()

for layer in model.layers:
fully_shard(layer) # 模型的每一层都要 fully_shard 进行分片

fully_shard(model) # 最后模型也要分片 fully_shard

assert isinstance(model, Transformer)
assert isinstance(model, FSDPModule)
print(model)
# FSDPTransformer(
# (tok_embeddings): Embedding(...)
# ...
# (layers): 3 x FSDPTransformerBlock(...)
# (output): Linear(...)
# )

fully_shard 会将模型的参数由 Tensor 转成 DTensor 类型(一种数据抽象), 用来表示它是做了分片的参数. DTensor 有助于简化各项操作的逻辑, 减少手动处理这些分片数据的同步工作.

FSDP2 在参数的第一个维度上进行分片, 因此第一维的数量需要是 world size 的倍数, 否则无法均匀分片. 此时 FSDP 默认会报错.

由于 FSDP2 模式下参数是分片的, 因此在保存模型时, 可以借助 DCP(Distributed Checkpointing)实现分布式保存. 每个 GPU 进程只保存自己的那部分参数, 这样可以减少通讯成本. 后续加载参数时, 每个 GPU 也可以只加载自己负责的那部分参数.

前向/后向预取

由于模型参数是分片, 计算前需要实时 all-gather 聚合, 这样就涉及了通信的时间. 理想情况下让计算和通信同时进行, 这样计算的时候,不用等待通信. 有两种实现的方案:

  • 隐式预取: 在计算当前层的时候, 后台自动偷偷 all-gather 预取下一个 layer 或 block 所需的参数.
  • 显式预取: 手动控制什么时候预取参数, 好处是非常灵活, 一般有以下几种使用场景:
    • 隐式只预取一层, 显式则可以预取多层, 好处是减少等待的发生, 缺点是占用多一些内存空间;
    • 预热: 在调用 model(x) 之前先预取, 这样调用 model 后直接开算, 不用等待;
    • CPU 工作负载过高, 没有及时发出预取指令, 造成空闲等待, 因此只能手动完成;

以下是隐式预取的图示:

以下是显式预取的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
num_to_forward_prefetch = 2
for i, layer in enumerate(model.layers):
if i >= len(model.layers) - num_to_forward_prefetch:
break
layers_to_prefetch = [
model.layers[i + j] for j in range(1, num_to_forward_prefetch + 1)
]
layer.set_modules_to_forward_prefetch(layers_to_prefetch) # 前向预取

num_to_backward_prefetch = 2
for i, layer in enumerate(model.layers):
if i < num_to_backward_prefetch:
continue
layers_to_prefetch = [
model.layers[i - j] for j in range(1, num_to_backward_prefetch + 1)
]
layer.set_modules_to_backward_prefetch(layers_to_prefetch) # 后向预取

for _ in range(epochs):
model.unshard() # 手动触发一次 all-gather, 减少后续 GPU 的等待时间, 正常 unshard 是隐式触发的
x = torch.randint(0, vocab_size, (batch_size, seq_len), device=device)
loss = model(x).sum()
loss.backward()
optim.step()
optim.zero_grad()

开启混合精度

FSDP2 支持灵活的混合精度策略, 以便提高训练的速度. 典型用法示例:

  • 在前向和后向传播将默认的 float32 转成 bfloat16 类型, 这样可以节省显存占用;
  • 做梯度聚合时, 将 bfloat16 类型转回 float32, 以便提高梯度计算的精度, 避免误差可能带来的发散;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
model = Transformer(model_args)
# 混合精度策略
fsdp_kwargs = {
"mp_policy": MixedPrecisionPolicy(
param_dtype=torch.bfloat16, # 参数用 bfloat16
reduce_dtype=torch.float32, # 梯度聚合用 float32
)
}
for layer in model.layers:
fully_shard(layer, **fsdp_kwargs) # 传递策略参数
fully_shard(model, **fsdp_kwargs) # 传递策略参数

# sharded parameters are float32
for param in model.parameters():
assert param.dtype == torch.float32

# unsharded parameters are bfloat16
model.unshard()
for param in model.parameters(recurse=False):
assert param.dtype == torch.bfloat16
model.reshard()

# 优化器使用默认的 float32
optim = torch.optim.Adam(model.parameters(), lr=1e-2)

# training loop
# ...

梯度裁剪与基于 DTensor 的优化器

在 full_shard 模型后, 再初始化优化器, 以便优化器能够正确跟踪分布在不同 GPU 上面的参数分片.

参数分片后的包装类型是 DTensor, 它默认支持梯度裁剪. 梯度裁剪涉及计算范数, 此时需要先聚合梯度. DTensor 会自动拦截各种张量 Tensor 的正常操作, 然后在需要时会自动完成通信聚合的相关工作, 对用户来说是透明的.

TP

工作原理

对张量进行拆分计算的示意图, 例如 MLP 和自注意力两种计算场景:

TP 的工作原理主要由三部分构成:

  • 静态分片
    • 模型初始化时, TP 为模型的每一层选择不同的并行策略(按行或按列)
    • 调用 parallelize_module 将模型转换为并行版本;
    • 将普通张量 Tensor 转换为支持并行的 DTensor 张量(自动处理跨设备的通信)
  • 动态通信
    • all-reduce
    • all-gather
    • reduce-scatter
  • 局部计算: 每个 GPU 只负责计算自己负责的那部分分片数据;

使用场景

虽然 FSDP 已经能够实现数据并行, 也能够对模型参数进行切片, 但它仍然面临一些瓶颈:

  • 当 GPU 数量变多时, 通信成本变成瓶颈;
  • 每个 GPU 的 batch size 最小为 1, 过多的 GPU 导致总体 batch size 非常大. 但过大的 batch size 可能导致收敛不稳定;
  • 较小的 batth size 有可能无法满足 GPU 的形状规整要求(例如维度是 8, 16, 32 的倍数), 导致浮点运算的效率不高;

TP/SP 刚好可以用来解决以上问题:

  • 在节点间使用 FSDP, 单节点内部的多 GPU 之间使用 TP, 这样可以显著减少通信成本. 例如一机八卡, 那么节点间的通信成本可以除以 8 倍;相当于 FSDP 是以节点进行切片, 而不是以 GPU 进行切片;
  • 张量支持切片后, GPU 的 batch size 就可以小于 1 了, 二者实现了解耦;
  • 张量切片有助于满足 GPU 浮点运算对矩阵形状的规整要求;
问题 解决方案 效果
FSDP 在超大规模 GPU 下通信开销大 节点内用 TP/SP,节点间用 FSDP 减少 FSDP 通信域,降低延迟
数据并行受批大小限制 引入 TP/SP 解耦 GPU 数与批大小 可继续增加 GPU 而不破坏收敛
小 batch 导致计算效率低 TP/SP 优化矩阵形状 提高 FLOPS 利用率

使用方法

1
2
3
4
# 初始化一个 device emsh
from torch.distributed.device_mesh import init_device_mesh

tp_mesh = init_device_mesh("cuda", (8,))

如何实施张量并行, 取决于模型某一层如何进行计算, 因此要视具体情况而定, 举例如下:

1
2
3
# 假设有如下的 SwiGLU 激活运算, 来源于 Transformer block 的 feed_forward 层
def forward(self, x):
return self.w2(F.silu(self.w1(x)) * self.w3(x))

w1 和 w3 是两个线性层, 是可以并行的矩阵乘法运算, w2 同样是线性层, 基于前二者的逐元素相乘的结果再做矩阵乘法;

  • 第1步: w1 和 w3 是单独对输入 x 做矩阵乘法, 因此二者可以按列切片, 这样每个 GPU 只需计算自己负责的那一部分列即可;x 则需要完整的广播到所有 GPU, 没有切片;
  • 第2步: w1 和 w3 是逐元素相乘, 因此前面的计算结果仍然可以在 GPU 内部做局部计算, 无须和其他 GPU 进行通信;
  • 第3步: w2 是矩阵乘法, 可将 w2 按行切分, 每个 GPU 单独计算出一部分结果, 最后通过 all reduce 聚合成完整的结果;
1
2
3
4
5
6
7
8
# 制定如下的并行策略
from torch.distributed.tensor.parallel import ColwiseParallel, RowwiseParallel, parallelize_module

layer_tp_plan = {
"feed_foward.w1": ColwiseParallel(),
"feed_forward.w2": RowwiseParallel(),
"feed_forward.w3": ColwiseParallel(),
}
组件 并行方式 输入布局 输出布局 是否需要通信
w1, w3 Column-wise Replicated Sharded 否(各自独立计算)
w1(x) * w3(x) Element-wise Sharded Sharded 否(同设备上直接相乘)
w2 Row-wise Sharded Replicated(经 all-reduce) 是(最后一次 all-reduce)

以上示例是 feed_forward 层, 对于 Transformer 模块中的 self attention 注意力层, 并行策略大概类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
layer_tp_plan = {
# q,k,v 按列切分
# use_local_output=False 确保输出仍为 DTensor, 而不是普通 tensor, 保留分片信息, 非常重要!!!
"attention.wq": ColwiseParallel(use_local_output=False),
"attention.wk": ColwiseParallel(use_local_output=False),
"attention.wv": ColwiseParallel(use_local_output=False),
# o 按行切分
"attention.wo": RowwiseParallel(),
# 以下是 feed_forward 层
"feed_forward.w1": ColwiseParallel(),
"feed_forward.w2": RowwiseParallel(),
"feed_forward.w3": ColwiseParallel(),
}

使用 parallel_module 函数将 transformer block 转换成张量并行模式

1
2
3
4
5
6
7
8
for layer_id, transformer_block in enumerate(model.layers):
layer_tp_plan = {...} # 详见前面的示例, 此处内容略

parallelize_module(
module=transformer_block,
device_mesh=tp_mesh,
parallelize_plan=layer_tp_plan,
)

对于模型头部的 Embedding 层和尾部的 Linear 层, 采用类似的张量并行策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
model = parallelize_module(
model,
tp_mesh,
{
# 嵌入层按行分片, 将 vocal 词汇表切分到不同的设备上
# 也可按列分, 或者不分(不分最简单, 除非特别大)
"tok_embeddings": RowwiseParallel(
input_layouts=Replicate(), # 输入的 token ids 是全量副本, 不分片
),
"output": ColwiseParallel(
output_layouts=Replicate(),# 输出是一个拼接后的完整副本, 为了方便后续计算 loss
),
}
)

用于归一化层

SP(sequence parallel, 序列并行), 是 TP 张量并行的一个变体. TP 是对权重进行切分, 比如按矩阵的行或列. 而 SP 则更进一步, 对输入序列的长度进行切分. 它主要用于 LayerNorm 或者 RMSNorm 等归一化层中, 将一个长序列切分成多个片段, 每个 GPU 负责处理其中一个片段. 目的是为了减少激活值所占用的内存. 因为激活值会随着输入序列的长度和批量大小呈线性增长.

SP: Shard(1), 在第1维(seq维)进行切分, [batch, seq_len, hidden_dim] 变成 [batch, seq_len / N, hidden_dim]

RMSNorm 或 LayerNorm 等归一化层是在 hiddem_dim 维度上做归一化, 与 sequence 维度无关, 因此归一化层很适合用来分片, 不同分片之间无需通信. 而 Attention 或 FeedForward 则不适合用来做 SP, 因为它们的矩阵需要完整的行或列, SP 后会带来很大的通信成本.

代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from torch.distributed.tensor.parallel import (
PrepareModuleInput,
SequenceParallel,
)

layer_tp_plan = {
# 当使用 SP 分片, 输出有可能需要转回 Replicate, 以供后续的层使用
"attention_norm": SequenceParallel(), # SP 分片
"attention": PrepareModuleInput(
input_layouts=(Shard(1), Replicate()), # 需要将 Shard(1) 转回 Replicate
desired_input_layouts=(Replicate(), Replicate()), # 期望的输入格式
),
"attention.wq": ColwiseParallel(use_local_output=False),
"attention.wk": ColwiseParallel(use_local_output=False),
"attention.wv": ColwiseParallel(use_local_output=False),
"attention.wo": RowwiseParallel(output_layouts=Shard(1)),
"ffn_norm": SequenceParallel(), # SP 分片
"feed_forward": PrepareModuleInput(
input_layouts=(Shard(1),), # 需要将 Shard(1) 转回 Replicate
desired_input_layouts=(Replicate(),), # 期望的输入格式
),
"feed_forward.w1": ColwiseParallel(),
"feed_forward.w2": RowwiseParallel(output_layouts=Shard(1)),
"feed_forward.w3": ColwiseParallel(),
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 处理头尾的层
model = parallelize_module(
model,
tp_mesh,
{
"tok_embeddings": RowwiseParallel(嵌入层
input_layouts=Replicate(),
output_layouts=Shard(1), # 将输出转成 SP 的 Shard(1)
),
"norm": SequenceParallel(),
"output": ColwiseParallel(
input_layouts=Shard(1),
output_layouts=Replicate() # 输出转成 Replicate
),
}
)

损失并行

如果 vocal_size 已经是分片的, 那么在计算损失时, 每个 GPU 也可先做自己负责的 logits 的局部交叉熵损失计算, 之后再 reduce 得到最终的 loss;

1
2
3
4
# 使用 loss_parallel() 上下文管理器, cross_entropy 会自动按分片做损失的并行计算
with loss_parallel():
loss = torch.nn.functional.cross_entropy(logits, labels)
loss.backward() # 需要在上下文中调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
model = parallelize_module(
model,
tp_mesh,
{
"tok_embeddings": RowwiseParallel(
input_layouts=Replicate(),
output_layouts=Shard(1),
),
"norm": SequenceParallel(),
"output": ColwiseParallel(
input_layouts=Shard(1),
# 输出是 DTensor, 不再是 Replicate, 以便能够实现 loss parallel
use_local_output=False,
),
},
)

结合 FSDP

FSDP 负责节点间的切片, TP 负责单节点多个 GPU 间的切片

以上切分方式可通过设置 DeviceMesh 的参数来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from torch.distributed.device_mesh import init_device_mesh
from torch.distributed.tensor.parallel import ColwiseParallel, RowwiseParallel, parallelize_module
from torch.distributed.fsdp import fully_shard

# 2d mesh 网格, 8路 DP, 8路 TP, 总共 64 个 GPU
mesh_2d = init_device_mesh("cuda", (8, 8))
tp_mesh = mesh_2d["tp"] # 单节点内部的子网格
dp_mesh = mesh_2d["dp"] # 跨节点的子网格

model = Model(...)

tp_plan = {...}

# 先在节点内部启用张量并行
model_tp = parallelize_module(model, tp_mesh, tp_plan)
# 再在节点间启用 FSDP 数据并行
model_2d = fully_shard(model_tp, mesh=dp_mesh, ...)

使用 FSDP + TP 的代码示例

DeviceMesh

DeviceMesh 是用来管理进程组的一个高级抽象, 简化手动管理这些进程的工作, 例如给每个进程分配 rank id

1
2
3
4
5
6
7
8
# DeviceMesh 让创建多维并行变得很简单, 以下是创建二维并行的示例
# 无须再手工创建 replicate group 和 shard group
from torch.distributed.device_mesh import init_device_mesh
mesh_2d = init_device_mesh("cuda", (2, 4), mesh_dim_names=("replicate", "shard"))

# Users can access the underlying process group thru `get_group` API.
replicate_group = mesh_2d.get_group(mesh_dim="replicate")
shard_group = mesh_2d.get_group(mesh_dim="shard")

HSDP

HSDP, Hybrid Sharding Data Paralelism, 混合数据并行, 单节点内部使用 FSDP, 多节点之间使用 DDP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import torch
import torch.nn as nn

from torch.distributed.device_mesh import init_device_mesh
from torch.distributed.fsdp import fully_shard as FSDP


class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)

def forward(self, x):
return self.net2(self.relu(self.net1(x)))


# HSDP: MeshShape(2, 4)
mesh_2d = init_device_mesh("cuda", (2, 4), mesh_dim_names=("dp_replicate", "dp_shard"))
model = FSDP(
ToyModel(), device_mesh=mesh_2d
)

自定义并行方案

对于复杂的自定义并行方案, DeviceMesh 支持从网格中提取子网格, 进行一些自定义的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
from torch.distributed.device_mesh import init_device_mesh

# 定义一个三维并行的网格
mesh_3d = init_device_mesh("cuda", (2, 2, 2), mesh_dim_names=("replicate", "shard", "tp"))

# 提取子网格
hsdp_mesh = mesh_3d["replicate", "shard"]
tp_mesh = mesh_3d["tp"]

# 提取进程组
replicate_group = hsdp_mesh["replicate"].get_group()
shard_group = hsdp_mesh["shard"].get_group()
tp_group = tp_mesh.get_group()

PP

Pipeline Paralism, 流水线并行. 当模型太大, 无法放入单个 GPU 时, 可考虑使用 PP 并行模式. 将模型按层或模块拆分为多个 stage, 每个 GPU 负责一部分 stage;同时将一个 batch 的数据, 也拆分成多个小的 micro-batch, 逐个输入流水线.

micro-batch 的好处是小步快跑,这样有助于让多个 stage 同时跑起来;如果没有拆分, 则后面的 stage 需要等待前面的 stage 处理完;尽管如此, 启动和结束时出现一个小的等待窗口仍然是不可避免的, 称为 bubble;

执行顺序如下(时间步 t1, t2, …):

时间 S1(设备0) S2(设备1)
t1 M1 前向 ——
t2 M2 前向 M1 前向 + 反向
t3 M3 前向 M2 前向 + 反向
t4 M4 前向 M3 前向 + 反向
t5 —— M4 前向 + 反向

PyTorch
https://ccw1078.github.io/2024/12/13/Pytorch/
作者
ccw
发布于
2024年12月13日
许可协议