Commit Graph

151 Commits

Author SHA1 Message Date
Jason Fried
03f50fcc02 [codemod][3.10][NamedTuple] Use typing_extensions to get NamedTuple Generics (#101830)
Summary:
Python 3.10 doesn't support Generic NamedTuples, but later versions do, so typing_extensions backports the feature.
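
A minimal sketch of the pattern this codemod enables on 3.10 (assuming a recent typing_extensions; names are illustrative):
```python
from typing import Generic, TypeVar

from typing_extensions import NamedTuple  # backports Generic NamedTuple support

T = TypeVar("T")

class Pair(NamedTuple, Generic[T]):  # rejected by typing.NamedTuple on 3.10
    first: T
    second: T

p: Pair[int] = Pair(first=1, second=2)
```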

(Note: this ignores all push blocking failures!)

Test Plan: sandcastle

Reviewed By: itamaro

Differential Revision: D45923201

Pull Request resolved: https://github.com/pytorch/pytorch/pull/101830
Approved by: https://github.com/izaitsevfb
2023-05-19 22:50:18 +00:00
Wei Ji
f95d42b1b7 [DataPipe] Update docstring for functional form of DataPipes (#100446)
Copy the docstring from IterDataPipe and MapDataPipe classes to their functional form. Done using [`functools.update_wrapper`](https://docs.python.org/3/library/functools.html#functools.update_wrapper), xref https://stackoverflow.com/questions/6394511/python-functools-wraps-equivalent-for-classes.
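
A hedged sketch of the approach, assuming a hypothetical registration helper; `functools.update_wrapper` copies `__doc__` (and friends) from the class onto its functional wrapper:
```python
import functools

class IterDataPipe:
    """Iterable-style DataPipe (the class docstring we want to surface)."""

def as_functional(cls):
    # Hypothetical helper: expose a class through a function while keeping
    # the class's docstring and name on the wrapper.
    def wrapper(*args, **kwargs):
        return cls(*args, **kwargs)
    functools.update_wrapper(wrapper, cls, updated=())  # skip __dict__ merge
    return wrapper

shuffle = as_functional(IterDataPipe)
assert shuffle.__doc__ == IterDataPipe.__doc__
```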

See also parallel change to `.pyi` stub files at https://github.com/pytorch/pytorch/pull/100503

Fixes https://github.com/pytorch/data/issues/792 and https://github.com/weiji14/zen3geo/issues/69.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/100446
Approved by: https://github.com/NivekT
2023-05-18 19:59:00 +00:00
Aaron Gokaslan
85f38b8a33 [BE] Update flake8-comprehensions and adapt to rule C418 (#99178)
Applies rule C418 and fixes all instances of it. Also updates flake8-comprehensions.
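
For reference, C418 flags a dict literal or dict comprehension wrapped in a redundant `dict()` call; a small before/after sketch:
```python
# Flagged by C418: unnecessary dict passed to dict()
settings = dict({"lr": 0.1, "momentum": 0.9})
counts = dict({k: 0 for k in ("train", "val")})

# Fixed: use the literal / comprehension directly
settings = {"lr": 0.1, "momentum": 0.9}
counts = {k: 0 for k in ("train", "val")}
```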

Pull Request resolved: https://github.com/pytorch/pytorch/pull/99178
Approved by: https://github.com/ezyang
2023-04-15 15:33:42 +00:00
erjia
29d2e4b7fa Forward fix for DataLoader to accept custom Sharding DataPipe (#97287)
Fixes #96975

Changes:
- Make sure a custom sharding DataPipe with `apply_sharding` can be used by `DataLoader`
  - Allow the `apply_sharding` function to omit the last argument, `sharding_group`
- Make `DataLoader` not rely on `sharding_group` (a compatibility sketch follows below)
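
A minimal sketch of the compatibility check (illustrative names, not the exact `DataLoader` internals):
```python
import inspect

def _apply_sharding_compat(dp, num_of_instances, instance_id, sharding_group):
    # Pass sharding_group only when the custom pipe's apply_sharding
    # signature actually accepts a third argument.
    params = inspect.signature(dp.apply_sharding).parameters
    if len(params) < 3:
        dp.apply_sharding(num_of_instances, instance_id)
    else:
        dp.apply_sharding(num_of_instances, instance_id, sharding_group)
```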
Pull Request resolved: https://github.com/pytorch/pytorch/pull/97287
Approved by: https://github.com/NivekT
2023-04-05 22:33:37 +00:00
Tillmann Falck
939c4ae6cd [DataPipe] Add copy option to fork DataPipe (#96030)
Fixes pytorch/data#1061 and fixes pytorch/data#1032
Pull Request resolved: https://github.com/pytorch/pytorch/pull/96030
Approved by: https://github.com/ejguan, https://github.com/NivekT
2023-03-10 17:31:56 +00:00
erjia
738cc5e644 Fix validate_input_col for nn.Module or Callable (#96213)
Forward fix the problem introduced in https://github.com/pytorch/pytorch/pull/95067

Not all `Callable` objects implement `__name__`. Use `repr` as the fallback to get the function's name or reference.
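
A minimal sketch of the fallback (helper name illustrative):
```python
def _get_fn_name(fn):
    # functools.partial objects and other callables may not define
    # __name__; fall back to repr() so error messages never crash.
    return getattr(fn, "__name__", repr(fn))

# e.g. when building the validation error:
# raise ValueError(f"The function {_get_fn_name(fn)} takes {n} parameters, ...")
```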
Pull Request resolved: https://github.com/pytorch/pytorch/pull/96213
Approved by: https://github.com/NivekT
2023-03-08 01:30:17 +00:00
Felix
e6f3e16d89 Fix: validate_input_col for partial functions (#95067)
Fixes #95066

#### Proposed change:
do not call `str()` on a `Callable` to determine its name

#### Reasoning:
Please see https://github.com/pytorch/pytorch/issues/95066 for reasoning and examples

#### Effect:
* The code example given in https://github.com/pytorch/pytorch/issues/95066 now executes instantly.
* If invalid input is provided, the stacktrace now prints nicely as
  ```
  ValueError: The function foo takes 1 parameters, but 2 are required.
  ```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/95067
Approved by: https://github.com/NivekT, https://github.com/ejguan
2023-03-03 21:05:07 +00:00
Aaron Gokaslan
67d9790985 [BE] Apply almost all remaining flake8-comprehension checks (#94676)
Applies the remaining flake8-comprehensions fixes and checks. This change replaces all remaining unnecessary generator expressions with list/dict/set comprehensions, which are more succinct, more performant, and better supported by our torch.jit compiler. It also removes useless generators such as `set(a for a in b)`, resolving them into a plain set call.
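
A small before/after sketch of the kind of rewrite this applies:
```python
names = ["Ada", "alan", "Grace"]
table = {"a": [1, 2], "b": [3]}

# Before: generator expressions fed straight into the constructors
unique = set(x.lower() for x in names)
lengths = dict((k, len(v)) for k, v in table.items())

# After: comprehensions, which are more succinct and better supported
# by torch.jit
unique = {x.lower() for x in names}
lengths = {k: len(v) for k, v in table.items()}
```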

Pull Request resolved: https://github.com/pytorch/pytorch/pull/94676
Approved by: https://github.com/ezyang
2023-02-12 01:01:25 +00:00
Wenlei Xie
d8f4026ebf Continue support sharding pipes in tud.datapipes.iter.grouping as deprecated (#94527)
Summary:
https://github.com/pytorch/pytorch/pull/94095 moves this into `tud.datapipes.iter.sharding`. However, since this was previously a public API, that is a BC-breaking change.

As discussed in https://github.com/pytorch/data/pull/987#issuecomment-1422440049, we will keep backward-compatible support, but with a deprecation warning (a sketch follows below).
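
A hedged sketch of one way to keep the old import path working with a warning, via a PEP 562 module-level `__getattr__` (names illustrative, not necessarily the exact shim that landed):
```python
import importlib
import warnings

_MOVED_ATTRS = {"ShardingFilterIterDataPipe", "SHARDING_PRIORITIES"}

def __getattr__(name):  # PEP 562: called only for attributes not found normally
    if name in _MOVED_ATTRS:
        warnings.warn(
            f"`{name}` has moved to `torch.utils.data.datapipes.iter.sharding`; "
            "importing it from `grouping` is deprecated.",
            DeprecationWarning,
            stacklevel=2,
        )
        sharding = importlib.import_module("torch.utils.data.datapipes.iter.sharding")
        return getattr(sharding, name)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```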

Differential Revision: D43161015

Pull Request resolved: https://github.com/pytorch/pytorch/pull/94527
Approved by: https://github.com/ejguan, https://github.com/NivekT
2023-02-10 18:42:10 +00:00
Aaron Gokaslan
748bac8757 [BE]: Apply pyupgrade yield from and unit test alias upgrades (#94309)
Applies some more harmless pyupgrades. This one gets rid of deprecated aliases in unit tests and upgrades more `yield`-in-a-loop patterns into `yield from` generators, which are more performant and propagate more information/exceptions from the original generator. This is the modern recommended way of forwarding generators.
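
A before/after sketch of the `yield from` rewrite:
```python
# Before: manual forwarding loop
def flatten_old(batches):
    for batch in batches:
        for sample in batch:
            yield sample

# After: delegation; shorter, faster, and it transparently propagates
# send()/throw() and the inner generator's return value
def flatten(batches):
    for batch in batches:
        yield from batch
```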
Pull Request resolved: https://github.com/pytorch/pytorch/pull/94309
Approved by: https://github.com/albanD
2023-02-07 20:08:58 +00:00
Wenlei Xie
d6dec1a5cf Refactor sharding data pipe into a separate file (#94095)
Move `ShardingFilterIterDataPipe` into a dedicated file.

Also, propose to have a dedicated parent class (`_ShardingIterDataPipe`) for sharding data pipes, as this seems more like a "system/engine-level" datapipe that gives strong hints to RS on how to execute and needs first-class-citizen treatment in RS (compared with other "user-level" datapipes, which are mostly composable `Callable[[Iterable], Iterable]`). That way we don't need to rely on whether `is_shardable` and `apply_sharding` are present on a DataPipe in `graph_settings.py`. But open to other discussions.

Open question: should [ShardingRoundRobinDispatcherIterDataPipe](01fc762003/torchdata/datapipes/iter/util/sharding.py (L16-L17)) also be considered a `_ShardingIterDataPipe`? (e.g. the former kind of sharding is executed by replicating the pipe's metadata, while `ShardingRoundRobinDispatcherIterDataPipe` hints that replication is too expensive and so requires round-robin data exchange/dispatch).
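
A hedged sketch of the proposed marker base class (exact definition may differ):
```python
from torch.utils.data import IterDataPipe

class _ShardingIterDataPipe(IterDataPipe):
    """Engine-level base class for sharding pipes."""

    def apply_sharding(self, num_of_instances: int, instance_id: int):
        raise NotImplementedError

# graph_settings.py can then dispatch on the type instead of probing for
# is_shardable/apply_sharding attributes:
def _is_sharding_datapipe(dp) -> bool:
    return isinstance(dp, _ShardingIterDataPipe)
```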

Differential Revision: D43014692

Pull Request resolved: https://github.com/pytorch/pytorch/pull/94095
Approved by: https://github.com/ejguan, https://github.com/NivekT
2023-02-07 09:12:02 +00:00
SvenDS9
b073c09f7a Added keep_key option to Grouper (#92532)
Fixes https://github.com/pytorch/data/issues/256

The testing of this module is currently suboptimal in general. We should improve this in the future.

@ejguan
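
A hedged usage sketch (output inferred from the described behavior):
```python
from torch.utils.data.datapipes.iter import IterableWrapper

# With keep_key=True, the Grouper yields (key, group) pairs instead of
# bare group lists (functional name: groupby).
dp = IterableWrapper(["a1", "a2", "b1"]).groupby(
    group_key_fn=lambda s: s[0], keep_key=True
)
print(list(dp))  # [('a', ['a1', 'a2']), ('b', ['b1'])]
```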

Pull Request resolved: https://github.com/pytorch/pytorch/pull/92532
Approved by: https://github.com/ejguan
2023-01-25 20:58:21 +00:00
erjia
7a112c43c1 [DataLoader2] Fix apply_sharding to accept one sharding_filter per branch (#90769)
Changes:
- Allow multiple `sharding_filter`s in the pipeline as long as they are not on the same branch
- [x] Add test

Example:
```mermaid
graph TD;
DP1-->sharding_filter_1;
sharding_filter_1-->DP3;
DP2-->sharding_filter_2;
sharding_filter_2-->DP4;
DP3-->DP4;
DP4-->output;
```
In order to properly shard `DP1` and `DP2`, we should allow multiple `sharding_filter`s.
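
A usage sketch mirroring the graph above, with one `sharding_filter` per branch:
```python
from torch.utils.data.datapipes.iter import IterableWrapper

dp1 = IterableWrapper(range(10)).sharding_filter()        # branch 1
dp2 = IterableWrapper(range(100, 110)).sharding_filter()  # branch 2
output = dp1.zip(dp2)  # both branches are sharded before being joined
```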
Pull Request resolved: https://github.com/pytorch/pytorch/pull/90769
Approved by: https://github.com/NivekT
2022-12-14 22:03:41 +00:00
Bert Maher
d3178465ee [dynamo] VariableTracker.call_method requires a name (#88311)
Summary: as title

Test Plan: Before: N2743445, After: N2748186.  Note there's a new error, but at least we got past the easy one.

Differential Revision: D40938415

Pull Request resolved: https://github.com/pytorch/pytorch/pull/88311
Approved by: https://github.com/brad-mengchi
2022-11-10 18:17:23 +00:00
Vitaly Fedyunin
9dadf8fcc2 [DataPipes] Add group support to the sharding_filter (#88424)
Differential Revision: [D41006747](https://our.internmc.facebook.com/intern/diff/D41006747)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/88424
Approved by: https://github.com/ejguan
2022-11-07 22:07:01 +00:00
Kevin Tse
c719ec9c11 [DataPipe] Fix MapDataPipe spawn lambda test (#85668)
The test in its original form fails and I believe it is because the expected result is incorrect, unless we expect different behaviors between `IterDataPipe` and `MapDataPipe` in multiprocessing.

Differential Revision: [D39832182](https://our.internmc.facebook.com/intern/diff/D39832182)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85668
Approved by: https://github.com/ejguan
2022-09-27 19:58:15 +00:00
Kevin Tse
64a526d4af [DataLoader] Replacing traverse function with traverse_datapipes (#85667)
This PR deprecates the `traverse` function and replaces it with `traverse_datapipes`.

While using `DataLoader`, I realized that it raises a `FutureWarning` even though I am not explicitly using `traverse`. What is happening is that `DataLoader` invokes `traverse(dp, only_datapipe=True)`, and the use of the keyword causes the `only_datapipe` warning to be raised.

```
/home/ubuntu/miniconda3/lib/python3.8/site-packages/torch/utils/data/graph.py:102: FutureWarning: `only_datapipe` is deprecated from `traverse` function and will be removed after 1.13.
  warnings.warn(msg, FutureWarning)
```

A few things we'd like to do:
1. Deprecate the keyword arg `only_datapipe`
2. Change the default behavior from `only_datapipe=False` to `only_datapipe=True` in the future
3. Do not raise a warning when users are using the function correctly

This creates a paradox: it is impossible for users to change their code to match the future default behavior (i.e. call `traverse(dp)` without `only_datapipe`):
  - they cannot do so because the default behavior of `traverse` hasn't changed yet, so they must use `only_datapipe=True`
  - if they use `only_datapipe=True`, eventually the kwarg will go away and cause a runtime error; they also get a `FutureWarning` in the present

IIUC, there doesn't seem to be a way to accomplish those 3 goals without replacing the function with a new one that has a different name; hence, this PR. Let me know if there is a better alternative.

If this looks right, I will send a follow up PR in `TorchData`.
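
A minimal sketch of the replacement scheme (bodies elided; warning text illustrative):
```python
import warnings

def traverse_datapipes(datapipe):
    # New name, new default: only-datapipe traversal, no keyword, no warning.
    ...

def traverse(datapipe, only_datapipe=None):
    # Old name keeps its old default behavior but always warns, so callers
    # migrating to the new name never see a FutureWarning.
    warnings.warn(
        "`traverse` is deprecated; please use `traverse_datapipes` instead.",
        FutureWarning,
    )
    ...
```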

Differential Revision: [D39832183](https://our.internmc.facebook.com/intern/diff/D39832183)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85667
Approved by: https://github.com/ejguan
2022-09-27 19:58:15 +00:00
Erjia Guan
ea72a0991c Add support to traverse all python collection objects (#84079)
Fixes https://github.com/pytorch/data/issues/752

This PR makes the `traverse` function support more collection data structures from Python. The `getstate_hook` will be invoked after the custom `__getstate__` function. This guarantees that `traverse` keeps working as long as the `DataPipe` works properly with multiprocessing.
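
A hedged sketch of the recursive walk over common containers (helper name illustrative; `cache` is a set of ids):
```python
from torch.utils.data import IterDataPipe, MapDataPipe

def _find_datapipes(obj, cache):
    # Recurse into lists/tuples/sets/dicts so DataPipes stored inside a
    # container in a pipe's state are still discovered by traverse.
    if id(obj) in cache:
        return
    cache.add(id(obj))
    if isinstance(obj, (IterDataPipe, MapDataPipe)):
        yield obj
    elif isinstance(obj, (list, tuple, set)):
        for item in obj:
            yield from _find_datapipes(item, cache)
    elif isinstance(obj, dict):
        for value in obj.values():
            yield from _find_datapipes(value, cache)
```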
Pull Request resolved: https://github.com/pytorch/pytorch/pull/84079
Approved by: https://github.com/NivekT, https://github.com/VitalyFedyunin
2022-09-23 16:21:25 +00:00
erjia
33bb8ae350 Set shuffle to DataPipes with set_shuffle API (#83741)
This PR requires https://github.com/pytorch/pytorch/pull/83202 to land first.

## Changes
- For `apply_shuffle_setting` and `apply_shuffle_seed`, make sure the shuffle setting is applied to every DataPipe that has a `set_shuffle` or `set_seed` method (see the sketch below).
- Change the API from `apply_shuffle_seed` to `apply_random_seed`.
- Fix a bug where `apply_shuffle_seed` accepted only hashable DataPipes. After this PR, the function uses `id` to prevent seeding the same DataPipe multiple times per epoch.
- Fix another bug in `shuffler` where `reset` with `_enable=False` would also reset `_seed`.
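
A minimal sketch of the `id`-based de-duplication described above (graph traversal assumed, not shown):
```python
def apply_shuffle_setting(graph_datapipes, shuffle: bool):
    # graph_datapipes: every pipe in the graph. Keying by id() works even
    # for unhashable pipes and prevents applying the setting to the same
    # pipe twice when it appears on multiple branches.
    seen = set()
    for dp in graph_datapipes:
        if id(dp) in seen:
            continue
        seen.add(id(dp))
        if hasattr(dp, "set_shuffle"):
            dp.set_shuffle(shuffle)
```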
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83741
Approved by: https://github.com/NivekT
2022-09-13 13:38:58 +00:00
Kevin Tse
cfb9d0d233 [DataPipe] Fixing map function signature validation (#84279)
As @pmeier [points out](https://github.com/pytorch/pytorch/pull/80267#discussion_r958423241), #80267 introduces a bug where an exception is thrown when a built-in function (or a function implemented in C) is used with `.map` because `inspect.signature(fn)` cannot find the function's signature.

This PR skips over a function when its signature cannot be found. I believe this case is rare, and if the `fn` is truly incompatible with the usage of `input_col`/`output_col`, an exception will be raised at run time such that users will be able to examine what is wrong.
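
A minimal sketch of the skip (helper name illustrative):
```python
import inspect

def _signature_or_none(fn):
    # Built-in / C-implemented callables can make inspect.signature()
    # raise ValueError; skip validation in that case and let a genuine
    # input_col mismatch surface as a runtime error instead.
    try:
        return inspect.signature(fn)
    except ValueError:
        return None
```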
Pull Request resolved: https://github.com/pytorch/pytorch/pull/84279
Approved by: https://github.com/pmeier, https://github.com/janeyx99
2022-08-31 19:55:01 +00:00
erjia
3f94726453 [DataPipe] Convert MapDataPipe.shuffle to IterDataPipe (#83202)
Fixes: https://github.com/pytorch/data/issues/718

This is an alternative PR against https://github.com/pytorch/pytorch/pull/82974

This PR changes the behavior of both types to match `IterDataPipe.shuffle`:
- Lazily generate the seed per iteration
- Each iterator gets a new seed
- Convert `MapDataPipe.shuffle` to an `IterDataPipe`

## BC-breaking Note:
This PR changes the return type of `MapDataPipe.shuffle` from a `MapDataPipe` to an `IterDataPipe`.

### 1.12
Output as `MapDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
True
>>> isinstance(dp, IterDataPipe)
False
```

### This PR:
Output as `IterDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
False
>>> isinstance(dp, IterDataPipe)
True
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83202
Approved by: https://github.com/NivekT
2022-08-29 08:57:17 +00:00
PyTorch MergeBot
7244a3737c Revert "[DataPipe] Convert MapDataPipe.shuffle to IterDataPipe (#83202)"
This reverts commit a423c966a7.

Reverted https://github.com/pytorch/pytorch/pull/83202 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally
2022-08-28 18:00:17 +00:00
erjia
a423c966a7 [DataPipe] Convert MapDataPipe.shuffle to IterDataPipe (#83202)
Fixes: https://github.com/pytorch/data/issues/718

This is an alternative PR against https://github.com/pytorch/pytorch/pull/82974

This PR changes the behavior of both types to match `IterDataPipe.shuffle`:
- Lazily generate the seed per iteration
- Each iterator gets a new seed
- Convert `MapDataPipe.shuffle` to an `IterDataPipe`

## BC-breaking Note:
This PR changes the return type of `MapDataPipe.shuffle` from a `MapDataPipe` to an `IterDataPipe`.

### 1.12
Output as `MapDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
True
>>> isinstance(dp, IterDataPipe)
False
```

### This PR:
Output as `IterDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
False
>>> isinstance(dp, IterDataPipe)
True
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83202
Approved by: https://github.com/NivekT
2022-08-26 23:33:20 +00:00
erjia
4c19981316 [DataPipe] Reset Shuffler's iterator when NotStarted (#83535)
This PR changes the behavior of `IterDataPipe` to always invoke `reset` in the `NotStarted` state. The main reason is that we normally put lazy initialization code into the `reset` function. Even in the `NotStarted` state, we should invoke `reset` to initialize those lazy variables; otherwise, we would have to manually determine in `__iter__` whether the state is `NotStarted` or `Iterating` and invoke `reset` only for `NotStarted`.

This PR also makes `Shuffler` able to be serialized with its `buffer` and `rng_state`.

The following part is removed:

~I also added `_snapshot_state` into the serialization state and, during `__setstate__`, only change the state to `Restored` if the original state is `Iterating`. Especially for the case of deserializing/serializing a `NotStarted` DataPipe (multiprocessing), we would invoke `set_seed` for `Shuffler`. We need the `DataPipe` to remain `NotStarted` to properly `reset`.~

I am listing all the expected state transitions below (a sketch of the resulting dispatch follows the list):
- Initial state: `NotStarted`
  - `iter` -> Call `reset` and change the state to `Iterating`
  - serialize/deserialize -> Keep the state as `NotStarted` (will `reset` if `iter` is called afterwards)
- Initial state: `Iterating`
  - `iter` -> Call `reset` and keep the state to `Iterating`
  - serialize/deserialize -> Change the state as `Restored`
- Initial state: `Restored`
  - `iter` -> Only change the state to `Iterating`
  - serialize/deserialize -> Not allowed
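
A hedged sketch of the resulting dispatch (state enum and method names approximate):
```python
from enum import Enum, auto

class _SnapshotState(Enum):
    NotStarted = auto()
    Restored = auto()
    Iterating = auto()

def _get_iterator(self):
    # NotStarted now also calls reset(), so lazy initialization that
    # lives in reset() always runs; only Restored skips it.
    if self._snapshot_state in (_SnapshotState.NotStarted, _SnapshotState.Iterating):
        self.reset()
    self._snapshot_state = _SnapshotState.Iterating
    return self._datapipe_iter()  # underlying iteration, elided here
```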

Pull Request resolved: https://github.com/pytorch/pytorch/pull/83535
Approved by: https://github.com/NivekT
2022-08-25 19:45:41 +00:00
Robert
5c49c7bbba [WIP] Validating input_col for certain datapipes (#80267)
Follow up from #79344.

Currently WIP due to multiple test failures.

Waiting for #80140 to land
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80267
Approved by: https://github.com/ejguan
2022-08-24 17:34:28 +00:00
Kevin Tse
14b660fcc0 [DataPipe] Correct the type of exception that is being raised by ShufflerMapDataPipe (#82666)
Fixes https://github.com/pytorch/data/issues/708

The following code snippet used to fail; now it has been added as a test case:
```python
dp1 = dp.map.SequenceWrapper(range(10))
shuffle_dp1 = dp1.shuffle()
dp2 = dp.map.SequenceWrapper(range(10))
shuffle_dp2 = dp2.shuffle()
zip_dp = shuffle_dp1.zip(shuffle_dp2)
list(zip_dp)  # This used to fail
```

The issue was that `ShufflerMapDataPipe` raises a `KeyError` when an out of bound index is passed into it, but that was not handled by `zip_dp`'s `__getitem__` which only handled `IndexError`. With this change, it handles both.
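
A minimal sketch of the fix in the zip pipe's `__getitem__` (surrounding structure assumed):
```python
def __getitem__(self, index):
    res = []
    for dp in self.datapipes:
        try:
            res.append(dp[index])
        except (IndexError, KeyError):  # Shuffler raises KeyError when out of bound
            raise IndexError(f"Index {index} is out of bound.") from None
    return tuple(res)
```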
Pull Request resolved: https://github.com/pytorch/pytorch/pull/82666
Approved by: https://github.com/ejguan
2022-08-03 19:05:17 +00:00
Kevin Tse
35d97e21c8 [DataPipe] Simple graph snapshotting (#79479)
This mostly completes the "poor man's snapshotting" implementation (named "simple snapshotting"). It is the most basic version of snapshotting, but it should work for all DataPipes. I will be adding more efficient implementations for different types of DataPipes in future PRs.

### Implementation

The general idea of the simple snapshot is that we will:
1. Create a new iterator
2. Move that iterator forward by `n_iterations`
3. Save that as the `_fast_forward_iterator` of the DataPipe
4. The next time `iter` is called on the DataPipe, use the `_fast_forward_iterator`
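
A minimal sketch of those four steps (matching the `_simple_snapshot_graph` call used in the example below; exact signature assumed):
```python
def _simple_snapshot_graph(datapipe, n_iterations, rng=None):
    # In the full flow, `rng` restores the initial RNG state before the
    # replay so that shufflers reproduce the original order.
    it = iter(datapipe)                   # 1. create a new iterator
    for _ in range(n_iterations):
        next(it)                          # 2. move it forward by n_iterations
    datapipe._fast_forward_iterator = it  # 3./4. used by the next iter() call
```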

### Usage
As of this implementation, the usage will look something like:
```python
rng = torch.Generator()
initial_rng_state = rng.get_state()
datapipe: IterDataPipe = ...
# Some usage of the DataPipe, here maybe yielding the first 5 values
n_iter = 5
it = iter(datapipe)
for _ in range(n_iter):
    next(it)
serialized_graph = pickle.dumps(datapipe)

# The serialized object has most of the sufficient information for simple snapshot (except for initial RNG state)
# It can be deserialized at a later point in time or by a different process
deserialized_graph = pickle.loads(serialized_graph)
# I think `DataLoader2` or `ReadingService` should store `initial_rng_state` that can be saved by the API that we later use
rng_for_deserialized = torch.Generator()
rng_for_deserialized.set_state(initial_rng_state)
n_iterations = deserialized_graph._number_of_samples_yielded

_simple_snapshot_graph(deserialized_graph, n_iterations, rng=rng_for_deserialized)
# The whole DataPipe graph should have the same state as before serialization, such that:
self.assertEqual(list(it), list(deserialized_graph))  # True
```

### Next Steps
If this looks acceptable, the next step is for me to modify `DataLoader2`'s prototype ReadingService (the one with queues) to remember things like `initial_rng_state` and to add a `save_snapshot` method that returns the `(serialized graph, initial_rng)` pair, plus a `restore_snapshot` method. This should work for single-worker data loading.

Note that, in the long term, `initial_rng_state` may not be necessary if we are able to directly save/restore the buffer and RNG state of `Shuffler` (that is work in progress). However, `initial_rng_state` and simple snapshotting are still a good fall-back option for some edge cases where the buffer can't be stored.

Differential Revision: [D37943406](https://our.internmc.facebook.com/intern/diff/D37943406)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79479
Approved by: https://github.com/ejguan
2022-07-23 02:53:15 +00:00
Kevin Tse
428e44ffa1 [DataPipe] Fixes various warnings, exceptions, and clean up testing (#81833)
I went through most of the warnings and exceptions raised in our tests to find these issues.

Changes:
1. In testing, `self.assertEquals` is deprecated, so convert to `self.assertEqual` to get rid of the warning
2. Small changes for cleanliness and to get rid of warnings (no actual change to results)
3. Correct `is_every_instance_exhausted` logic for `_Forker`
4. Catch the `RuntimeError` raised by an invalidated iterator during clean-up
5. Check whether the attribute `parent_stream` exists before trying to access it

Differential Revision: [D38020122](https://our.internmc.facebook.com/intern/diff/D38020122)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/81833
Approved by: https://github.com/ejguan
2022-07-21 18:59:40 +00:00
erjia
ccbf04dd5f [DataPipe] Fix fork/unzip with a single child (#81502)
When `Forker` or `Unzipper` contains only a single child, the buffer should be cleaned up. This is one of the root causes of the issue reported internally. See: https://fburl.com/2k0et1gv
Pull Request resolved: https://github.com/pytorch/pytorch/pull/81502
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
2022-07-18 16:53:19 +00:00
Erjia Guan
782f18e9b5 [DLv2] Make graph traverse working with unhashable DataPipe (#80509)
Summary:
This Diff removes the requirement that `DataPipe` be hashable for the `traverse` function. `traverse` now uses the `id` of a `DataPipe` instance, rather than the `DataPipe` itself, as the key for both the `cache` and the graph.

But it requires changing the type of `DataPipeGraph` from `Dict[DataPipe, "DataPipeGraph"]` to `Dict[int, Tuple[DataPipe, "DataPipeGraph"]]`.
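
A minimal sketch of the new `id`-keyed shape:
```python
from typing import Any, Dict, Tuple

DataPipe = Any  # stand-in for the DataPipe types in this sketch
DataPipeGraph = Dict[int, Tuple[DataPipe, "DataPipeGraph"]]

def _add_node(graph: DataPipeGraph, dp: DataPipe) -> DataPipeGraph:
    # Key by id(dp): no __hash__ required on the pipe itself, and the
    # pipe is kept in the value so the graph is still useful to callers.
    if id(dp) not in graph:
        graph[id(dp)] = (dp, {})
    return graph[id(dp)][1]  # children sub-graph
```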

Differential Revision: D37354153

Ref PR in TorchData: https://github.com/pytorch/data/pull/559
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80509
Approved by: https://github.com/VitalyFedyunin
2022-07-12 14:47:42 +00:00
Vitaly Fedyunin
bcab5257de Expanding DataPipe to support DataFrames (#71931)
Differential Revision: [D37500516](https://our.internmc.facebook.com/intern/diff/D37500516)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71931
Approved by: https://github.com/ejguan
2022-07-08 18:46:10 +00:00
Kevin Tse
b8e50f512f [DataPipe] Count number of successful yields for IterDataPipe (#79657)
This PR adds an attribute and logic to count the number of successful yields from `IterDataPipe`. This information can be useful to fast-forward a DataPipe (or the entire graph) back to a certain state.
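
A minimal sketch of the counting logic (attribute name from the PR; wrapper shape assumed):
```python
class _CountingSource:
    def __init__(self, source):
        self._source = source
        self._number_of_samples_yielded = 0

    def __iter__(self):
        for item in self._source:
            # Count only once the item has actually been produced.
            self._number_of_samples_yielded += 1
            yield item
```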
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79657
Approved by: https://github.com/VitalyFedyunin
2022-06-28 17:30:33 +00:00
Erjia Guan
3d218e1c87 Raise warning for unpickable local function (#547) (#80232)
Summary:
X-link: https://github.com/pytorch/data/pull/547

Fixes https://github.com/pytorch/data/issues/538
- Improve the validation function to raise a warning about unpicklable functions when either a lambda or a local function is provided to DataPipe.
- The inner function of a functools.partial object is extracted as well for validation.
- Mimic the behavior of the pickle module for a local lambda function: it only raises an error for the local function, not the lambda, so we raise a warning about the local function rather than the lambda.
```py
>>> import pickle
>>> def fn():
...     lf = lambda x: x
...     pickle.dumps(lf)
>>> fn()
AttributeError: Can't pickle local object 'fn.<locals>.<lambda>'
```
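
A hedged sketch of the validation described above (helper name illustrative):
```python
import pickle
import warnings
from functools import partial

def _warn_if_unpicklable(fn):
    # Unwrap functools.partial, then warn (instead of raising) when pickle
    # cannot serialize the callable, e.g. a function defined inside another
    # function.
    inner = fn.func if isinstance(fn, partial) else fn
    try:
        pickle.dumps(inner)
    except (pickle.PicklingError, AttributeError):
        warnings.warn(
            f"{getattr(inner, '__name__', inner)!r} is not serializable by "
            "pickle; DataPipe multiprocessing may fail."
        )
```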

This Diff also fixes the error introduced by https://github.com/pytorch/pytorch/pull/79344

Test Plan:
CI on PyTorch and TorchData
Manually validated the tests from TorchVision

Differential Revision: D37417556

Pull Request resolved: https://github.com/pytorch/pytorch/pull/80232
Approved by: https://github.com/NivekT
2022-06-27 21:47:09 +00:00
PyTorch MergeBot
fcdaf35114 Revert "Add validation for mapper function in datapipes with input_col (#79344)"
This reverts commit 787ac4edf8.

Reverted https://github.com/pytorch/pytorch/pull/79344 on behalf of https://github.com/ejguan due to This PR breaks multiple use cases and the CI from TorchVision becomes red
2022-06-24 17:17:33 +00:00
PyTorch MergeBot
79ba65c0f2 Revert "Raise warning for unpickable local function (#80140)"
This reverts commit 4b75b7d3c1.

Reverted https://github.com/pytorch/pytorch/pull/80140 on behalf of https://github.com/ejguan due to It will break the CI for TorchData
2022-06-24 14:49:06 +00:00
erjia
4b75b7d3c1 Raise warning for unpickable local function (#80140)
Fixes https://github.com/pytorch/data/issues/538

- Improve the validation function to raise a warning about unpicklable functions when either a lambda or a local function is provided to `DataPipe`.
- The inner function of a `functools.partial` object is extracted as well for validation.
- Mimic the behavior of the `pickle` module for a local lambda function: it only raises an error for the local function, not the `lambda`, so we raise a warning about the local function rather than the lambda.
```py
>>> import pickle
>>> def fn():
...     lf = lambda x: x
...     pickle.dumps(lf)
>>> fn()
AttributeError: Can't pickle local object 'fn.<locals>.<lambda>'
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80140
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
2022-06-24 13:50:51 +00:00
Robert
787ac4edf8 Add validation for mapper function in datapipes with input_col (#79344)
As linked in https://github.com/pytorch/data/issues/362
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79344
Approved by: https://github.com/ejguan, https://github.com/NivekT
2022-06-23 18:49:35 +00:00
Robert Xiu
9fca008809 [DataPipe] Adding functional API for FileLister (#78419)
Fixes #78263

Follow-up from pytorch/data#387. This adds a functional API `list_files()` to `FileListerDataPipe`.
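
A hedged usage sketch of the new functional form (the `masks` argument is assumed from `FileLister`'s constructor):
```python
from torch.utils.data.datapipes.iter import IterableWrapper

# Functional chaining instead of constructing FileLister directly:
dp = IterableWrapper(["."]).list_files(masks="*.py")
for path in dp:
    print(path)
```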

Pull Request resolved: https://github.com/pytorch/pytorch/pull/78419
Approved by: https://github.com/NivekT, https://github.com/ejguan
2022-06-06 17:26:19 +00:00
erjia
9b6cb83b0c Make ShufflerDataPipe deterministic for persistent DL and distributed DL (#78765)
Fixes https://github.com/pytorch/data/issues/426

This PR introduces two main changes:
- It ensures that the `ShufflerDataPipe` shares the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.

Detail:
- `shared_seed` is shared across distributed and worker processes. It seeds a `shared_rng` that provides seeds to each `ShufflerDataPipe` in the pipeline
- `worker_loop` now accepts a new `shared_seed` argument to receive this shared seed
- The `shared_seed` is attached to `_ResumeIteration` for resetting the seed per epoch for persistent workers
- I chose not to touch `base_seed`, simply to avoid BC issues

I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b

You can see there isn't any duplicated/missing element for each epoch. And, with the same seed, the order of data remains the same across epochs.
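
A hedged sketch of the seeding scheme (helper names illustrative): every process builds the same `shared_rng` from `shared_seed`, so each `ShufflerDataPipe` receives an identical per-pipe seed on every process:
```python
import torch

def _seed_shufflers(shufflers, shared_seed: int) -> None:
    shared_rng = torch.Generator()
    shared_rng.manual_seed(shared_seed)  # identical on every process/worker
    for dp in shufflers:
        seed = int(torch.empty((), dtype=torch.int64).random_(generator=shared_rng))
        dp.set_seed(seed)
```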
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
2022-06-06 17:24:00 +00:00
PyTorch MergeBot
129d9dbb15 Revert "Make ShufflerDataPipe deterministic for persistent DL and distributed DL (#78765)"
This reverts commit b769a0e18b.

Reverted https://github.com/pytorch/pytorch/pull/78765 on behalf of https://github.com/janeyx99 due to broke lint on trunk
2022-06-06 14:24:51 +00:00
erjia
b769a0e18b Make ShufflerDataPipe deterministic for persistent DL and distributed DL (#78765)
Fixes https://github.com/pytorch/data/issues/426

This PR introduces two main changes:
- It ensures that the `ShufflerDataPipe` shares the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.

Detail:
- `shared_seed` is shared across distributed and worker processes. It seeds a `shared_rng` that provides seeds to each `ShufflerDataPipe` in the pipeline
- `worker_loop` now accepts a new `shared_seed` argument to receive this shared seed
- The `shared_seed` is attached to `_ResumeIteration` for resetting the seed per epoch for persistent workers
- I chose not to touch `base_seed`, simply to avoid BC issues

I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b

You can see there isn't any duplicated/missing element for each epoch. And, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
2022-06-06 13:36:37 +00:00
Kevin Tse
b4a6730ce1 [DataPipe] Refactor 'mux' to have buffer as an instance variable
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77775

Approved by: https://github.com/ejguan
2022-05-19 19:55:27 +00:00
erjia
99f6e614e8 Seed Shuffler for MP DataLoader without explicit manual_seed. (#77855)
Follow up on https://github.com/pytorch/pytorch/pull/77741

This PR guarantees that, in the first iteration with an MP DataLoader, the `Shuffler` has the same seed across worker processes when users don't specify a seed.
Check the newly added tests.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77855
Approved by: https://github.com/NivekT
2022-05-19 17:28:26 +00:00
erjia
365ce350cb Make ShufflerDataPipe deterministic for SP & MP DataLoader (#77741)
This is the first PR to make DataPipe deterministic.

Users should be able to use `torch.manual_seed(seed)` to control the shuffle order for the following cases:
- Directly over `DataPipe`
- For single-process DataLoader
- Multiprocessing DataLoader

Unfortunately, for distributed training, users have to run `apply_shuffle_seed` manually to make sure all distributed processes have the same shuffle order.
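
A sketch of the single-process contract this establishes (assuming the lazy per-iteration seeding described above):
```python
import torch
from torch.utils.data.datapipes.iter import IterableWrapper

dp = IterableWrapper(range(10)).shuffle()

torch.manual_seed(123)
first_epoch = list(dp)
torch.manual_seed(123)
second_epoch = list(dp)
assert first_epoch == second_epoch  # same global seed -> same shuffle order
```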
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77741
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
2022-05-18 23:32:07 +00:00
Ning Li (Seattle)
4d1ead6dff [DataPipe] Update mux data pipe (#76384) (#77145)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76384

OSS issue discussion: https://github.com/pytorch/data/issues/346
This diff updates the `mux` and `mux_longest` data pipes.
`mux`: Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux``). As in, one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration, and so on. It ends when the shortest input DataPipe is exhausted.

`mux` example:

```
>>> from torchdata.datapipes.iter import IterableWrapper
>>> dp1, dp2, dp3 = IterableWrapper(range(3)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
>>> list(dp1.mux(dp2, dp3))
[0, 10, 20, 1, 11, 21, 2, 12, 22]
```

Test Plan:
buck test mode/opt //caffe2/test:datapipe

https://www.internalfb.com/intern/testinfra/testrun/4785074706282345

Differential Revision: D36017945

Pull Request resolved: https://github.com/pytorch/pytorch/pull/77145
Approved by: https://github.com/NivekT, https://github.com/ejguan
2022-05-18 16:23:07 +00:00
Kevin Tse
bbaefdf6b5 [DataPipe] Enforcing single valid iterator for IterDataPipes with multiple DataPipes as outputs
Pull Request resolved: https://github.com/pytorch/pytorch/pull/75995

Approved by: https://github.com/VitalyFedyunin
2022-05-18 01:31:39 +00:00
Kevin Tse
7c52f204e0 [DataPipe] Enforcing single valid iterator for IterDataPipes without multiple outputs
Pull Request resolved: https://github.com/pytorch/pytorch/pull/70479

Approved by: https://github.com/ejguan
2022-05-18 01:31:38 +00:00
Vitaly Fedyunin
edffd595c2 [DataLoader] Adding ability to use dill to pass DataPipes in mutiprocessing
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77288

Approved by: https://github.com/ejguan, https://github.com/NivekT
2022-05-15 23:04:03 +00:00
Kevin Tse
a008d19ff7 [DataPipe] Revamp serialization logic of DataPipes
Pull Request resolved: https://github.com/pytorch/pytorch/pull/74984

Approved by: https://github.com/ejguan
2022-05-10 16:16:46 +00:00
zengk95
ef63408853 Revert [DataPipe] Update mux data pipe
Reverts #76384

This is breaking the test test_demux_mux_datapipe (__main__.TestIterableDataPipeBasic). See logs: a997046017.
It was red on the PR as well: https://hud.pytorch.org/pytorch/pytorch/pull/76384
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76507
Approved by: https://github.com/kit1980
2022-04-28 00:06:30 +00:00