PyTorch Distributed

  • nn.DataParallel
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

input_size = 5
output_size = 2
batch_size = 30
data_size = 100
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        # 100*5
        self.data = torch.randn(length, size)
    def __getitem__(self, index):
        # (5, )
        return self.data[index]
    def __len__(self):
        # 100
        return self.len

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                         batch_size=batch_size,
                         shuffle=True)
next(iter(rand_loader)).shape  # torch.Size([30, 5])

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        # 5 => 2
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)
    def forward(self, input):
        output = self.fc(input)
        print("\tIn Model: input size", input.size(),  # printed by every replica (one per GPU)
              "output size", output.size())
        return output

model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # split along dim 0: [30, ...] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
    model = nn.DataParallel(model)
print(model)
# DataParallel(
#   (module): Model(
#     (fc): Linear(in_features=5, out_features=2, bias=True)
#   )
# )

# tensor: to(device) returns a copy, the original tensor stays where it was
a = torch.randn(3, 4)
print('a.is_cuda', a.is_cuda)  # False
b = a.to('cuda:0')
print('a.is_cuda', a.is_cuda)  # False
print('b.is_cuda', b.is_cuda)  # True

# model: to(device) moves the parameters in place
a = Model(3, 4)
print(next(a.parameters()).is_cuda)  # False
b = a.to('cuda:0')
print(next(a.parameters()).is_cuda)  # True
print(next(b.parameters()).is_cuda)  # True

# the (wrapped) model must live on device_ids[0] before the forward pass
model = model.to(device)
for data in rand_loader:
    input = data.to(device)
    output = model(input)
    print("Outside: input size", input.size(),
          "output_size", output.size())

# In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
# In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
# Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
# In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
# In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
# Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
  • In distributed data parallel training, the model (model parameters) and the optimizer (optimizer states) are replicated onto every GPU (replicas).
    • DDP keeps the model parameters and optimizer states synchronized and consistent across GPUs throughout the entire training run.
  • Data parallelism: the batch input is split by DistributedSampler and dispatched to the different GPUs.
    • The model/optimizer replicas are identical, but because each GPU sees different input data, the losses differ, and so do the gradients computed during backpropagation.
    • How, then, does DDP keep the model/optimizer consistent across GPUs?
      • Via the ring all-reduce algorithm, whose synchronization does not require waiting for every GPU to finish a full round of gradient computation (see the all-reduce sketch after this list).
  • world, world_size:
    • world: a group containing all the processes of your distributed training.
      • Typically, each GPU corresponds to one process.
      • Processes inside the world can communicate with each other, which is what makes DDP distributed training possible.
  • rank
    • rank: the unique ID given to a process. It is a process-level concept; ranks identify processes so that they can communicate with each other.
    • local_rank: a unique local ID for the processes running on a single node.
  • A node can be understood as one server; 2 servers (multi-machine, the machines need to communicate with each other) means 2 nodes.
    • For example, with 2 nodes/servers/machines, each with 4 GPUs:
    • world_size: 2*4 == 8
    • ranks: [0, 1, 2, 3, 4, 5, 6, 7]
    • local_rank: [0, 1, 2, 3], [0, 1, 2, 3]
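  • A minimal sketch of the gradient averaging that DDP performs under the hood, using torch.distributed.all_reduce on a toy tensor. The script name and tensor values are made up here; launch it with torchrun so RANK/WORLD_SIZE/LOCAL_Rank are populated.
# all_reduce_demo.py -- hypothetical filename; sketch of the collective that
# DDP uses to average gradients across ranks.
# Launch with: torchrun --nproc-per-node=2 all_reduce_demo.py
import os
import torch
import torch.distributed as dist

dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

rank = dist.get_rank()
world_size = dist.get_world_size()

# pretend this is one gradient tensor; each rank holds a different value
grad = torch.full((4,), float(rank), device=f"cuda:{local_rank}")

# sum across all ranks, then divide: every rank ends up with the same averaged
# gradient, which is what keeps the replicas' parameters in sync
dist.all_reduce(grad, op=dist.ReduceOp.SUM)
grad /= world_size
print(f"rank {rank}: averaged grad = {grad.tolist()}")

dist.destroy_process_group()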
  • An example for setup
import os
import torch
from torch.distributed import init_process_group

def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

# torchrun-style setup: WORLD_SIZE / LOCAL_RANK are env vars set by torchrun
# (on a single node, WORLD_SIZE comes from the --nproc_per_node argument)
if not torch.distributed.is_initialized():
    torch.distributed.init_process_group("nccl")
model_parallel_size = int(os.environ.get("WORLD_SIZE", 1))
local_rank = int(os.environ.get("LOCAL_RANK", 0))
torch.cuda.set_device(local_rank)
  • nn.parallel.DistributedDataParallel
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# custom trainer
class Trainer:
    def __init__(self,
                 model: torch.nn.Module,
                 train_dataloader: DataLoader,
                 optimizer: torch.optim.Optimizer,
                 gpu_id: int) -> None:
        # rank
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        self.model = DDP(model, device_ids=[gpu_id])

    def _run_batch(self, xs, ys):
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = F.cross_entropy(output, ys)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')
        # reshuffle across ranks each epoch
        self.train_dataloader.sampler.set_epoch(epoch)
        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)

    def train(self, max_epoch: int):
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

# custom dataset
class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
    def __len__(self):
        return self.size
    def __getitem__(self, index):
        return self.data[index]

train_dataset = MyTrainDataset(2048)
train_dataset[0]
# (tensor([0.4790, 0.5080, 0.1934, 0.5247, 0.6372, 0.9930, 0.2379, 0.9182, 0.3659,
#          0.8408, 0.2347, 0.1770, 0.8691, 0.2810, 0.2156, 0.8289, 0.9372, 0.6358,
#          0.4338, 0.2754]),
#  tensor([0.6307]))

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

def main(rank: int, world_size: int, max_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    train_dataset = MyTrainDataset(2048)
    train_dataloader = DataLoader(train_dataset,
                                  batch_size=batch_size,
                                  pin_memory=True,
                                  shuffle=False,
                                  # batch input: split across the GPUs, no overlapping
                                  # samples between GPUs (see the sampler check after this block)
                                  sampler=DistributedSampler(train_dataset))
    model = torch.nn.Linear(20, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    trainer = Trainer(model=model, gpu_id=rank, optimizer=optimizer, train_dataloader=train_dataloader)
    trainer.train(max_epochs)
    destroy_process_group()

# run directly (spawn the processes yourself with mp.spawn)
!python ddp_gpus.py --max_epochs 5 --batch_size 32  # multi-GPU
# the difference:
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # rank 0 process
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    # nccl: NVIDIA Collective Communication Library,
    # handles GPU-to-GPU communication in the distributed setting
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

world_size = torch.cuda.device_count()
# args: CLI arguments (--max_epochs, --batch_size) parsed in the full script
mp.spawn(main, args=(world_size, args.max_epochs, args.batch_size), nprocs=world_size)

# <https://github.com/chunhuizhang/pytorch_distribute_tutorials/blob/main/tutorials/ddp_gpus.py>

# run with torchrun
!torchrun ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32  # single GPU
!torchrun --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32  # multi-GPU
!python -m torch.distributed.launch --use-env --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32  # multi-GPU, equivalent to the line above
# the difference:
def ddp_setup():
    """
    torchrun sets MASTER_ADDR/MASTER_PORT, RANK, WORLD_SIZE and LOCAL_RANK itself.
    """
    # rank 0 process
    # os.environ["MASTER_ADDR"] = "localhost"
    # os.environ["MASTER_PORT"] = "12355"
    # nccl: NVIDIA Collective Communication Library,
    # handles GPU-to-GPU communication in the distributed setting
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))

# world_size = torch.cuda.device_count()
main(args.max_epochs, args.batch_size)

# <https://github.com/chunhuizhang/pytorch_distribute_tutorials/blob/main/tutorials/ddp_gpus_torchrun.py>
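
  • The DataLoader above relies on DistributedSampler to give every rank a disjoint shard of the dataset. The standalone check below is a hypothetical snippet (not part of the linked scripts) that constructs the sampler with explicit num_replicas/rank to show there is no overlap between ranks.
# Hypothetical standalone check: DistributedSampler partitions the dataset
# indices per rank, with no overlapping samples between ranks.
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(8))

shards = []
for rank in range(2):  # pretend world_size == 2
    sampler = DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=False)
    shards.append(list(sampler))

print(shards)                           # e.g. [[0, 2, 4, 6], [1, 3, 5, 7]]
print(set(shards[0]) & set(shards[1]))  # set() -> no overlap between ranks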
  • model parallel
from transformers import LlamaTokenizer, LlamaForCausalLM, GenerationConfig
model = LlamaForCausalLM.from_pretrained("decapoda-research/llama-7b-hf",
                                         load_in_8bit=True,
                                         device_map="auto",  # "auto", "balanced", "balanced_low_0", "sequential"
                                         )  # placement priority: GPU(s) > CPU (RAM) > Disk
for i, para in enumerate(model.named_parameters()):
    print(f'{i}, \t {para[1].device} \t{para[1].dtype}')

# An example on ToyModel
import torch
import torch.nn as nn
import torch.optim as optim

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10000, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = model(torch.randn(20, 10000))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()

# An example on ResNet
import torch
from torch import nn
from torchvision.models.resnet import ResNet, Bottleneck
# from torchvision.models.resnet import resnet18, resnet34, resnet50, resnet101, resnet152
model = ResNet(Bottleneck, [3, 4, 6, 3])  # resnet50
from torchsummary import summary
summary(model, input_size=(3, 128, 128), device='cpu')

class ModelParallelResNet50(ResNet):
    def __init__(self, num_classes=1000):
        super().__init__(Bottleneck, [3, 4, 6, 3], num_classes=num_classes)
        # conv1 => bn1 => relu => maxpool => layer1-layer4 => avgpool => fc
        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            self.layer1,
            self.layer2
        ).to('cuda:0')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

def model_size(model):
    return sum([para.numel() for para in model.parameters()])

num_classes = 1000
one_hot_indices = torch.LongTensor(5) \
    .random_(0, num_classes) \
    .view(5, 1)
labels = torch.zeros(5, num_classes) \
    .scatter_(1, one_hot_indices, 1)