Fixes #79828
In a distributed environment, before this PR, the DataLoader would create a Tensor holding the shared seed on RANK 0 and send the Tensor to the other processes. However, when `NCCL` is used as the distributed backend, the Tensor must be moved to CUDA before it is broadcast from RANK 0 to the other RANKs. This causes the reported issue, because the DataLoader does not move the Tensor to CUDA before sharing it via `NCCL`.
After offline discussion with @mrshenli, we think the distributed Store is a better solution, as the shared seed is just an integer value. This also removes the dependency on NCCL and CUDA when sharing info between distributed processes for the DataLoader.
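A minimal sketch of the store-based flow (the helper name `_share_seed_via_store` is hypothetical; in practice the store could be a `TCPStore`/`FileStore` or the default store created by `init_process_group`):
```py
import torch
import torch.distributed as dist

def _share_seed_via_store(store: dist.Store, rank: int) -> int:
    # Hypothetical helper: RANK 0 writes the seed as a string key and every
    # other rank reads it back. No tensor, no NCCL broadcast, no CUDA copy.
    key = "_dl_shared_seed"
    if rank == 0:
        seed = int(torch.empty((), dtype=torch.int64).random_().item())
        store.set(key, str(seed))
        return seed
    return int(store.get(key))
```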
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79829
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
1. Change the sharding strategy from sharding by worker first and then by rank to sharding by rank first and then by worker.
2. Fetch the rank and world size in the main process, for the sake of `spawn`.
For change 1:
Before this PR, when the dataset cannot be evenly divided by `num_workers * world_size`, the workers on the first RANKs retrieve more data.
Using the following example (a small simulation reproducing these counts is sketched below):
- dataset size: 100
- world_size: 4
- num_worker: 2
The number of samples retrieved by each rank before this PR:
- Rank 0: 26
- Rank 1: 26
- Rank 2: 24
- Rank 3: 24
The number of samples retrieved by each rank after this PR:
- Rank 0: 25
- Rank 1: 25
- Rank 2: 25
- Rank 3: 25
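A minimal sketch (not the actual implementation) that simulates both nested sharding orders and reproduces the counts above:
```py
def shard_counts(dataset_size, world_size, num_workers, rank_first):
    # Count how many samples each rank receives when the dataset is sharded
    # round-robin, either by rank first (new) or by worker first (old).
    indices = list(range(dataset_size))
    counts = [0] * world_size
    for rank in range(world_size):
        for worker in range(num_workers):
            if rank_first:
                shard = indices[rank::world_size][worker::num_workers]
            else:
                shard = indices[worker::num_workers][rank::world_size]
            counts[rank] += len(shard)
    return counts

print(shard_counts(100, 4, 2, rank_first=False))  # [26, 26, 24, 24]
print(shard_counts(100, 4, 2, rank_first=True))   # [25, 25, 25, 25]
```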
For change 2:
Before this PR, `dist` functions were invoked inside the worker processes. That is fine when the worker processes are forked from the parent process: all environment variables are inherited and visible to those `dist` functions. However, when the worker processes are spawned, they cannot access these environment variables, so the dataset is not sharded by rank.
After this PR, `_sharding_worker_init_fn` should work for both the `spawn` and the `fork` case.
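A minimal sketch of the idea; `sharding_worker_init_fn` and `make_loader` are hypothetical illustrations of what `_sharding_worker_init_fn` does: the rank and world size are read in the main process and baked into the init function, so nothing inside the worker depends on environment variables.
```py
import functools

import torch.distributed as dist
from torch.utils.data import DataLoader, get_worker_info

def sharding_worker_init_fn(world_size, rank, worker_id):
    # world_size/rank were captured in the main process, so this also works
    # when workers are started with `spawn` and do not inherit the
    # distributed environment variables.
    info = get_worker_info()
    num_shards = world_size * info.num_workers
    shard_id = rank * info.num_workers + worker_id  # rank first, then worker
    # ...apply num_shards / shard_id to the datapipe graph here...

def make_loader(dp, num_workers=2):
    # Rank and world size are read once in the main process and attached to
    # the worker_init_fn via functools.partial.
    init_fn = functools.partial(sharding_worker_init_fn,
                                dist.get_world_size(), dist.get_rank())
    return DataLoader(dp, num_workers=num_workers, worker_init_fn=init_fn)
```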
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79041
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
Fixes https://github.com/pytorch/data/issues/426
This PR introduces two main changes:
- It ensures the `ShufflerDataPipe` shares the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.
Detail:
- `shared_seed` is shared across distributed and worker processes. It seeds a `shared_rng` that provides seeds to each `ShufflerDataPipe` in the pipeline (see the sketch after this list).
- `worker_loop` now accepts a new `shared_seed` argument to receive this shared seed.
- The `shared_seed` is attached to `_ResumeIteration` so the seed can be reset per epoch for persistent workers.
- I chose not to touch `base_seed`, simply to avoid BC issues.
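A minimal sketch of the seeding flow described above (`set_seed` on the shuffler is a stand-in for the hook the worker loop actually uses):
```py
import torch

def seed_shufflers(shufflers, shared_seed):
    # Every rank and worker receives the same shared_seed, so shared_rng
    # produces the same sequence everywhere and every ShufflerDataPipe in
    # the pipeline ends up seeded identically across processes.
    shared_rng = torch.Generator()
    shared_rng.manual_seed(shared_seed)
    for shuffler in shufflers:
        seed = int(torch.empty((), dtype=torch.int64).random_(generator=shared_rng).item())
        shuffler.set_seed(seed)  # stand-in for the real per-shuffler seeding hook
```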
I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b
You can see there isn't any duplicated or missing element in any epoch. And, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
This is the first PR to make DataPipe deterministic.
Users should be able to use `torch.manual_seed(seed)` to control the shuffle order for the following cases:
- Directly over a `DataPipe`
- With a single-process DataLoader
- With a multiprocessing DataLoader
Unfortunately, for distributed training, users have to run `apply_shuffle_seed` manually to make sure all distributed processes have the same shuffle order.
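For example (the import path and signature of `apply_shuffle_seed` are assumptions based on the datapipes graph utilities; check the PR for the exact API):
```py
import torch
from torch.utils.data.datapipes.iter import IterableWrapper
from torch.utils.data.graph_settings import apply_shuffle_seed  # path assumed

dp = IterableWrapper(range(10)).shuffle()

# Directly over the DataPipe, or via a single-/multi-process DataLoader:
# torch.manual_seed controls the shuffle order.
torch.manual_seed(0)
print(list(dp))

# Distributed training: every rank must apply the same seed to the graph manually.
rng = torch.Generator()
rng.manual_seed(0)  # the seed all ranks agreed on
dp = apply_shuffle_seed(dp, rng)
```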
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77741
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
Summary:
X-link: https://github.com/pytorch/data/pull/368
This PR aims to expose the right data-related APIs.
There are two more changes in this PR that convert public APIs to private ones:
`check_lambda_fn` -> `_check_lambda_fn`
`deprecation_warning` -> `_deprecation_warning`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76143
Reviewed By: albanD, NivekT
Differential Revision: D35798311
Pulled By: ejguan
fbshipit-source-id: b13fded5c88a533c706702fb2070c918c839dca4
(cherry picked from commit 0b534b829a2e90e1e533951c6d334fdeaa9358b9)
Summary:
`pin_memory` has an optional device parameter to specify which device you want to pin memory for. With only that change, the DataLoader would work only for the CUDA backend. To add support for other backends that support pinned memory, the DataLoader is updated to take the device as an optional parameter.
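A minimal sketch of how this surfaces on the `DataLoader`; the keyword name `pin_memory_device` follows the eventually released API and is an assumption here, as is the example device string:
```py
import torch
from torch.utils.data import DataLoader, TensorDataset

ds = TensorDataset(torch.arange(100, dtype=torch.float32))

# Pin host memory for a specific backend instead of implicitly assuming CUDA.
loader = DataLoader(ds, batch_size=32, pin_memory=True,
                    pin_memory_device="cuda")  # or another backend with pinned-memory support
```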
Fixes #{issue number}
Pull Request resolved: https://github.com/pytorch/pytorch/pull/65402
Reviewed By: zou3519
Differential Revision: D32282204
Pulled By: VitalyFedyunin
fbshipit-source-id: e2e09876969af108d0db38af7c2d1b2f1cfa9858
(cherry picked from commit 3b76e151964fce442e27fe8fb5c37af930da4fa1)
Without this, `DataLoader2` will just add a `Shuffler` to the end of the datapipe if `shuffle=True`:
```py
from torch.utils.data.dataloader_experimental import DataLoader2
from torchdata.datapipes.iter import IterableWrapper, IterDataPipe, Shuffler
class Sorter(IterDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe

    def __iter__(self):
        return iter(sorted(self.datapipe))

data = list(range(1000))
dp = IterableWrapper(data)
dp = Shuffler(dp).set_shuffle(False)
dp = Sorter(dp)

dl2 = DataLoader2(dp, shuffle=True, batch_size=None)
assert list(dl2) == data  # fails unless you hit a lucky random seed
```
This example is somewhat nonsensical, but it demonstrates that we cannot simply add a `Shuffler`.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/75014
Approved by: https://github.com/ejguan
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71579
Fixes #1551
As the comment in the code explains, register a function to terminate persistent workers.
By keeping a reference to these workers in `atexit`, we prevent the Python interpreter from killing these persistent worker processes before `pin_memory_thread` exits.
And if users explicitly shut down the DataLoader iterator, this `atexit` function becomes a no-op.
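A minimal sketch of the pattern (the names are hypothetical, not the DataLoader internals):
```py
import atexit

def register_persistent_worker_cleanup(workers, shutdown_fn):
    # Keep a reference to the worker handles inside an atexit hook so the
    # interpreter cannot tear them down before pin_memory_thread has exited.
    # If the iterator was already shut down explicitly, the hook is a no-op.
    def _clean_up():
        if any(w.is_alive() for w in workers):
            shutdown_fn()
    atexit.register(_clean_up)
```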
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D33896537
Pulled By: ejguan
fbshipit-source-id: 36b57eac7523d8aa180180c2b61fc693ea4638ae
(cherry picked from commit 05add2ae0f)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71579
Fixes #1551
As the comment in the code explains, register a function to terminate persistent workers. Using `atexit` makes sure termination of persistent workers always happens at the end (after `pin_memory_thread` exits).
We need such a mechanism because, in some rare cases, the Python interpreter would clean up the worker processes before the DataLoader iterator.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D33694867
Pulled By: ejguan
fbshipit-source-id: 0847f4d424a0cd6b3c0be8235d505415970254e8
(cherry picked from commit 18ad4621af)
Summary:
This is to fix Pyre errors in our applications:
* calling `tensor.cos()` etc.
* creating a data loader with a batch sampler that is `List[List[int]]`.
Test Plan: TODO: rebase the diffs and run Pyre.
Reviewed By: ejguan
Differential Revision: D31309564
fbshipit-source-id: 1c6f3070d7570260de170e2fe2153d277b246745
Summary:
I'm not entirely sure how to use TypeVar, so if someone could give me a hint it would be appreciated. Also, let me know if you want me to add tests so we can make sure non-integer samplers actually work. It seems like `test/test_dataloader.py` is the correct location, but that's a big file.
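For context, this is the kind of non-integer sampler the loosened typing is meant to permit (a minimal sketch, not part of the PR):
```py
from typing import Iterator, List

from torch.utils.data import Sampler

class PairSampler(Sampler[List[int]]):
    # Yields lists of indices instead of single ints, which the relaxed
    # typing on Sampler is intended to allow.
    def __init__(self, n: int) -> None:
        self.n = n

    def __iter__(self) -> Iterator[List[int]]:
        return iter([[i, (i + 1) % self.n] for i in range(self.n)])

    def __len__(self) -> int:
        return self.n
```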
Fixes https://github.com/pytorch/pytorch/issues/63483
ejguan
Pull Request resolved: https://github.com/pytorch/pytorch/pull/63500
Reviewed By: mruberry
Differential Revision: D30403689
Pulled By: ejguan
fbshipit-source-id: 464e09e5aad3215b94a29cc5e21cb4b10ec136e3
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60959
Add TorchVitals for the DataLoader; this indicates that the DataLoader was enabled.
This is a no-op if the TORCH_VITALS environment variable is not set.
Test Plan: buck test mode/dbg caffe2/test:torch -- --regex vitals
Reviewed By: VitalyFedyunin
Differential Revision: D29445146
fbshipit-source-id: d5778fff3dafb3c0463fec7a498bff4905597518
Summary:
During development it is common practice to put `type: ignore` comments on lines that are correct, but that `mypy` doesn't recognize. This often stems from the fact that the `mypy` version in use wasn't able to handle the pattern.
With every new release `mypy` gets better at handling complex code. In addition to fixing all the previously accepted but now failing patterns, we should also revisit all `type: ignore` comments to see whether they are still needed. Fortunately, we don't have to do that manually: by adding `warn_unused_ignores = True` to the configuration, `mypy` will error out whenever it encounters a `type: ignore` that is no longer needed.
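For reference, the relevant setting in a `mypy` configuration file (a minimal sketch; the real `mypy.ini` carries many more options):
```ini
[mypy]
# Error out on `type: ignore` comments that are no longer needed.
warn_unused_ignores = True
```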
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60006
Reviewed By: jbschlosser, malfet
Differential Revision: D29133237
Pulled By: albanD
fbshipit-source-id: 41e82edc5cd5affa7ccedad044b59b94dad4425a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56528
I searched across internal and external usages of DataLoader; people haven't started to use `generator` for `DataLoader`.
Test Plan: Imported from OSS
Reviewed By: albanD
Differential Revision: D27908487
Pulled By: ejguan
fbshipit-source-id: 14c83ed40d4ba4dc988b121968a78c2732d8eb93
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56488
Considering the number of requests for this feature, introduce NumPy seeding as the default within each DataLoader worker.
## BC-breaking Note:
- By introducing a default numpy.random seeding strategy for DataLoader workers, users no longer need to manually set the seed for workers via `worker_init_fn`. This PR does not affect users who currently use `worker_init_fn` to set a customized seed for workers (a sketch of that manual pattern follows this list).
- DataLoader preserves reproducibility for users who use numpy.random within their Dataset.
- Multiprocessing (without a `worker_init_fn` that defines the seed for NumPy):
  - Start method `spawn`: each worker now has its own seed for numpy.random, rather than a seed derived from the import time of the NumPy module, which made the DataLoader lose reproducibility.
  - Start method `fork`: each worker not only gets the same benefit as with `spawn`, but also has a different NumPy seed by default, rather than inheriting the same seed.
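For reference, this is roughly the manual pattern the new default replaces: deriving a per-worker NumPy seed from the per-worker torch seed inside `worker_init_fn` (a sketch, not the internal implementation):
```py
import numpy as np
import torch

def numpy_seed_worker_init(worker_id):
    # torch.initial_seed() differs per worker (base_seed + worker_id), so
    # truncating it to 32 bits gives each worker its own NumPy seed.
    np.random.seed(torch.initial_seed() % 2**32)
```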
Using the following Dataset and script as an example:
```py
import multiprocessing as mp
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset

class RandomDataset(Dataset):
    def __getitem__(self, ind):
        item = [ind, np.random.randint(1, 10000)]
        return item

    def __len__(self):
        return 20

if __name__ == '__main__':
    ctx = mp.get_context('fork')
    ds = RandomDataset()
    g = torch.Generator()
    g.manual_seed(0)
    dl = DataLoader(ds, 2, shuffle=False, num_workers=4, multiprocessing_context=ctx, generator=g)
    epochs = 2
    for _ in range(epochs):
        for batch in dl:
            print(batch)
        print("====" * 10)
```
### 1.8.1:
Each worker generates the same random result per iteration, and the seed is reset to the same value at each epoch.
```py
tensor([[ 0, 7449],
[ 1, 1519]])
tensor([[ 2, 7449],
[ 3, 1519]])
tensor([[ 4, 9645],
[ 5, 2387]])
tensor([[ 6, 9645],
[ 7, 2387]])
tensor([[ 8, 3118],
[ 9, 4552]])
=========================
tensor([[ 0, 7449],
[ 1, 1519]])
tensor([[ 2, 7449],
[ 3, 1519]])
tensor([[ 4, 9645],
[ 5, 2387]])
tensor([[ 6, 9645],
[ 7, 2387]])
tensor([[ 8, 3118],
[ 9, 4552]])
=========================
```
### This PR:
Each worker has a different seed at the beginning and is re-seeded at each epoch.
```py
tensor([[ 0, 8715],
[ 1, 5555]])
tensor([[ 2, 6379],
[ 3, 1432]])
tensor([[ 4, 3271],
[ 5, 5132]])
tensor([[ 6, 4287],
[ 7, 1104]])
tensor([[ 8, 8682],
[ 9, 1699]])
=========================
tensor([[ 0, 1374],
[ 1, 996]])
tensor([[ 2, 143],
[ 3, 3507]])
tensor([[ 4, 5887],
[ 5, 4730]])
tensor([[ 6, 7274],
[ 7, 738]])
tensor([[ 8, 6374],
[ 9, 1572]])
=========================
```
Test Plan: Imported from OSS
Reviewed By: albanD
Differential Revision: D27908486
Pulled By: ejguan
fbshipit-source-id: 5f313a30563bedeb88be214fa4beca0cefe9e4f4
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/49486
Remove code for Python 3.5 and lower.
There's more that can be removed/modernised, but sticking mainly to redundant version checks here, to keep the diff/PR smaller.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46579
Reviewed By: zou3519
Differential Revision: D24453571
Pulled By: ezyang
fbshipit-source-id: c2cfcf05d6c5f65df64d89c331692c9aec09248e
Summary:
This small PR fixes a one character typo in the docstring for `DataLoader`.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/49437
Reviewed By: ngimel
Differential Revision: D25665971
Pulled By: mrshenli
fbshipit-source-id: b60f975f1e3bf0bb8f88e39f490f716c602f087e
Summary:
Fixes https://github.com/pytorch/pytorch/issues/47441
To give users more information about Python-level functions in profiler traces, we propose to instrument the following functions:
```
_BaseDataLoaderIter.__next__
Optimizer.step
Optimizer.zero_grad
```
Because `record_function` already uses `if (!active)` to check whether the profiler is enabled, we don't explicitly call `torch.autograd._profiler_enabled()` before each instrumentation.
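The instrumentation pattern looks roughly like this (a sketch, not the actual DataLoader code):
```py
from torch.autograd.profiler import record_function

class _InstrumentedIter:
    # Wrap __next__ in a record_function block so it shows up in profiler
    # traces; when the profiler is inactive the overhead is negligible.
    def __init__(self, it):
        self._it = it

    def __iter__(self):
        return self

    def __next__(self):
        with record_function("_InstrumentedIter.__next__"):
            return next(self._it)
```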
Acknowledgement: nbcsm, guotuofeng, gunandrose4u, guyang3532, mszhanyi
Pull Request resolved: https://github.com/pytorch/pytorch/pull/47655
Reviewed By: smessmer
Differential Revision: D24960386
Pulled By: ilia-cher
fbshipit-source-id: 2eb655789e2e2f506e1b8f95ad3d470c83281102
Summary:
This PR needs discussion, as it changes the behavior of `DataLoader`. It can be closed if it's not considered a good practice.
Currently, the `DataLoader` spawns a new `_BaseDataLoaderIter` object every epoch.
In the case of the multiprocess DataLoader, the worker processes are re-created every epoch, and they make a copy of the original `Dataset` object.
If users want to cache data or do some tracking on their datasets, all their data is wiped out every epoch. Notice that this doesn't happen when the number of workers is 0, giving some inconsistencies between the multiprocess and serial data loaders.
This PR keeps the `_BaseDataLoaderIter` object alive and just resets it between epochs, so the workers remain active and so do their own `Dataset` objects. People seem to file issues about this often.
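The behavior later surfaced as the `persistent_workers` flag on `DataLoader`; a minimal sketch of a dataset whose per-worker cache survives across epochs when workers are kept alive (the flag name refers to the released API, not necessarily this exact PR):
```py
import torch
from torch.utils.data import DataLoader, Dataset

class CachingDataset(Dataset):
    def __init__(self):
        self.cache = {}

    def __len__(self):
        return 8

    def __getitem__(self, i):
        # With freshly created workers every epoch, this cache is rebuilt each
        # time, because the new worker processes copy a pristine Dataset.
        if i not in self.cache:
            self.cache[i] = torch.randn(3)
        return self.cache[i]

loader = DataLoader(CachingDataset(), num_workers=2, persistent_workers=True)
for _ in range(2):
    for batch in loader:
        pass  # the worker-side caches are reused in the second epoch
```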
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35795
Reviewed By: ailzhang
Differential Revision: D23426612
Pulled By: VitalyFedyunin
fbshipit-source-id: e16950036bae35548cd0cfa78faa06b6c232a2ea