Fixes https://github.com/pytorch/data/issues/426
This PR introduces two main changes:
- It ensures the `ShufflerDataPipe` would share the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.
Detail:
- `shared_seed` is shared across distributed and worker processes. It will seed a `shared_rng` to provide seeds to each `ShufflerDataPipe` in the pipeline
- `worker_loop` now accepts a new argument of `shared_seed` to accept this shared seed.
- The `shared_seed` is attached to `_ResumeIteration` for resetting seed per epoch for `persistent worker`
- I choose not to touch `base_seed` simply for BC issue
I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b
You can see there isn't any duplicated/missing element for each epoch. And, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
Fixes https://github.com/pytorch/data/issues/426
This PR introduces two main changes:
- It ensures the `ShufflerDataPipe` would share the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.
Detail:
- `shared_seed` is shared across distributed and worker processes. It will seed a `shared_rng` to provide seeds to each `ShufflerDataPipe` in the pipeline
- `worker_loop` now accepts a new argument of `shared_seed` to accept this shared seed.
- The `shared_seed` is attached to `_ResumeIteration` for resetting seed per epoch for `persistent worker`
- I choose not to touch `base_seed` simply for BC issue
I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b
You can see there isn't any duplicated/missing element for each epoch. And, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
This is the first PR to make DataPipe deterministic.
Users should be able to use `torch.manual_seed(seed)` to control the shuffle order for the following cases:
- Directly over `DataPipe`
- For single-process DataLoader
- Multiprocessing DataLoader
Unfortunately, for distributed training, users have to run `apply_shuffle_seed` manually to make sure all distributed processes having the same order of shuffle.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77741
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76384
OSS issue discussion: https://github.com/pytorch/data/issues/346
This diff updates `mux` and `mux_longest` data pipe.
`mux`: Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux``). As in, one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration, and so on. It ends when the shortest input DataPipe is exhausted.
`mux` example:
```
>>> from torchdata.datapipes.iter import IterableWrapper
>>> dp1, dp2, dp3 = IterableWrapper(range(3)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
>>> list(dp1.mux(dp2, dp3))
[0, 10, 20, 1, 11, 21, 2, 12, 22]
```
Test Plan:
buck test mode/opt //caffe2/test:datapipe
https://www.internalfb.com/intern/testinfra/testrun/4785074706282345
Differential Revision: D36017945
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77145
Approved by: https://github.com/NivekT, https://github.com/ejguan
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76384
OSS issue discussion: https://github.com/pytorch/data/issues/346
This diff updates `mux` and `mux_longest` data pipe.
`mux`: Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux``). As in, one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration, and so on. It ends when the shortest input DataPipe is exhausted.
`mux` example:
```
>>> from torchdata.datapipes.iter import IterableWrapper
>>> dp1, dp2, dp3 = IterableWrapper(range(3)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
>>> list(dp1.mux(dp2, dp3))
[0, 10, 20, 1, 11, 21, 2, 12, 22]
```
Test Plan:
buck test mode/dev //pytorch/data/test:tests -- --exact 'pytorch/data/test:tests - test_mux_longest_iterdatapipe (test_datapipe.TestDataPipe)'
https://www.internalfb.com/intern/testinfra/testrun/3096224791148107
Reviewed By: ejguan
Differential Revision: D35799965
fbshipit-source-id: 320e71a342ec27e6e9200624aad42f4b99f97c3a
(cherry picked from commit 741ed595275df6c05026ed6f0e78d7052328fb7d)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/73396
Separating DataPipes from Dataset into different files. This makes the code more maintainable and simplifies some of the code generation.
I have also tried to move `datapipe.py` into `torch.utils.data.datapipes`, but that will lead to circular import and rewriting many import statements. Should I put more time and go down that path some more?
Fixes https://github.com/pytorch/data/issues/213
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D34481962
Pulled By: NivekT
fbshipit-source-id: 42fb26fe7fc334636852cfd8719fc807bdaa7912
(cherry picked from commit 81e76a64e297cb5c58caa951c554e49526173936)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/73119
Test if a DataPipe is serializable after its contents are partially read and completely read. This is especially important for DataPipes with buffers.
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D34354496
Pulled By: NivekT
fbshipit-source-id: 36971d68b9ca1de81fb254e9a459b8f54fe0f9ff
(cherry picked from commit e8f39a7aa364bd2b19145788f7e67c06f948f81b)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/72896
Fixing the issue described here: https://github.com/pytorch/data/issues/214
There will be a follow-up PR in TorchData as well
Test Plan: Imported from OSS
Reviewed By: gchanan
Differential Revision: D34258669
Pulled By: NivekT
fbshipit-source-id: 6dd88250ed14ebe779915dc46139be7e012e9d1b
(cherry picked from commit 025b8ed98019e576bfef04c33a3f33ed1a426a66)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/72123
There is a bug to fix the typing system in DataPipe, which would take more than 1 week to fix. I will follow up on it later this month. As branch cut is today, add this PR to disable typing to make sure release works.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D33920610
Pulled By: ejguan
fbshipit-source-id: febff849ab2272fd3b1c5127a20f27eb82992d9c
(cherry picked from commit ee103e62e7)
Summary:
Fixes https://github.com/pytorch/pytorch/issues/70103
I used an argument so it can be disabled. I called it `deterministic_order` because `sort` can be confusing, as it's actually sorted but by dir levels.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/70435
Reviewed By: albanD
Differential Revision: D33899755
Pulled By: ejguan
fbshipit-source-id: e8a08f03a49120333b2d27f332cd21a3240a02a9
(cherry picked from commit 4616e43ec3)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/70215
A few renaming, formatting, and additional tests to make the unit tests better.
cc VitalyFedyunin ejguan NivekT
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D33344610
Pulled By: NivekT
fbshipit-source-id: bb36f7452bdc44964c9ce0650c7ae308ba2c5aa5
(cherry picked from commit 0aae20cb27)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71161
Users should import these DataPipes from [TorchData](https://github.com/pytorch/data) if they would like to use them. We will be checking for any downstream library usage before landing this PR.
Test Plan: Imported from OSS
Reviewed By: mruberry
Differential Revision: D33532272
Pulled By: NivekT
fbshipit-source-id: 9dbfb21baf2d1183e0aa379049ad8304753e08a1
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/70367
This PR renames the `FileLoaderIterDataPipe` to `FileOpenerIterDataPipe`. For the sake of not breaking many CI tests immediately, it still preserves `FileLoader` as an alias. This will allow downstream libraries/users to migrate their use cases before we fully remove all references to `FileLoader` from PyTorch.
Fixes https://github.com/pytorch/data/issues/103. More detailed discussion about this decision is also in the linked issue.
cc VitalyFedyunin ejguan NivekT pmeier Nayef211
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D33301648
Pulled By: NivekT
fbshipit-source-id: 59278dcd44e372df0ba2001a4eecbf9792580d0b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/69391
As part of the efforts to unify the APIs across different data backends (e.g. TorchData, TorchArrow), we are making changes to different DataPipes' APIs. In this PR, we are removing the input argument `nesting_level` from `FilterIterDataPipe`.
cc VitalyFedyunin ejguan NivekT
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D32849462
Pulled By: NivekT
fbshipit-source-id: 91cf1dc03dd3d3cbd7a9c6ccbd791ade91355f30
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/69390
As part of the efforts to unify the APIs across different data backends (e.g. TorchData, TorchArrow), we are making changes to different DataPipes' APIs. In this PR, we are removing the input argument `nesting_level` from `MapperIterDataPipe`.
cc VitalyFedyunin ejguan NivekT
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D32849465
Pulled By: NivekT
fbshipit-source-id: 963ce70b84a7658331d126e5ed9fdb12273c8e1f
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/66277
Previously, it is grouped together with tests related to `MapDataPipe`, but it should be with `IterDataPipe`.
cc VitalyFedyunin ejguan NivekT
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D31485823
Pulled By: NivekT
fbshipit-source-id: d13d8c28cbfc305da0e3033d4109a0f971281a02
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/66275
Once this is added to Core, TorchData's PR will not need a custom class and can use this wrapper instead.
cc VitalyFedyunin ejguan NivekT
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D31485822
Pulled By: NivekT
fbshipit-source-id: 790de27629c89c0ca7163a8ee5a09ee8b8233340
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/65220Fixes#65221
- Remove deepcopy from Mapper to support file handles
- Convert `IterableWrapper` to deepcopy iterable instance within each iterator to prevent in-place modification (different data per epoch)
- Convert `IDP` to `IterableWrapper` in test_datapipe.py
- Refine the variable names (prevent using `dp` that is module reference)
Test Plan: Imported from OSS
Reviewed By: malfet
Differential Revision: D31021886
Pulled By: ejguan
fbshipit-source-id: 72a9eee66c758e2717d591cd0942892bddedc223
Summary:
ghstack is not working for the second commit so I'm manually creating this PR for now. Please only look at changes related to the second commit in this PR (there is a PR for the first commit).
This PR removes TarArchiveReader's dependency on FileLoader DataPipe, by allowing it to use a IterDataPipe of path names as input rather than a tuple of path name and a stream.
It also adds additional tests to ensure that the DataPipe is functioning properly when it is read multiple times or reset half way through reading.
The whole stack fixes https://github.com/pytorch/pytorch/issues/64281 - issues related to unclosed buffer stream.
Stack:
* __->__ https://github.com/pytorch/pytorch/issues/64788
* https://github.com/pytorch/pytorch/issues/64786
cc VitalyFedyunin ejguan
Pull Request resolved: https://github.com/pytorch/pytorch/pull/64788
Reviewed By: jbschlosser, ejguan
Differential Revision: D30901176
Pulled By: NivekT
fbshipit-source-id: 59746a8d0144fc6d3ce0feb2d76445b82e6d414e
Summary:
There are two warnings produced by `test_fork_datapipe`. This PR addresses the issues raised by those warnings without impacting the test cases.
cc VitalyFedyunin ejguan
Pull Request resolved: https://github.com/pytorch/pytorch/pull/64827
Reviewed By: ejguan
Differential Revision: D30870528
Pulled By: NivekT
fbshipit-source-id: 580a001c6fa3ff6f8b04a7e5183e58861938204b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/64404
This PR remove `filter`'s inheritance from `map`. This allows `filter` to not have a `__len__` function and that behavior is what we would like.
cc VitalyFedyunin ejguan
Test Plan: Imported from OSS
Reviewed By: gchanan
Differential Revision: D30713120
Pulled By: NivekT
fbshipit-source-id: 4d5d07555297ee2bd4b49842c0d26cdc00638f6c
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/64220
Remove `ByKeyGrouperIterDataPipe` due to duplicated functionality.
Fix a bug in `GrouperIterDataPipe` using the existing test.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D30650542
Pulled By: ejguan
fbshipit-source-id: 666b4d28282fb4f49f3ff101b8d08be16a50d836
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/63422Fixes#63095
Make `DataChunk` delegate to list method. Then it will support in-place operations:
- `sort`
- `reverse`
- `append`
- `extend`
- `random.shuffle`
Test Plan: Imported from OSS
Reviewed By: ngimel
Differential Revision: D30379027
Pulled By: ejguan
fbshipit-source-id: d176bd0cc8b89b915c7bb184ff243ab1f605616d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/62768
This is part of TorchArrow DF support preparation, separating it to multiple PRs to simplify review process.
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D30149090
Pulled By: VitalyFedyunin
fbshipit-source-id: a36b5ff56e2ac6b06060014d4cd41b487754acb8
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61312
Sorting according to isort output. Alphabetically ordered one per line imports help merging.
Test Plan: Imported from OSS
Reviewed By: ejguan
Differential Revision: D29588833
Pulled By: VitalyFedyunin
fbshipit-source-id: 4c80c3086132b50894e734ad6c5799d78d689e42
Summary:
As part of https://github.com/pytorch/pytorch/issues/57031, this PR adds the ConcatMapDataPipe functional datapipe for the MapDataPipe class.
We may need to discuss how to treat the datapipes with no valid length. For now, I just use them as if they have infinite length and the `__getitem__` could not go pass them.
Thank you for your time on reviewing this~
cc ejguan
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61010
Reviewed By: soulitzer
Differential Revision: D29587679
Pulled By: ejguan
fbshipit-source-id: 5eb97fa727209bec6c534520057c64a78000626e
Summary:
Fixes issues that are discussed with ezyang in the comments of PR https://github.com/pytorch/pytorch/issues/59498
Improved code and documentation clarity, and refactored .filter to nesting_level directly
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60423
Reviewed By: ezyang
Differential Revision: D29281599
Pulled By: NivekT
fbshipit-source-id: a9bbaf52f492db0741c00f3ceb4022b08ddb1506
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59816
Add two new DataPipes, one for getting web file urls to yield streams and one for getting streams to yield bytes.
Test Plan:
Add test_web_iterable_datapipe in test/test_datapipes.py. The test initiates a local http server for serving test files. Test below locally ok.
1. create and load 16M localhost file urls (each of size 10 Bytes)
2. create and load a 64GB localhost file
in the unit test, for sake of testing time, disabling both stress test and large file test
Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D29051186
fbshipit-source-id: f8e44491e670560bf445af96f94d98230436f396
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/58938
When run `test_datapipe.py`, python `gc` would report lots of `ResourceWarning`s due to unclosed stream. It's not only annoying, there are two potential problems:
- Performance regression because `gc` requires additional memory and computation to track reference
- Python `gc` runs periodically so we many encountered an error of too many open files due to OS limitation
To reduce the warning:
- Explicitly close byte stream
- Modify `test_datapipe.py` to use context manager
Small fix:
- Reorder import in `test_datapipe.py`
Further investigation:
Can we directly use context manager in `LoadFileFromDisk` and `ReadFileFromTar` to eliminate this Error?
- Probably no. It's feasible only if the pipeline is synchronized and without prefetching. When we enable these two features, the scope guard of the context manager doesn't work.
- We may need to implement some reference counter attached to these file byte stream to close by itself.
Test Plan: Imported from OSS
Reviewed By: jbschlosser
Differential Revision: D28689862
Pulled By: ejguan
fbshipit-source-id: bb2a85defb8a4ab5384db902ef6ad062185c2653
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/55836
Change construct_time_validation to argument_validation as we should provide users the flexibility to use this decorator over all different functions, which are required with type validation.
It can also work as a construct-time validation
```py
class ExampleDataPipe(IterDataPipe):
argument_validation
def __init__(self, dp: IterDataPipe[int]):
self.dp = dp
...
```
Notebook is also updated.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D27743478
Pulled By: ejguan
fbshipit-source-id: 49743152d121028cd7d72d89dc7df5c7c7b94c41
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57824
Implement type check for string type. Re-raise detailed exception at compile time.
```py
>>> class InvalidData(Generic[T_co], NamedTuple): # Invalid generic namedtuple in Python typing
... name: str
... data: T_co
class DP(IterDataPipe['InvalidData[int]']):
... pass
TypeError: InvalidData[int] is not supported by Python typing
```
Add `__type_class__` attribute to class, which optimizes the static checking flow by reducing checking times.
```py
>>> class DP1(IterDataPipe[Union[int, str]]):
... pass
>>> class DP2(DP1[int]):
... pass
>>> list((cls, getattr(cls, '__type_class__', None)) for cls in DP2.__mro__)
[(<class '__main__.DP2'>, False), (<class 'abc.DP1[int]'>, True), (<class '__main__.DP1'>, False), (<class 'abc.IterableDataset[typing.Union[int, str]]'>, True), (<class 'torch.utils.data.dataset.IterableDataset'>, False), (<class 'torch.utils.data.dataset.Dataset'>, None), (<class 'typing.Generic'>, None), (<class 'object'>, None)]
```
Among the class of `DP2`'s MRO, only `DP2`, `DP1` will be static checked when `__type_class__` is `False`. `abc.DP1[int]` and `abc.IterableDataset[typing.Union[int, str]]` will be ignored since they are just a class with typing.
## Future
When Python 3.6 is deprecated, using TypeAlias rather than TypeMeta can eliminates the usage of `__type_class__` attribute.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D28289104
Pulled By: ejguan
fbshipit-source-id: 1da97460c8bfc48cea7396033fde484a24caba7c
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54544
## Feature
- Add `subinstance(data, type)` to check `data` is a subtype instance of the `type`
- Add a decorator of `runtime_validation` to validate the returned data from `__iter__` is subtype instance of hint.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D27327234
Pulled By: ejguan
fbshipit-source-id: fb6a332762b0fe75284bb2b52a13ed171b42558c
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54066
## Feature
- Add a decorator `construct_time_validation` to validate each input datapipe according to the corresponding type hint.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D27327236
Pulled By: ejguan
fbshipit-source-id: a9d4c6edb5b05090bd5a369eee50a6fb4d7cf957
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54020
## Feature
- Add `issubtype` to check the type is a subtype of the other type.
- Add `_DataPipeMeta` (mimic Python typing 3.6)
- Add `type` attribute for each DataPipe
- Save original `__init__` function for each DataPipe
- Validate return hint of `__iter__`
- Replace `__init__` function bases on `type`
- Fixed type: Put original `__init__` back, if it exists or use a plain `__init__`
- Non-fixed type: Add new `__init__` with the functionality to copy `cls.type` for each instance. (Optimized for memory)
No Error for main repo, `torchvision`, `torchaudio` and `torchtext`.
## Future
- Add same thing for `__getitem__`.
- When DataFrame came out, add an another type for DataFrame with column name and type.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D27327232
Pulled By: ejguan
fbshipit-source-id: fd3a6029c16f5d814b1d7e1b1566fdcd8fd1ad9a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54299
## Feature
- Check type is a subtype of another type
Prerequisite for DataPipe tying system.
Test Plan: Imported from OSS
Reviewed By: VitalyFedyunin
Differential Revision: D27327235
Pulled By: ejguan
fbshipit-source-id: 8f50a663a86540677c9e132ac7c5216fdac46f70