Move `ShardingFilterIterDataPipe` into a dedicated file.
Also, propose a dedicated parent class (`_ShardingIterDataPipe`) for sharding datapipes, since this is more of a "system/engine-level" datapipe that gives strong hints to the ReadingService (RS) on how to execute and needs first-class treatment in RS (compared with other "user-level" datapipes that are mostly composable `Callable[[Iterable], Iterable]`). With this, `graph_settings.py` no longer needs to check whether `is_shardable` and `apply_sharding` are present on the DataPipe. But open to other discussions.
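For concreteness, a rough sketch of what the proposed parent class could look like (names and signatures are illustrative only, not part of this PR):

```py
from torch.utils.data import IterDataPipe

# Illustrative sketch: engine-level sharding datapipes subclass a common parent,
# so graph utilities can dispatch on `isinstance` instead of probing for methods.
class _ShardingIterDataPipe(IterDataPipe):
    def apply_sharding(self, num_of_instances: int, instance_id: int):
        raise NotImplementedError


class ShardingFilterIterDataPipe(_ShardingIterDataPipe):
    def __init__(self, source_datapipe):
        self.source_datapipe = source_datapipe
        self.num_of_instances = 1
        self.instance_id = 0

    def apply_sharding(self, num_of_instances: int, instance_id: int):
        self.num_of_instances = num_of_instances
        self.instance_id = instance_id

    def __iter__(self):
        # keep every `num_of_instances`-th element, offset by `instance_id`
        for i, item in enumerate(self.source_datapipe):
            if i % self.num_of_instances == self.instance_id:
                yield item
```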
Open question: Should
[ShardingRoundRobinDispatcherIterDataPipe](01fc762003/torchdata/datapipes/iter/util/sharding.py (L16-L17)) also be considered a `_ShardingIterDataPipe`? (e.g. this sharding is executed by replicating (the metadata), while `ShardingRoundRobinDispatcherIterDataPipe` hints that replication is too expensive, so it requires round-robin data exchange/dispatch.)
Differential Revision: D43014692
Pull Request resolved: https://github.com/pytorch/pytorch/pull/94095
Approved by: https://github.com/ejguan, https://github.com/NivekT
This PR requires https://github.com/pytorch/pytorch/pull/83202 to be landed first.
## Changes
- `apply_shuffle_setting` and `apply_shuffle_seed` now apply the shuffle setting/seed to every DataPipe in the graph that has a `set_shuffle` or `set_seed` method.
- Rename the API from `apply_shuffle_seed` to `apply_random_seed`.
- Fix a bug where `apply_shuffle_seed` only accepted hashable DataPipes. After this PR, the function uses `id` to prevent seeding the same DataPipe multiple times per epoch (see the sketch after this list).
- Fix another bug in `Shuffler` where `reset` with `_enable=False` would also reset `_seed`.
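A simplified sketch of the seeding behaviour described above (not the exact implementation; `all_datapipes` stands in for the datapipes discovered by traversing the graph):

```py
import torch

def apply_random_seed_sketch(all_datapipes, rng: torch.Generator):
    seen_ids = set()  # dedupe by id() so unhashable datapipes work too
    for dp in all_datapipes:
        if id(dp) in seen_ids:
            continue  # never seed the same datapipe twice within an epoch
        seen_ids.add(id(dp))
        if hasattr(dp, "set_seed"):
            seed = int(torch.empty((), dtype=torch.int64).random_(generator=rng).item())
            dp.set_seed(seed)
```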
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83741
Approved by: https://github.com/NivekT
### Description
Across PyTorch's docstrings, both `callable` and `Callable` are used for variable types. `Callable` should be capitalized, as we are referring to the `Callable` type and not the Python `callable()` built-in.
### Testing
There shouldn't be any testing required.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/82487
Approved by: https://github.com/albanD
Fixes #79828
In a distributed environment, before this PR, DataLoader would create a Tensor holding the shared seed on RANK 0 and send the Tensor to the other processes. However, when `NCCL` is used as the distributed backend, the Tensor must be moved to CUDA before it is broadcast from RANK 0 to the other RANKs. This causes the Issue: DataLoader doesn't move the Tensor to CUDA before sharing it via `NCCL`.
After an offline discussion with @mrshenli, we think the distributed Store is a better solution, as the shared seed is just an integer value. This way, we can get rid of the dependency on NCCL and CUDA when sharing info between distributed processes for DataLoader.
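A minimal sketch of the Store-based approach (simplified; the key name is hypothetical and `_get_default_store()` is a private helper whose exact location may differ across versions):

```py
import torch
import torch.distributed as dist

def share_seed_via_store(rank: int) -> int:
    store = dist.distributed_c10d._get_default_store()  # private helper, illustration only
    key = "_dl_shared_seed"  # hypothetical key name
    if rank == 0:
        seed = int(torch.empty((), dtype=torch.int64).random_().item())
        store.set(key, str(seed))
    else:
        seed = int(store.get(key))  # Store.get returns bytes; int() parses them
    return seed
```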
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79829
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
1. Change the sharding strategy from sharding by worker first and then by rank to sharding by rank first and then by worker.
2. Fetch the rank and world size in the main process, for the sake of `spawn`.
For change 1:
Before this PR, when the dataset cannot be evenly divided by `num_workers * world_size`, the workers on the first RANKs retrieve more data (see the illustrative sketch after the numbers below).
Using the following example:
- dataset size: 100
- world_size: 4
- num_worker: 2
The number of samples retrieved by each rank before this PR:
- Rank 0: 26
- Rank 1: 26
- Rank 2: 24
- Rank 3: 24
The number of samples retrieved by each rank after this PR:
- Rank 0: 25
- Rank 1: 25
- Rank 2: 25
- Rank 3: 25
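An illustrative sketch (not the DataLoader code itself) that reproduces these counts by mapping each flat shard id to a rank under the old (worker-first) and new (rank-first) orderings:

```py
def samples_per_rank(dataset_size, world_size, num_workers, rank_first):
    total = world_size * num_workers
    counts = [0] * world_size
    for idx in range(dataset_size):
        shard = idx % total
        if rank_first:
            rank = shard % world_size      # new: spread data across ranks first
        else:
            rank = shard // num_workers    # old: spread data across workers first
        counts[rank] += 1
    return counts

print(samples_per_rank(100, 4, 2, rank_first=False))  # [26, 26, 24, 24]
print(samples_per_rank(100, 4, 2, rank_first=True))   # [25, 25, 25, 25]
```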
For change 2:
Before this PR, `dist` functions are invoked inside the worker processes. That is fine when the worker processes are forked from the parent process: all environment variables are inherited and visible to these `dist` functions. However, when the worker processes are spawned, they won't be able to access these environment variables, and the dataset won't be sharded by rank.
After this PR, `_sharding_worker_init_fn` should work for both the `spawn` and `fork` cases.
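A simplified sketch of the approach (names follow the PR, but details are condensed): the rank and world size are read once in the main process and bound into the worker init function with `functools.partial`, so spawned workers never need the distributed environment variables themselves.

```py
import functools
import torch.distributed as dist
import torch.utils.data.graph_settings as graph_settings
from torch.utils.data import get_worker_info

def _sharding_worker_init_fn(world_size, rank_id, worker_id):
    info = get_worker_info()
    total_workers = info.num_workers * world_size
    # rank-first ordering: rank r owns every world_size-th shard, offset by r
    global_worker_id = worker_id * world_size + rank_id
    graph_settings.apply_sharding(info.dataset, total_workers, global_worker_id)

def make_worker_init_fn():
    if dist.is_available() and dist.is_initialized():
        return functools.partial(_sharding_worker_init_fn,
                                 dist.get_world_size(), dist.get_rank())
    return functools.partial(_sharding_worker_init_fn, 1, 0)
```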
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79041
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
Fixes https://github.com/pytorch/data/issues/426
This PR introduces two main changes:
- It ensures the `ShufflerDataPipe` shares the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.
Details:
- `shared_seed` is shared across distributed and worker processes. It seeds a `shared_rng` that provides seeds to each `ShufflerDataPipe` in the pipeline (see the sketch after this list).
- `worker_loop` now accepts a new `shared_seed` argument to receive this shared seed.
- The `shared_seed` is attached to `_ResumeIteration` to reset the seed per epoch for persistent workers.
- I chose not to touch `base_seed`, simply to avoid BC issues.
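A condensed sketch of the seeding flow for the shufflers (simplified; the traversal order of the pipeline must be deterministic for this to work):

```py
import torch

def seed_shufflers(shufflers, shared_seed: int):
    # same `shared_seed` on every rank/worker -> same per-shuffler seeds everywhere
    shared_rng = torch.Generator()
    shared_rng.manual_seed(shared_seed)
    for shuffler in shufflers:
        seed = int(torch.empty((), dtype=torch.int64).random_(generator=shared_rng).item())
        shuffler.set_seed(seed)
```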
I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b
You can see there isn't any duplicated or missing element in each epoch. And, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
This is the first PR to make DataPipe deterministic.
Users should be able to use `torch.manual_seed(seed)` to control the shuffle order for the following cases:
- Directly over `DataPipe`
- For single-process DataLoader
- Multiprocessing DataLoader
Unfortunately, for distributed training, users have to run `apply_shuffle_seed` manually to make sure all distributed processes have the same shuffle order.
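A minimal usage sketch of the intended behaviour for the non-distributed cases (assuming the shuffler draws its seed from the default generator when iteration starts):

```py
import torch
from torch.utils.data.datapipes.iter import IterableWrapper

dp = IterableWrapper(range(10)).shuffle()

torch.manual_seed(123)
first_epoch = list(dp)
torch.manual_seed(123)
second_epoch = list(dp)
assert first_epoch == second_epoch  # same seed -> same shuffle order
```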
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77741
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
Summary:
X-link: https://github.com/pytorch/data/pull/368
This PR aims to expose the right data-related APIs.
There are two more changes in this PR that convert public APIs into private ones:
`check_lambda_fn` -> `_check_lambda_fn`
`deprecation_warning` -> `_deprecation_warning`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76143
Reviewed By: albanD, NivekT
Differential Revision: D35798311
Pulled By: ejguan
fbshipit-source-id: b13fded5c88a533c706702fb2070c918c839dca4
(cherry picked from commit 0b534b829a2e90e1e533951c6d334fdeaa9358b9)
Summary:
`pin_memory` has an optional `device` parameter to specify which device to pin memory for. Even with that change, the DataLoader works only for the CUDA backend. To add support for other backends that support pinned memory, the DataLoader is updated with `device` as an optional parameter.
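For illustration, a usage sketch with the keyword as it appears in current releases (the exact parameter name, `pin_memory_device`, is an assumption here and may differ from this PR's version):

```py
import torch
from torch.utils.data import DataLoader, TensorDataset

ds = TensorDataset(torch.arange(8, dtype=torch.float32))
# Pin host memory for a specific backend instead of implicitly assuming CUDA.
loader = DataLoader(ds, batch_size=4, pin_memory=True, pin_memory_device="cuda")
```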
Fixes #{issue number}
Pull Request resolved: https://github.com/pytorch/pytorch/pull/65402
Reviewed By: zou3519
Differential Revision: D32282204
Pulled By: VitalyFedyunin
fbshipit-source-id: e2e09876969af108d0db38af7c2d1b2f1cfa9858
(cherry picked from commit 3b76e151964fce442e27fe8fb5c37af930da4fa1)
Without this, `DataLoader2` will just add a `Shuffler` to the end of the datapipe if `shuffle=True`:
```py
from torch.utils.data.dataloader_experimental import DataLoader2
from torchdata.datapipes.iter import IterableWrapper, IterDataPipe, Shuffler
class Sorter(IterDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe

    def __iter__(self):
        return iter(sorted(self.datapipe))

data = list(range(1000))
dp = IterableWrapper(data)
dp = Shuffler(dp).set_shuffle(False)
dp = Sorter(dp)
dl2 = DataLoader2(dp, shuffle=True, batch_size=None)
assert list(dl2) == data # fails unless you hit a lucky random seed
```
This example is somewhat nonsensical, but it demonstrates that we cannot simply add a `Shuffler`.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/75014
Approved by: https://github.com/ejguan
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71579
Fixes #1551
As the comment in the code says, register a function to terminate persistent workers.
Adding a reference to these workers in `atexit` prevents the Python interpreter from killing these persistent worker processes before `pin_memory_thread` exits.
And if the user explicitly shuts down the DataLoader iterator, the function registered in `atexit` becomes a no-op.
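A rough sketch of the mechanism (simplified; `_shutdown_workers` mirrors the iterator's private clean-up method and is used here only for illustration):

```py
import atexit

def register_atexit_cleanup(loader_iter):
    def _clean_up():
        # no-op if the iterator was already shut down explicitly
        if getattr(loader_iter, "_workers", None):
            loader_iter._shutdown_workers()
    atexit.register(_clean_up)  # keeps a reference alive until interpreter exit
```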
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D33896537
Pulled By: ejguan
fbshipit-source-id: 36b57eac7523d8aa180180c2b61fc693ea4638ae
(cherry picked from commit 05add2ae0f)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71579
Fixes #1551
As the comment in the code says, register a function to terminate persistent workers. `atexit` is used to make sure termination of persistent workers always happens at the end (after `pin_memory_thread` exits).
We need such a mechanism because, in some rare cases, the Python interpreter would clean up the worker processes before the DataLoader iterator.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D33694867
Pulled By: ejguan
fbshipit-source-id: 0847f4d424a0cd6b3c0be8235d505415970254e8
(cherry picked from commit 18ad4621af)
Summary:
This is to fix Pyre errors in our applications:
* calling `tensor.cos()` etc.
* creating a data loader with a batch sampler that is `List[List[int]]`.
Test Plan: TODO: rebase the diffs and run Pyre.
Reviewed By: ejguan
Differential Revision: D31309564
fbshipit-source-id: 1c6f3070d7570260de170e2fe2153d277b246745
Summary:
Not entirely sure how to use TypeVar, but if someone could give me a hint it would be appreciated. Also let me know if you want me to add tests so we can make sure non-integer samplers actually work. It seems like `test/test_dataloader.py` is the correct location, but that's a big file.
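For reference, a hypothetical non-integer sampler of the kind this typing change is meant to permit (not part of the PR):

```py
from typing import Iterator, List
from torch.utils.data import Sampler

# Yields whole batches of indices, e.g. for DataLoader(..., batch_sampler=...)
class FixedBatchSampler(Sampler[List[int]]):
    def __init__(self, batches: List[List[int]]):
        self.batches = batches

    def __iter__(self) -> Iterator[List[int]]:
        return iter(self.batches)

    def __len__(self) -> int:
        return len(self.batches)
```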
Fixes https://github.com/pytorch/pytorch/issues/63483
ejguan
Pull Request resolved: https://github.com/pytorch/pytorch/pull/63500
Reviewed By: mruberry
Differential Revision: D30403689
Pulled By: ejguan
fbshipit-source-id: 464e09e5aad3215b94a29cc5e21cb4b10ec136e3
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60959
Add TorchVitals for DataLoader; this indicates that the DataLoader was enabled.
This is a no-op if the TORCH_VITALS environment variable is not set.
Test Plan: buck test mode/dbg caffe2/test:torch -- --regex vitals
Reviewed By: VitalyFedyunin
Differential Revision: D29445146
fbshipit-source-id: d5778fff3dafb3c0463fec7a498bff4905597518
Summary:
During development it is common practice to put `type: ignore` comments on lines that are correct, but that `mypy` doesn't recognize as such. This often stems from the fact that the `mypy` version in use wasn't able to handle the pattern.
With every new release `mypy` gets better at handling complex code. In addition to fixing all the previously accepted but now failing patterns, we should also revisit all `type: ignore` comments to see if they are still needed. Fortunately, we don't need to do it manually: by adding `warn_unused_ignores = True` to the configuration, `mypy` will error out whenever it encounters a `type: ignore` that is no longer needed.
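For example, with this flag enabled `mypy` reports something like the following on an ignore that no longer suppresses anything (hypothetical snippet):

```py
def double(x: int) -> int:
    return x * 2  # type: ignore  # error: Unused "type: ignore" comment
```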
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60006
Reviewed By: jbschlosser, malfet
Differential Revision: D29133237
Pulled By: albanD
fbshipit-source-id: 41e82edc5cd5affa7ccedad044b59b94dad4425a