Commit Graph

157 Commits

Author SHA1 Message Date
Peter Bell
b53cc6cf8d [dynamo] Fix test_replay_record.py (#116230)
This test isn't run in CI because the CI runners don't have dill installed.
This fixes the tests so they run for me locally, and in the next PR I add
dill to the CI so we can test it properly.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116230
Approved by: https://github.com/jansel
2024-01-24 23:42:35 +00:00
rzou
79e6d2ae9d Remove incorrect usages of skipIfTorchDynamo (#117114)
Using bare `@skipIfTorchDynamo` is wrong; the correct usage is
`@skipIfTorchDynamo()` or `@skipIfTorchDynamo("msg")`. The bare form would
silently cause tests to stop existing.
Added an assertion for this and fixed the incorrect callsites.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/117114
Approved by: https://github.com/voznesenskym
2024-01-10 22:25:31 +00:00
Aaron Gokaslan
6de28e92d2 [BE]: Apply FURB118 (prev): replaces unnecessary lambdas with operator. (#116027)
This replaces a bunch of unnecessary lambdas with the operator package. This is semantically equivalent, but the operator package is faster, and arguably more readable. When the FURB rules are taken out of preview, I will enable it as a ruff check.
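
A minimal sketch of the kind of rewrite FURB118 performs (example data, not code from this PR):
```python
import operator

pairs = [(3, "c"), (1, "a"), (2, "b")]

# before: an unnecessary lambda
pairs.sort(key=lambda p: p[0])

# after: the semantically equivalent operator call
pairs.sort(key=operator.itemgetter(0))

# similar swaps: operator.add for `lambda x, y: x + y`,
# operator.attrgetter("name") for `lambda o: o.name`
```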

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116027
Approved by: https://github.com/malfet
2023-12-20 19:35:08 +00:00
Aaron Gokaslan
ee5d981249 [BE]: Enable RUFF PERF402 and apply fixes (#115505)
* Enable PERF402. Makes code more efficient and succinct by removing useless element-by-element list copies that can be replaced with a list constructor or an `extend` call. All test cases have `noqa` added since performance is not as sensitive in that folder.
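
A hedged illustration of what PERF402 flags (example data, not from this PR):
```python
src = ["a.txt", "b.txt", "c.txt"]

# before (flagged by PERF402): element-by-element copy
dst = []
for item in src:
    dst.append(item)

# after: a list constructor, or an extend call when the list already has contents
dst = list(src)
existing = ["header"]
existing.extend(src)
```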

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115505
Approved by: https://github.com/malfet
2023-12-20 18:01:24 +00:00
Michael Voznesensky
4691cb26b3 Disable compile for massive data pipe test (#109063)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/109063
Approved by: https://github.com/clee2000
ghstack dependencies: #108846
2023-09-12 00:15:52 +00:00
Justin Chu
73e1455327 [BE] Enable ruff's UP rules and autoformat test/ (#105434)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/105434
Approved by: https://github.com/albanD
2023-07-19 20:36:06 +00:00
Jason Fried
03f50fcc02 [codemod][3.10][NamedTuple] Use typing_extensions to get NamedTuple Generics (#101830)
Summary:
Python 3.10 doesn't support generic NamedTuples, but later versions do, so `typing_extensions` backports the feature.
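
A minimal sketch of the pattern this enables on 3.10 (illustrative names):
```python
from typing import Generic, TypeVar
from typing_extensions import NamedTuple  # backports generic NamedTuple support

T = TypeVar("T")

# typing.NamedTuple rejects this multiple inheritance on 3.10;
# typing_extensions.NamedTuple accepts it
class Pair(NamedTuple, Generic[T]):
    first: T
    second: T

p: Pair[int] = Pair(1, 2)
```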

(Note: this ignores all push blocking failures!)

Test Plan: sandcastle

Reviewed By: itamaro

Differential Revision: D45923201

Pull Request resolved: https://github.com/pytorch/pytorch/pull/101830
Approved by: https://github.com/izaitsevfb
2023-05-19 22:50:18 +00:00
Wei Ji
f95d42b1b7 [DataPipe] Update docstring for functional form of DataPipes (#100446)
Copy the docstring from IterDataPipe and MapDataPipe classes to their functional form. Done using [`functools.update_wrapper`](https://docs.python.org/3/library/functools.html#functools.update_wrapper), xref https://stackoverflow.com/questions/6394511/python-functools-wraps-equivalent-for-classes.
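
A minimal sketch of the `functools.update_wrapper` approach, with hypothetical names standing in for the real DataPipe classes:
```python
import functools

class Mapper:  # stand-in for an IterDataPipe class
    """Apply a function over each item from the source DataPipe."""
    def __init__(self, source, fn):
        self.source, self.fn = source, fn

def map_functional(source, fn):  # stand-in for the functional form
    return Mapper(source, fn)

# copy __doc__ (and related metadata) from the class onto the functional form;
# updated=() skips merging the class __dict__ into the function's __dict__
functools.update_wrapper(map_functional, Mapper, updated=())

print(map_functional.__doc__)  # -> the Mapper docstring
```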

See also parallel change to `.pyi` stub files at https://github.com/pytorch/pytorch/pull/100503

Fixes https://github.com/pytorch/data/issues/792 and https://github.com/weiji14/zen3geo/issues/69.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/100446
Approved by: https://github.com/NivekT
2023-05-18 19:59:00 +00:00
Aaron Gokaslan
85f38b8a33 [BE] Update flake8-comprehensions and adapt to rule C418 (#99178)
Applies rule C418 and fixes all instances of it. Also updates flake8-comprehensions.
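
A small sketch of what C418 flags, as I understand the rule (example data):
```python
pairs = [("a", 1), ("b", 2)]

# before (flagged by C418): the outer dict() call is redundant
d1 = dict({"a": 1, "b": 2})
d2 = dict({k: v for k, v in pairs})

# after: use the literal / comprehension directly
d1 = {"a": 1, "b": 2}
d2 = {k: v for k, v in pairs}
```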

Pull Request resolved: https://github.com/pytorch/pytorch/pull/99178
Approved by: https://github.com/ezyang
2023-04-15 15:33:42 +00:00
erjia
29d2e4b7fa Forward fix for DataLoader to accept custom Sharding DataPipe (#97287)
Fixes #96975

Changes:
- Make sure custom ShardingDataPipe with `apply_sharding` can be used by `DataLoader`
  - Allow the `apply_sharding` method to be defined without the last argument, `sharding_group` (see the sketch below)
- Make `DataLoader` not rely on `sharding_group`
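
A hedged sketch of the dispatch idea (the helper and parameter names here are illustrative, not the exact ones in the PR):
```python
import inspect

def _apply_sharding_compat(dp, num_of_instances, instance_id, sharding_group):
    # a custom DataPipe may define apply_sharding without the trailing
    # sharding_group parameter; inspect the signature and call accordingly
    sig = inspect.signature(dp.apply_sharding)
    if len(sig.parameters) >= 3:
        dp.apply_sharding(num_of_instances, instance_id, sharding_group)
    else:
        dp.apply_sharding(num_of_instances, instance_id)
```
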
Pull Request resolved: https://github.com/pytorch/pytorch/pull/97287
Approved by: https://github.com/NivekT
2023-04-05 22:33:37 +00:00
Tillmann Falck
939c4ae6cd [DataPipe] Add copy option to fork DataPipe (#96030)
Fixes pytorch/data#1061 and fixes pytorch/data#1032
Pull Request resolved: https://github.com/pytorch/pytorch/pull/96030
Approved by: https://github.com/ejguan, https://github.com/NivekT
2023-03-10 17:31:56 +00:00
erjia
738cc5e644 Fix validate_input_col for nn.Module or Callable (#96213)
Forward fix the problem introduced in https://github.com/pytorch/pytorch/pull/95067

Not all `Callable` objects implement `__name__`. Use `repr` as the fallback to get the function's name or reference.
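
A minimal sketch of the fallback (hypothetical helper name):
```python
import functools

def _get_fn_name(fn) -> str:
    # not every callable defines __name__ (e.g. functools.partial objects),
    # so fall back to repr()
    return getattr(fn, "__name__", repr(fn))

_get_fn_name(len)                             # 'len'
_get_fn_name(functools.partial(int, base=2))  # "functools.partial(<class 'int'>, base=2)"
```
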
Pull Request resolved: https://github.com/pytorch/pytorch/pull/96213
Approved by: https://github.com/NivekT
2023-03-08 01:30:17 +00:00
Felix
e6f3e16d89 Fix: validate_input_col for partial functions (#95067)
Fixes #95066

#### Proposed change:
do not call `str()` on a `Callable` to determine its name

#### Reasoning:
Please see https://github.com/pytorch/pytorch/issues/95066 for reasoning and examples

#### Effect:
* The code example given in https://github.com/pytorch/pytorch/issues/95066 now executes instantly.
* If invalid input is provided, the stacktrace now prints nicely as
  ```
  ValueError: The function foo takes 1 parameters, but 2 are required.
  ```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/95067
Approved by: https://github.com/NivekT, https://github.com/ejguan
2023-03-03 21:05:07 +00:00
Aaron Gokaslan
67d9790985 [BE] Apply almost all remaining flake8-comprehension checks (#94676)
Applies the remaining flake8-comprehensions fixes and checks. This change replaces all remaining unnecessary generator expressions with list/dict/set comprehensions, which are more succinct, more performant, and better supported by our torch.jit compiler. It also removes useless inner generators such as `set(a for a in b)`, resolving them into just the `set` call.
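
A small sketch of both kinds of rewrites described above (example data):
```python
b = [1, 2, 2, 3]
pairs = [("a", 1), ("b", 2)]

# useless inner generator -> direct call
s = set(a for a in b)   # before
s = set(b)              # after

# unnecessary generator expression -> comprehension
d = dict((k, v) for k, v in pairs)  # before
d = {k: v for k, v in pairs}        # after
```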

Pull Request resolved: https://github.com/pytorch/pytorch/pull/94676
Approved by: https://github.com/ezyang
2023-02-12 01:01:25 +00:00
Wenlei Xie
d8f4026ebf Continue support sharding pipes in tud.datapipes.iter.grouping as deprecated (#94527)
Summary:
https://github.com/pytorch/pytorch/pull/94095 moves this into `tud.datapipes.iter.sharding`. However, since this was previously a public API, that is a BC-breaking change.

As discussed in https://github.com/pytorch/data/pull/987#issuecomment-1422440049, we will keep backward-compatible support but with a deprecation warning.

Differential Revision: D43161015

Pull Request resolved: https://github.com/pytorch/pytorch/pull/94527
Approved by: https://github.com/ejguan, https://github.com/NivekT
2023-02-10 18:42:10 +00:00
Aaron Gokaslan
748bac8757 [BE]: Apply pyupgrade yield from and unit test alias upgrades (#94309)
Applies some more harmless pyupgrades. This one gets rid of deprecated aliases in unit tests and upgrades more yield-inside-for loops into `yield from` delegation, which is more performant and propagates more information/exceptions from the original generator. This is the modern recommended way of forwarding generators.
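
A minimal sketch of the `yield from` upgrade:
```python
def numbers():
    yield from range(3)

# before: manual forwarding loop
def forward_old(gen):
    for item in gen:
        yield item

# after: `yield from` delegates directly, also forwarding send()/throw()
# and the subgenerator's return value
def forward_new(gen):
    yield from gen

assert list(forward_new(numbers())) == [0, 1, 2]
```
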
Pull Request resolved: https://github.com/pytorch/pytorch/pull/94309
Approved by: https://github.com/albanD
2023-02-07 20:08:58 +00:00
Wenlei Xie
d6dec1a5cf Refactor sharding data pipe into a separate file (#94095)
Move `ShardingFilterIterDataPipe` into a dedicated file.

Also propose a dedicated parent class (`_ShardingIterDataPipe`) for sharding data pipes, as these seem more like "system/engine-level" datapipes that give strong hints to the ReadingService (RS) on how to execute and need first-class-citizen treatment in RS (compared with other "user-level" datapipes, which are mostly composable `Callable[[Iterable], Iterable]`). So we no longer need to depend on whether `is_shardable` and `apply_sharding` are present on a DataPipe in `graph_settings.py`. But open to other discussions.

Open question: should
[ShardingRoundRobinDispatcherIterDataPipe](01fc762003/torchdata/datapipes/iter/util/sharding.py (L16-L17)) also be considered a `_ShardingIterDataPipe`? (E.g., this kind of sharding is executed by replicating the metadata, while `ShardingRoundRobinDispatcherIterDataPipe` hints that the data is too expensive to replicate and so requires round-robin data exchange/dispatch.)

Differential Revision: D43014692

Pull Request resolved: https://github.com/pytorch/pytorch/pull/94095
Approved by: https://github.com/ejguan, https://github.com/NivekT
2023-02-07 09:12:02 +00:00
SvenDS9
b073c09f7a Added keep_key option to Grouper (#92532)
Fixes https://github.com/pytorch/data/issues/256

The testing of this module is currently suboptimal in general. We should improve this in the future.

@ejguan

Pull Request resolved: https://github.com/pytorch/pytorch/pull/92532
Approved by: https://github.com/ejguan
2023-01-25 20:58:21 +00:00
erjia
7a112c43c1 [DataLoader2] Fix apply_sharding to accept one sharding_filter per branch (#90769)
Changes:
- Allow multiple `sharding_filter`s in the pipeline as long as they are not on the same branch
- [x] Add test

Example:
```mermaid
graph TD;
DP1-->sharding_filter_1;
sharding_filter_1-->DP3;
DP2-->sharding_filter_2;
sharding_filter_2-->DP4;
DP3-->DP4;
DP4-->output;
```
In order to properly shard `DP1` and `DP2`, we should allow multiple `sharding_filter`s.
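
A hedged sketch of the diagram's shape using the functional DataPipe API (stand-in pipes for `DP1`/`DP2`):
```python
from torch.utils.data.datapipes.iter import IterableWrapper

# each branch carries its own sharding_filter; they are not on the same
# branch, so apply_sharding should now accept this graph
dp1 = IterableWrapper(range(10)).sharding_filter()
dp2 = IterableWrapper(range(10)).sharding_filter()
zipped = dp1.zip(dp2)  # stand-in for the DP3 -> DP4 -> output tail
```
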
Pull Request resolved: https://github.com/pytorch/pytorch/pull/90769
Approved by: https://github.com/NivekT
2022-12-14 22:03:41 +00:00
Bert Maher
d3178465ee [dynamo] VariableTracker.call_method requires a name (#88311)
Summary: as title

Test Plan: Before: N2743445, After: N2748186.  Note there's a new error, but at least we got past the easy one.

Differential Revision: D40938415

Pull Request resolved: https://github.com/pytorch/pytorch/pull/88311
Approved by: https://github.com/brad-mengchi
2022-11-10 18:17:23 +00:00
Vitaly Fedyunin
9dadf8fcc2 [DataPipes] Add group support to the sharding_filter (#88424)
Differential Revision: [D41006747](https://our.internmc.facebook.com/intern/diff/D41006747)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/88424
Approved by: https://github.com/ejguan
2022-11-07 22:07:01 +00:00
Kevin Tse
c719ec9c11 [DataPipe] Fix MapDataPipe spawn lambda test (#85668)
The test in its original form fails and I believe it is because the expected result is incorrect, unless we expect different behaviors between `IterDataPipe` and `MapDataPipe` in multiprocessing.

Differential Revision: [D39832182](https://our.internmc.facebook.com/intern/diff/D39832182)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85668
Approved by: https://github.com/ejguan
2022-09-27 19:58:15 +00:00
Kevin Tse
64a526d4af [DataLoader] Replacing traverse function with traverse_datapipes (#85667)
This PR deprecates the `traverse` function and replaces it with `traverse_datapipes`.

While using `DataLoader`, I realized it raises a `FutureWarning` even though I am not explicitly using `traverse`. What is happening is that `DataLoader` invokes `traverse(dp, only_datapipe=True)`, and the use of the keyword causes the `only_datapipe` warning to be raised.

```
/home/ubuntu/miniconda3/lib/python3.8/site-packages/torch/utils/data/graph.py:102: FutureWarning: `only_datapipe` is deprecated from `traverse` function and will be removed after 1.13.
  warnings.warn(msg, FutureWarning)
```

A few things we'd like to do:
1. Deprecate the key word arg `only_datapipe`
2. Change the default behavior from `only_datapipe=False` to `only_datapipe=True` in the future
3. Do not raise a warning when users are using the function correctly

This creates a paradox: it is impossible for users to change their code to match the future default behavior (i.e., call `traverse(dp)` without `only_datapipe`):
  - they cannot do so because the default behavior of `traverse` hasn't changed yet, so they must use `only_datapipe=True`
  - if they use `only_datapipe=True`, eventually the kwarg will go away and cause a runtime error; they also get a `FutureWarning` in the present

IIUC, there doesn't seem to be a way to accomplish those 3 goals without replacing the function with a new one that has a different name; hence, this PR. Let me know if there is a better alternative.

If this looks right, I will send a follow up PR in `TorchData`.

Differential Revision: [D39832183](https://our.internmc.facebook.com/intern/diff/D39832183)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85667
Approved by: https://github.com/ejguan
2022-09-27 19:58:15 +00:00
Erjia Guan
ea72a0991c Add support to traverse all python collection objects (#84079)
Fixes https://github.com/pytorch/data/issues/752

This PR makes the `traverse` function support more Python collection data structures. The `getstate_hook` will be invoked after the custom `__getstate__` function. This guarantees that `traverse` works as long as the `DataPipe` works properly with multiprocessing.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/84079
Approved by: https://github.com/NivekT, https://github.com/VitalyFedyunin
2022-09-23 16:21:25 +00:00
erjia
33bb8ae350 Set shuffle to DataPipes with set_shuffle API (#83741)
This PR requires https://github.com/pytorch/pytorch/pull/83202 to be landed first.

## changes
- Make `apply_shuffle_setting` and `apply_shuffle_seed` apply the shuffle setting to every DataPipe that has a method called `set_shuffle` or `set_seed`.
- Change the API from `apply_shuffle_seed` to `apply_random_seed`.
- Fix a bug where `apply_shuffle_seed` only accepted hashable DataPipes. After this PR, the function uses `id` to prevent seeding the same DataPipe multiple times per epoch (see the sketch below).
- Fix another bug in `shuffler` where `reset` with `_enable=False` would also reset `_seed`.
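
A hedged sketch of the `id`-based guard (the traversal and seed-drawing details are illustrative):
```python
import torch

def apply_random_seed(pipes, rng: torch.Generator):
    applied = set()
    for pipe in pipes:
        # key by id() so unhashable pipes work and a pipe that appears
        # twice in the graph is only seeded once per epoch
        if id(pipe) in applied or not hasattr(pipe, "set_seed"):
            continue
        applied.add(id(pipe))
        seed = int(torch.empty((), dtype=torch.int64).random_(generator=rng))
        pipe.set_seed(seed)
    return pipes
```
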
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83741
Approved by: https://github.com/NivekT
2022-09-13 13:38:58 +00:00
Kevin Tse
cfb9d0d233 [DataPipe] Fixing map function signature validation (#84279)
As @pmeier [points out](https://github.com/pytorch/pytorch/pull/80267#discussion_r958423241), #80267 introduces a bug where an exception is thrown when a built-in function (or a function implemented in C) is used with `.map` because `inspect.signature(fn)` cannot find the function's signature.

This PR skips over a function when its signature cannot be found. I believe this case is rare, and if the `fn` is truly incompatible with the usage of `input_col`/`output_col`, an exception will be raised at run time such that users will be able to examine what is wrong.
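
A sketch of the skip, assuming the validation helper wraps `inspect.signature` roughly like this:
```python
import inspect

def _try_get_signature(fn):
    try:
        return inspect.signature(fn)
    except (ValueError, TypeError):
        # built-in / C-implemented callables may not expose a signature;
        # skip validation and let an incompatible fn fail at run time
        return None
```
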
Pull Request resolved: https://github.com/pytorch/pytorch/pull/84279
Approved by: https://github.com/pmeier, https://github.com/janeyx99
2022-08-31 19:55:01 +00:00
erjia
3f94726453 [DataPipe] Convert MapDataPipe.shuffle to IterDataPipe (#83202)
Fixes: https://github.com/pytorch/data/issues/718

This is an alternative PR against https://github.com/pytorch/pytorch/pull/82974

This PR changes the behavior of both types to match `IterDataPipe.shuffle`:
- Lazily generate the seed per iteration
- Each iterator gets a new seed
- Convert `MapDataPipe.shuffle` to an `IterDataPipe`

## BC-breaking Note:
This PR changes the return type of `MapDataPipe.shuffle` from a `MapDataPipe` to an `IterDataPipe`.

### 1.12
Output as `MapDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
True
>>> isinstance(dp, IterDataPipe)
False
```

### This PR:
Output as `IterDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
False
>>> isinstance(dp, IterDataPipe)
True
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83202
Approved by: https://github.com/NivekT
2022-08-29 08:57:17 +00:00
PyTorch MergeBot
7244a3737c Revert "[DataPipe] Convert MapDataPipe.shuffle to IterDataPipe (#83202)"
This reverts commit a423c966a7.

Reverted https://github.com/pytorch/pytorch/pull/83202 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally
2022-08-28 18:00:17 +00:00
erjia
a423c966a7 [DataPipe] Convert MapDataPipe.shuffle to IterDataPipe (#83202)
Fixes: https://github.com/pytorch/data/issues/718

This is an alternative PR against https://github.com/pytorch/pytorch/pull/82974

This PR changes the behavior of both types to match `IterDataPipe.shuffle`:
- Lazily generate the seed per iteration
- Each iterator gets a new seed
- Convert `MapDataPipe.shuffle` to an `IterDataPipe`

## BC-breaking Note:
This PR changes the return type of `MapDataPipe.shuffle` from a `MapDataPipe` to an `IterDataPipe`.

### 1.12
Output as `MapDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
True
>>> isinstance(dp, IterDataPipe)
False
```

### This PR:
Output as `IterDataPipe`
```
>>> from torch.utils.data import IterDataPipe, MapDataPipe
>>> from torch.utils.data.datapipes.map import SequenceWrapper
>>> dp = SequenceWrapper(list(range(10))).shuffle()
>>> isinstance(dp, MapDataPipe)
False
>>> isinstance(dp, IterDataPipe)
True
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83202
Approved by: https://github.com/NivekT
2022-08-26 23:33:20 +00:00
erjia
4c19981316 [DataPipe] Reset Shuffler's iterator when NotStarted (#83535)
This PR changes the behavior of `IterDataPipe` to always invoke `reset` from the `NotStarted` state. The main reason is that we normally put lazy initialization code into the `reset` function; even in the `NotStarted` state, we should invoke `reset` to initialize those lazy variables. Otherwise, we would have to manually determine in the `__iter__` function whether the state is `NotStarted` or `Iterating` and manually invoke `reset` only in the `NotStarted` state.

This PR also makes `Shuffler` able to serialize its `buffer` and `rng_state`.

The following part is removed:

~I also add `_snapshot_state` to the serialization state and, during `__setstate__`, only change the state to `Restored` if the original state is `Iterating`. In particular, for the case of deserializing/serializing a `NotStarted` DataPipe (multiprocessing), we would invoke `set_seed` for `Shuffler`. We need the `DataPipe` to remain `NotStarted` to properly `reset`.~

I am listing all the expected state transitions below:
- Initial state: `NotStarted`
  - `iter` -> Call `reset` and change the state to `Iterating`
  - serialize/deserialize -> Keep the state as `NotStarted` (will `reset` if `iter` is called afterwards)
- Initial state: `Iterating`
  - `iter` -> Call `reset` and keep the state to `Iterating`
  - serialize/deserialize -> Change the state as `Restored`
- Initial state: `Restored`
  - `iter` -> Only change the state to `Iterating`
  - serialize/deserialize -> Not allowed

Pull Request resolved: https://github.com/pytorch/pytorch/pull/83535
Approved by: https://github.com/NivekT
2022-08-25 19:45:41 +00:00
Robert
5c49c7bbba [WIP] Validating input_col for certain datapipes (#80267)
Follow up from #79344.

Currently WIP due to multiple test failures.

Waiting for #80140 to land
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80267
Approved by: https://github.com/ejguan
2022-08-24 17:34:28 +00:00
Kevin Tse
14b660fcc0 [DataPipe] Correct the type of exception that is being raised by ShufflerMapDataPipe (#82666)
Fixes https://github.com/pytorch/data/issues/708

The following code snippet used to fail, now it has been added as a test case:
```python
dp1 = dp.map.SequenceWrapper(range(10))
shuffle_dp1 = dp1.shuffle()
dp2 = dp.map.SequenceWrapper(range(10))
shuffle_dp2 = dp2.shuffle()
zip_dp = shuffle_dp1.zip(shuffle_dp2)
list(zip_dp)  # This used to fail
```

The issue was that `ShufflerMapDataPipe` raises a `KeyError` when an out-of-bound index is passed into it, but that was not handled by `zip_dp`'s `__getitem__`, which only handled `IndexError`. With this change, it handles both (see the sketch below).
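
A simplified sketch of the fix in the zip datapipe's `__getitem__`:
```python
class ZipperMapDataPipe:  # simplified stand-in for the real Zipper
    def __init__(self, *datapipes):
        self.datapipes = datapipes

    def __getitem__(self, index):
        res = []
        for dp in self.datapipes:
            try:
                res.append(dp[index])
            except (IndexError, KeyError):
                # ShufflerMapDataPipe raises KeyError for out-of-bound
                # indices; treat it the same as IndexError
                raise IndexError(
                    f"Index {index} is out of bound for one of the inputs"
                ) from None
        return tuple(res)
```
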
Pull Request resolved: https://github.com/pytorch/pytorch/pull/82666
Approved by: https://github.com/ejguan
2022-08-03 19:05:17 +00:00
Kevin Tse
35d97e21c8 [DataPipe] Simple graph snapshotting (#79479)
This mostly completes the "poor man's snapshotting" implementation (named "simple snapshotting"). This is the most basic version of snapshotting, but it should work for all DataPipes. I will be adding more efficient implementations for different types of DataPipes in future PRs.

### Implementation

The general idea of the simple snapshot is that we will:
1. Create a new iterator
2. Move that iterator forward by `n_iterations`
3. Save that as the `_fast_forward_iterator` of the DataPipe
4. The next time `iter` is called on the DataPipe, use the `_fast_forward_iterator`

### Usage
As of this implementation, the usage will look something like:
```python
rng = torch.Generator()
initial_rng_state = rng.get_state()
datapipe: IterDataPipe = ...
# Some usage of the DataPipe, here maybe yielding the first 5 values
n_iter = 5
it = iter(datapipe)
for _ in range(n_iter):
    next(it)
serialized_graph = pickle.dumps(datapipe)

# The serialized object has most of the sufficient information for simple snapshot (except for initial RNG state)
# It can be deserialized at a later point in time or by a different process
deserialized_graph = pickle.loads(serialized_graph)
# I think `DataLoader2` or `ReadingService` should store `initial_rng_state` that can be saved by the API that we later use
rng_for_deserialized = torch.Generator()
rng_for_deserialized.set_state(initial_rng_state)
n_iterations = deserialized_graph._number_of_samples_yielded

_simple_snapshot_graph(deserialized_graph, n_iterations, rng=rng_for_deserialized)
# The whole DataPipe graph should have the same state as before serialization, such that:
self.assertEqual(list(it), list(deserialized_graph))  # True
```

### Next Steps
If this looks acceptable, the next step is to modify `DataLoader2`'s prototype ReadingService (the one with queues) to remember things like `initial_rng_state` and to add a `save_snapshot` method that returns the `(serialized graph, initial_rng)` pair, plus a `restore_snapshot` method. This should work for single-worker data loading.

Note that, in the long term, `initial_rng_state` may not be necessary if we are able to directly save/restore the buffer and RNG state of `Shuffler` (that is work in progress). However, `initial_rng_state` and the simple snapshot are still a good fallback option for some edge cases where the buffer can't be stored.

Differential Revision: [D37943406](https://our.internmc.facebook.com/intern/diff/D37943406)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79479
Approved by: https://github.com/ejguan
2022-07-23 02:53:15 +00:00
Kevin Tse
428e44ffa1 [DataPipe] Fixes various warnings, exceptions, and clean up testing (#81833)
I went through most of the warnings and exceptions raised in our tests to find these issues.

Changes:
1. In testing, `self.assertEquals` is deprecated; converted to `self.assertEqual` to get rid of the warning
2. Small changes for cleanliness and to get rid of warnings (no actual change to results)
3. Correct `is_every_instance_exhausted` logic for `_Forker`
4. Catch `RuntimeError` raised by an invalidated iterator during cleanup
5. Check that the attribute `parent_stream` exists before trying to access it

Differential Revision: [D38020122](https://our.internmc.facebook.com/intern/diff/D38020122)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/81833
Approved by: https://github.com/ejguan
2022-07-21 18:59:40 +00:00
erjia
ccbf04dd5f [DataPipe] Fix fork/unzip with a single child (#81502)
When `Forker` or `Unzipper` only contains a single child, the buffer should be cleaned up. This is one of the root causes for the issue reported internally. See: https://fburl.com/2k0et1gv
Pull Request resolved: https://github.com/pytorch/pytorch/pull/81502
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
2022-07-18 16:53:19 +00:00
Erjia Guan
782f18e9b5 [DLv2] Make graph traverse working with unhashable DataPipe (#80509)
Summary:
This diff removes the `traverse` function's requirement that `DataPipe` be hashable. `traverse` now uses the `id` of the `DataPipe` instance, rather than the `DataPipe` itself, as the key for both the `cache` and the graph.

But it requires changing the type of `DataPipeGraph` from `Dict[DataPipe, "DataPipeGraph"]` to `Dict[int, Tuple[DataPipe, "DataPipeGraph"]]`.
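
A hedged sketch of the id-keyed traversal (the child-lookup hook is hypothetical):
```python
from typing import Any, Dict, Tuple

# graph keyed by id(datapipe); the datapipe itself lives in the value tuple,
# so it never needs to be hashable
DataPipeGraph = Dict[int, Tuple[Any, "DataPipeGraph"]]

def traverse(dp: Any, cache=None) -> "DataPipeGraph":
    cache = cache if cache is not None else set()
    if id(dp) in cache:  # diamond/cycle guard without hashing dp
        return {}
    cache.add(id(dp))
    subgraph = {}
    for child in getattr(dp, "source_datapipes", []):  # hypothetical child hook
        subgraph.update(traverse(child, cache))
    return {id(dp): (dp, subgraph)}
```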

Differential Revision: D37354153

Ref PR in TorchData: https://github.com/pytorch/data/pull/559
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80509
Approved by: https://github.com/VitalyFedyunin
2022-07-12 14:47:42 +00:00
Vitaly Fedyunin
bcab5257de Expanding DataPipe to support DataFrames (#71931)
Differential Revision: [D37500516](https://our.internmc.facebook.com/intern/diff/D37500516)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/71931
Approved by: https://github.com/ejguan
2022-07-08 18:46:10 +00:00
Kevin Tse
b8e50f512f [DataPipe] Count number of successful yields for IterDataPipe (#79657)
This PR adds an attribute and logic to count the number of successful yields from `IterDataPipe`. This information can be useful to fast-forward a DataPipe (or the entire graph) back to a certain state.
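
A minimal sketch of the bookkeeping (a simplified stand-in; in the PR the attribute lives on `IterDataPipe` itself):
```python
class CountingPipe:
    def __init__(self, source):
        self.source = source
        self._number_of_samples_yielded = 0

    def __iter__(self):
        self._number_of_samples_yielded = 0  # fresh count per iterator
        for item in self.source:
            self._number_of_samples_yielded += 1  # counts successful yields only
            yield item

p = CountingPipe(range(5))
it = iter(p)
next(it); next(it)
assert p._number_of_samples_yielded == 2
```
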
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79657
Approved by: https://github.com/VitalyFedyunin
2022-06-28 17:30:33 +00:00
Erjia Guan
3d218e1c87 Raise warning for unpicklable local function (#547) (#80232)
Summary:
X-link: https://github.com/pytorch/data/pull/547

Fixes https://github.com/pytorch/data/issues/538
- Improve the validation function to raise a warning about an unpicklable function when either a lambda or a local function is provided to a DataPipe.
- The inner function of a functools.partial object is extracted as well for validation.
- Mimic the behavior of the pickle module for a local lambda function: it only raises an error for the local function, not the lambda, so we warn about the local function rather than the lambda.
```py

>>> import pickle
>>> def fn():
...     lf = lambda x: x
...     pickle.dumps(lf)
>>> pickle.dumps(fn)
AttributeError: Can't pickle local object 'fn.<locals>.<lambda>'
```

This diff also fixes the error introduced by https://github.com/pytorch/pytorch/pull/79344

Test Plan:
CI on PyTorch and TorchData
Manually validated the tests from TorchVision

Differential Revision: D37417556

Pull Request resolved: https://github.com/pytorch/pytorch/pull/80232
Approved by: https://github.com/NivekT
2022-06-27 21:47:09 +00:00
PyTorch MergeBot
fcdaf35114 Revert "Add validation for mapper function in datapipes with input_col (#79344)"
This reverts commit 787ac4edf8.

Reverted https://github.com/pytorch/pytorch/pull/79344 on behalf of https://github.com/ejguan due to This PR breaks multiple use cases and the CI from TorchVision becomes red
2022-06-24 17:17:33 +00:00
PyTorch MergeBot
79ba65c0f2 Revert "Raise warning for unpicklable local function (#80140)"
This reverts commit 4b75b7d3c1.

Reverted https://github.com/pytorch/pytorch/pull/80140 on behalf of https://github.com/ejguan due to It will break the CI for TorchData
2022-06-24 14:49:06 +00:00
erjia
4b75b7d3c1 Raise warning for unpicklable local function (#80140)
Fixes https://github.com/pytorch/data/issues/538

- Improve the validation function to raise a warning about an unpicklable function when either a lambda or a local function is provided to a `DataPipe`.
- The inner function of a `functools.partial` object is extracted as well for validation.
- Mimic the behavior of the `pickle` module for a local lambda function: it only raises an error for the local function, not the `lambda`, so we warn about the local function rather than the lambda.
```py
>>> import pickle
>>> def fn():
...     lf = lambda x: x
...     pickle.dumps(lf)
>>> pickle.dumps(fn)
AttributeError: Can't pickle local object 'fn.<locals>.<lambda>'
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80140
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
2022-06-24 13:50:51 +00:00
Robert
787ac4edf8 Add validation for mapper function in datapipes with input_col (#79344)
As linked in https://github.com/pytorch/data/issues/362
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79344
Approved by: https://github.com/ejguan, https://github.com/NivekT
2022-06-23 18:49:35 +00:00
Robert Xiu
9fca008809 [DataPipe] Adding functional API for FileLister (#78419)
Fixes #78263

Follow-up from pytorch/data#387. This adds a functional API `list_files()` to `FileListerDataPipe`.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/78419
Approved by: https://github.com/NivekT, https://github.com/ejguan
2022-06-06 17:26:19 +00:00
erjia
9b6cb83b0c Make ShufflerDataPipe deterministic for persistent DL and distributed DL (#78765)
Fixes https://github.com/pytorch/data/issues/426

This PR introduces two main changes:
- It ensures that `ShufflerDataPipe` shares the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.

Detail:
- `shared_seed` is shared across distributed and worker processes. It seeds a `shared_rng` that provides seeds to each `ShufflerDataPipe` in the pipeline
- `worker_loop` now accepts a new argument, `shared_seed`, to receive this shared seed
- The `shared_seed` is attached to `_ResumeIteration` for resetting the seed per epoch for persistent workers
- I chose not to touch `base_seed`, simply to avoid BC issues

I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b

You can see there are no duplicated or missing elements in any epoch, and, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
2022-06-06 17:24:00 +00:00
PyTorch MergeBot
129d9dbb15 Revert "Make ShufflerDataPipe deterministic for persistent DL and distributed DL (#78765)"
This reverts commit b769a0e18b.

Reverted https://github.com/pytorch/pytorch/pull/78765 on behalf of https://github.com/janeyx99 due to broke lint on trunk
2022-06-06 14:24:51 +00:00
erjia
b769a0e18b Make ShufflerDataPipe deterministic for persistent DL and distributed DL (#78765)
Fixes https://github.com/pytorch/data/issues/426

This PR introduces two main changes:
- It ensures that `ShufflerDataPipe` shares the same seed across distributed processes.
- Users can reset `shuffle` for persistent workers per epoch.

Detail:
- `shared_seed` is shared across distributed and worker processes. It seeds a `shared_rng` that provides seeds to each `ShufflerDataPipe` in the pipeline
- `worker_loop` now accepts a new argument, `shared_seed`, to receive this shared seed
- The `shared_seed` is attached to `_ResumeIteration` for resetting the seed per epoch for persistent workers
- I chose not to touch `base_seed`, simply to avoid BC issues

I used this [script](https://gist.github.com/ejguan/d88f75fa822cb696ab1bc5bc25844f47) to test the result with `world_size=4`. Please check the result in: https://gist.github.com/ejguan/6ee2d2de12ca57f9eb4b97ef5a0e300b

You can see there are no duplicated or missing elements in any epoch, and, with the same seed, the order of data remains the same across epochs.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/78765
Approved by: https://github.com/VitalyFedyunin
2022-06-06 13:36:37 +00:00
Kevin Tse
b4a6730ce1 [DataPipe] Refactor 'mux' to have buffer as an instance variable
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77775

Approved by: https://github.com/ejguan
2022-05-19 19:55:27 +00:00
erjia
99f6e614e8 Seed Shuffler for MP DataLoader without explicit manual_seed. (#77855)
Follow up on https://github.com/pytorch/pytorch/pull/77741

This PR guarantees that, in the first iteration with an MP DataLoader, `Shuffler` has the same seed across worker processes when users don't specify a seed.
See the newly added tests.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77855
Approved by: https://github.com/NivekT
2022-05-19 17:28:26 +00:00
erjia
365ce350cb Make ShufflerDataPipe deterministic for SP & MP DataLoader (#77741)
This is the first PR to make DataPipe deterministic.

Users should be able to use `torch.manual_seed(seed)` to control the shuffle order for the following cases:
- Directly over `DataPipe`
- For single-process DataLoader
- Multiprocessing DataLoader

Unfortunately, for distributed training, users have to run `apply_shuffle_seed` manually to make sure all distributed processes have the same shuffle order.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/77741
Approved by: https://github.com/VitalyFedyunin, https://github.com/NivekT
2022-05-18 23:32:07 +00:00