Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/22479

In some cases, for example when training on CTR data, we would like to start training from old samples and finish on the most recent samples. This diff adds an option to disable shuffling in DistributedSampler to accommodate this use case.

Reviewed By: soumith

Differential Revision: D16100388

fbshipit-source-id: 35566581f5250040b2db5ec408a63037b47a9f5d
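A minimal usage sketch of the new option (illustrative, not part of this diff): `dataset` is a placeholder for any map-style dataset ordered oldest-first, and the batch size is arbitrary. With shuffle=False, each process iterates over its shard of the dataset in the original, chronological order, which matches the old-samples-first training described above.

    # Hedged sketch: assumes torch.distributed is already initialized and
    # `dataset` is a placeholder for a map-style dataset ordered oldest-first.
    from torch.utils.data import DataLoader
    from torch.utils.data.distributed import DistributedSampler

    sampler = DistributedSampler(dataset, shuffle=False)  # keep original order
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)
    for batch in loader:
        ...  # each rank sees a disjoint, chronologically ordered subset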
import math
import torch
from . import Sampler
import torch.distributed as dist


class DistributedSampler(Sampler):
    """Sampler that restricts data loading to a subset of the dataset.

    It is especially useful in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel`. In such case, each
    process can pass a DistributedSampler instance as a DataLoader sampler,
    and load a subset of the original dataset that is exclusive to it.

    .. note::
        Dataset is assumed to be of constant size.

    Arguments:
        dataset: Dataset used for sampling.
        num_replicas (optional): Number of processes participating in
            distributed training.
        rank (optional): Rank of the current process within num_replicas.
        shuffle (optional): If true (default), sampler will shuffle the indices
    """

    def __init__(self, dataset, num_replicas=None, rank=None, shuffle=True):
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle

    def __iter__(self):
        # deterministically shuffle based on epoch
        g = torch.Generator()
        g.manual_seed(self.epoch)
        if self.shuffle:
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # add extra samples to make it evenly divisible
        indices += indices[:(self.total_size - len(indices))]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        self.epoch = epoch
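To make the arithmetic in __iter__ concrete, here is a hand-checkable sketch of the padding and strided subsampling (shuffling disabled and sizes made up for illustration):

    # Worked example: len(dataset) == 10, num_replicas == 4, so
    # num_samples == ceil(10 / 4) == 3 and total_size == 3 * 4 == 12.
    indices = list(range(10))
    indices += indices[:12 - len(indices)]  # pad with the first 2: [0..9, 0, 1]
    shards = [indices[rank:12:4] for rank in range(4)]
    # shards == [[0, 4, 8], [1, 5, 9], [2, 6, 0], [3, 7, 1]]
    # Every rank gets exactly num_samples == 3 indices; only the padded
    # duplicates (0 and 1) land on more than one rank.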
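Because the generator is seeded with the epoch number, every epoch would replay the same permutation unless the caller advances it, so a typical distributed training loop calls set_epoch once per epoch. A hedged sketch, with num_epochs, loader, and train_one_epoch as placeholder names (when shuffle=False the seed is unused, so the call is a harmless no-op):

    # Hedged sketch: `sampler` is the DistributedSampler attached to `loader`,
    # and `train_one_epoch` stands in for the usual inner training loop.
    for epoch in range(num_epochs):
        # All ranks seed with the same epoch value, so they agree on this
        # epoch's permutation but draw a fresh one next epoch.
        sampler.set_epoch(epoch)
        train_one_epoch(loader)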