Summary:
Fixes https://github.com/pytorch/pytorch/issues/40604
Adds a parameter to `DataLoader` to configure the per-worker prefetch number.
Before this edit, the prefetch logic always prefetched 2 * num_workers data items; this commit makes that configurable, e.g., you can specify prefetching 10 * num_workers data items.
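A minimal usage sketch, assuming the knob landed as the keyword-only `prefetch_factor` argument (the per-worker prefetch count, defaulting to the old behavior of 2):
```py
import torch
from torch.utils.data import DataLoader, TensorDataset

ds = TensorDataset(torch.randn(1000, 3))
# each worker keeps prefetch_factor items in flight, so up to
# prefetch_factor * num_workers items are prefetched overall
dl = DataLoader(ds, batch_size=32, num_workers=4, prefetch_factor=10)
```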
Pull Request resolved: https://github.com/pytorch/pytorch/pull/41130
Reviewed By: izdeby
Differential Revision: D22705288
Pulled By: albanD
fbshipit-source-id: 2c483fce409735fef1351eb5aa0b033f8e596561
Summary:
Based on discussion with jlucier (https://github.com/pytorch/pytorch/pull/38925#issuecomment-655859195). The `batch_size` change isn't made because the data loader only has the notion of a `batch_sampler`, not a batch size. If `batch_size`-dependent sharding is needed, users can still access it from their own code.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/41175
Differential Revision: D22456525
Pulled By: zou3519
fbshipit-source-id: 5281fcf14807f219de06e32107d5fe7d5b6a8623
Summary:
There are still occasional reports of DataLoader workers not exiting (e.g., https://github.com/pytorch/pytorch/issues/39570). Before we figure out why, we should just kill them if the join times out, to prevent hanging.
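A hedged sketch of the fallback (illustrative, not the PR's exact code): after a bounded join, any worker still alive gets terminated:
```py
import multiprocessing as mp
import time

def loop():
    while True:
        time.sleep(1)  # a worker that never exits on its own

if __name__ == '__main__':
    w = mp.Process(target=loop, daemon=True)
    w.start()
    w.join(timeout=5.0)  # bounded join instead of waiting forever
    if w.is_alive():
        w.terminate()    # force-kill so shutdown cannot hang
```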
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39869
Differential Revision: D22018501
Pulled By: ezyang
fbshipit-source-id: 66a00d0f5b3e303b6106b336949176b3ff8ac8ae
Summary:
Since the check was added in https://github.com/pytorch/pytorch/pull/6249, one cannot pass an iterable as a sampler to the data loader anymore, which was a very handy feature (e.g., https://github.com/pytorch/pytorch/issues/1337). I think the check should be removed, for two reasons (a short usage sketch follows):
1. It is too strict. There is no reason that a sampler should not be a general iterable.
2. It is inconsistent. In `DataLoader` (the main place where people use samplers), you can pass a general iterable as `batch_sampler` but not as `sampler`, due to this check.
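With the check removed, any iterable of indices works again; a minimal sketch (hypothetical dataset):
```py
import torch
from torch.utils.data import DataLoader, TensorDataset

ds = TensorDataset(torch.arange(10))
# a plain list, not a Sampler subclass
dl = DataLoader(ds, sampler=[0, 2, 4, 6, 8], batch_size=2)
for (batch,) in dl:
    print(batch)  # tensor([0, 2]), tensor([4, 6]), tensor([8])
```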
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38403
Differential Revision: D21555958
Pulled By: soumith
fbshipit-source-id: c7267bb99a31edd8f2750689205d6edc5dab5cff
Summary:
Fixes https://github.com/pytorch/pytorch/issues/973
Common failure scenario:
* DataLoader creates workers and communicates with them through shared memory (SHM) segments
* Workers send file descriptors pointing to the data-bearing SHMs back through an AF_UNIX socket
* The open-file limit gets fully used up
* An FD gets stripped from a socket message coming back from a worker, without the worker knowing this.
* This causes a `RuntimeError: received 0 items of ancdata` in the standard `multiprocessing` package
* The exception is not handled by PyTorch and so is presented to the user.
After this change, the user will see:
```
Traceback (most recent call last):
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 761, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/home/wbaranowski/miniconda3/envs/pytorch-cuda-dev/lib/python3.6/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "/home/wbaranowski/git/Quansight/pytorch/torch/multiprocessing/reductions.py", line 294, in rebuild_storage_fd
    fd = df.detach()
  File "/home/wbaranowski/miniconda3/envs/pytorch-cuda-dev/lib/python3.6/multiprocessing/resource_sharer.py", line 58, in detach
    return reduction.recv_handle(conn)
  File "/home/wbaranowski/miniconda3/envs/pytorch-cuda-dev/lib/python3.6/multiprocessing/reduction.py", line 184, in recv_handle
    return recvfds(s, 1)[0]
  File "/home/wbaranowski/miniconda3/envs/pytorch-cuda-dev/lib/python3.6/multiprocessing/reduction.py", line 162, in recvfds
    len(ancdata))
RuntimeError: received 0 items of ancdata

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 787, in _try_get_data
    fs = [tempfile.NamedTemporaryFile() for i in range(10)]
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 787, in <listcomp>
    fs = [tempfile.NamedTemporaryFile() for i in range(10)]
  File "/home/wbaranowski/miniconda3/envs/pytorch-cuda-dev/lib/python3.6/tempfile.py", line 551, in NamedTemporaryFile
    (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
  File "/home/wbaranowski/miniconda3/envs/pytorch-cuda-dev/lib/python3.6/tempfile.py", line 262, in _mkstemp_inner
    fd = _os.open(file, flags, 0o600)
OSError: [Errno 24] Too many open files: '/tmp/tmpnx_f6v_f'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test_shm_leak.py", line 56, in <module>
    worker_init_fn=worker_init_fn
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 345, in __next__
    data = self._next_data()
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 861, in _next_data
    idx, data = self._get_data()
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 828, in _get_data
    success, data = self._try_get_data()
  File "/home/wbaranowski/git/Quansight/pytorch/torch/utils/data/dataloader.py", line 791, in _try_get_data
    "Too many open files. Communication with the"
RuntimeError: Too many open files. Communication with the workers is no longer possible. Please increase the limit using `ulimit -n` in the shell or change the sharing strategy by calling `torch.multiprocessing.set_sharing_strategy('file_system')` at the beginning of your code
```
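The second workaround named in the message can be applied in code, as the error text itself suggests:
```py
import torch.multiprocessing
# share tensors via the file system instead of per-tensor file
# descriptors, sidestepping the open-file limit
torch.multiprocessing.set_sharing_strategy('file_system')
```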
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34768
Differential Revision: D20538053
Pulled By: ezyang
fbshipit-source-id: be4425cf2fa02aff61619b2b829c153cb1a867cb
Summary:
The following script reproduces the hang
```py
import multiprocessing, logging
logger = multiprocessing.log_to_stderr()
logger.setLevel(multiprocessing.SUBDEBUG)
import torch
class Dataset:
    def __len__(self):
        return 23425

    def __getitem__(self, idx):
        return torch.randn(3, 128, 128), idx % 100

ds = Dataset()
trdl = torch.utils.data.DataLoader(ds, batch_size=64, num_workers=300, pin_memory=True, shuffle=True)
for e in range(1000):
    for ii, (x, y) in enumerate(trdl):
        print(f'tr {e: 5d} {ii: 5d} avg y={y.mean(dtype=torch.double).item()}')
        if ii % 2 == 0:
            print("="*200 + "BEFORE ERROR" + "="*200)
            1/0
```
The process will hang at joining the putting thread of `data_queue` in the **main process**. The root cause is that too many things are put into the queue from the **worker processes**, so the `put` at 062ac6b472/torch/utils/data/dataloader.py (L928) blocks in the queue's background feeder thread. The `pin_memory_thread` exits once `pin_memory_thread_done_event` is set, without ever receiving the `(None, None)` sentinel. Hence, the main process needs the same treatment as the workers already get at
062ac6b472/torch/utils/data/_utils/worker.py (L198).
After the patch, the script finishes correctly.
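A hedged sketch of that treatment (the call the workers already make, shown standalone here): before exiting, stop the queue's feeder thread from blocking on unflushed data:
```py
import multiprocessing as mp

q = mp.Queue()
q.put(('some', 'data'))
# without this, the feeder thread would block process exit until the
# buffered data is flushed to a reader that may never come
q.cancel_join_thread()
q.close()
```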
Pull Request resolved: https://github.com/pytorch/pytorch/pull/33721
Differential Revision: D20089209
Pulled By: ezyang
fbshipit-source-id: e73fbfdd7631afe1ce5e1edd05dbdeb7b85ba961
Summary:
Copy-paste comment from code for reasoning:
```
# NOTE [ IterableDataset and __len__ ]
#
# For `IterableDataset`, `__len__` could be inaccurate when one naively
# does multi-processing data loading, since the samples will be duplicated.
# However, no real use case should be actually using that behavior, so
# it should count as a user error. We should generally trust user
# code to do the proper thing (e.g., configure each replica differently
# in `__iter__`), and give us the correct `__len__` if they choose to
# implement it (this will still throw if the dataset does not implement
# a `__len__`).
#
# To provide a further warning, we track if `__len__` was called on the
# `DataLoader`, save the returned value in `self._len_called`, and warn
# if the iterator ends up yielding more than this number of samples.
```
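For context, a sketch of the per-replica configuration the note expects, following the standard `get_worker_info` sharding pattern (names here are illustrative):
```py
import math
from torch.utils.data import DataLoader, IterableDataset, get_worker_info

class RangeDataset(IterableDataset):
    def __init__(self, start, end):
        self.start, self.end = start, end

    def __len__(self):
        return self.end - self.start  # accurate because __iter__ shards

    def __iter__(self):
        info = get_worker_info()
        if info is None:                      # single-process loading
            lo, hi = self.start, self.end
        else:                                 # split the range across workers
            per = int(math.ceil((self.end - self.start) / info.num_workers))
            lo = self.start + info.id * per
            hi = min(lo + per, self.end)
        return iter(range(lo, hi))

dl = DataLoader(RangeDataset(0, 10), num_workers=2, batch_size=None)
```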
Fixes https://github.com/pytorch/pytorch/issues/30184
Pull Request resolved: https://github.com/pytorch/pytorch/pull/23587
Differential Revision: D18852625
Pulled By: ailzhang
fbshipit-source-id: aea8d4d70c7f21aaa69b35908a6f43026493d826
Summary:
Back in April, malmaud added type annotations for `dataloader.py`. At about the same time, SsnL in https://github.com/pytorch/pytorch/issues/19228 replaced `_DataLoaderIter` with `_BaseDataLoaderIter` and two subclasses, `_SingleProcessDataLoaderIter` and `_MultiProcessingDataLoaderIter`. Probably because these changes happened in parallel, the type stubs and several other references in the codebase were never updated to match the refactoring.
I've gone ahead and done the updates to reflect the refactoring in https://github.com/pytorch/pytorch/issues/19228, which fixes the specific type stub/implementation mismatch pointed out in https://github.com/pytorch/pytorch/issues/26673, although not the broader problem that pytorch doesn't have a test to make sure that the `.pyi` type stub files match the real API defined in `.py` files.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27105
Differential Revision: D17813641
Pulled By: ezyang
fbshipit-source-id: ed7ac025c8d6ad3f298dd073347ec83bb4b6600c
Summary:
1. Prefixed underscores to any `DataLoaderIter` attribute that is not part of the data loader ctor argument list.
2. Prefixed `DataLoader.dataset_kind` with an underscore, because it only makes sense together with the private enum `_DatasetKind` and is an implementation detail.
3. Disallowed setting `DataLoader.dataset` and `DataLoader.batch_sampler` after a `DataLoader` is initialized, because they affect other attributes computed in `__init__` (see the sketch below).
These changes should not have a major BC-breaking effect, since the big changes are on the iterator class and most users don't even store it. I searched GitHub for `pin_memory_thread` and (while I didn't look through all result pages) the results I saw were forks of pytorch and blog posts on how the data loader works.
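A minimal sketch of how item 3 can be enforced (assumed mechanism; the PR's actual guard may differ):
```py
class FrozenLoader:
    """Hypothetical stand-in for DataLoader's attribute guard."""

    def __init__(self, dataset, batch_sampler=None):
        self.dataset = dataset
        self.batch_sampler = batch_sampler
        self.__initialized = True

    def __setattr__(self, attr, val):
        # once __init__ finishes, freeze the attributes that other
        # derived attributes were computed from
        if getattr(self, '_FrozenLoader__initialized', False) and \
                attr in ('dataset', 'batch_sampler'):
            raise ValueError('{} should not be set after initialization'.format(attr))
        super().__setattr__(attr, val)

dl = FrozenLoader([1, 2, 3])
dl.dataset = []  # raises ValueError
```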
Pull Request resolved: https://github.com/pytorch/pytorch/pull/23744
Differential Revision: D16732507
Pulled By: ezyang
fbshipit-source-id: 9f04d000b4200b8047f31eaa3473780b66cebd26
Summary:
Otherwise you may see errors like
```
Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x000001F99F5CB9D8>
Traceback (most recent call last):
File "C:\Users\Divyansh J\Anaconda3\envs\pytorch\lib\site-packages\torch\utils\data\dataloader.py", line 883, in __del__
self._shutdown_workers()
File "C:\Users\Divyansh J\Anaconda3\envs\pytorch\lib\site-packages\torch\utils\data\dataloader.py", line 860, in _shutdown_workers
if self.workers_status[worker_id]:
IndexError: list index out of range
```
e.g. https://discuss.pytorch.org/t/how-to-construct-dataset-with-iterator-for-multi-process-dataloader/49612/5
Pull Request resolved: https://github.com/pytorch/pytorch/pull/23761
Differential Revision: D16644687
Pulled By: soumith
fbshipit-source-id: a60e847431264525079456ff422317af1ac2be4b
Summary:
When an exception occurs in one of the modules passed to `parallel_apply()`, it is caught and re-raised in the main thread. This preserves the original exception type and message, but has the traceback point at the position where it's re-raised, rather than the original point of failure.
This PR saves the exception information required to generate the traceback, and includes the original traceback in the message of the exception raised in the main thread.
Before:
```
...
File ".../torch/nn/parallel/data_parallel.py", line 153, in parallel_apply
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
File ".../torch/nn/parallel/parallel_apply.py", line 84, in parallel_apply
raise output
RuntimeError: expected type torch.FloatTensor but got torch.cuda.FloatTensor
```
After:
```
...
File ".../torch/nn/parallel/data_parallel.py", line 153, in parallel_apply
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
File ".../torch/nn/parallel/parallel_apply.py", line 88, in parallel_apply
''.join(traceback.format_exception(*exc_info)))
RuntimeError: Caught exception in replica 0. Original traceback and message:
Traceback (most recent call last):
...
File "../models/foo.py", line 319, in bar
baz = asdf / ghij[:, np.newaxis]
RuntimeError: expected type torch.FloatTensor but got torch.cuda.FloatTensor
```
I took care to raise an exception of the original type (in case the main code checks for that), but replaced the message. It helped me find a bug that did not occur outside `data_parallel()`.
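A self-contained sketch of the technique (hypothetical names, not the PR's exact code): capture `sys.exc_info()` in the worker, then re-raise an exception of the same type whose message embeds the formatted original traceback:
```py
import sys
import traceback

def run_replica(fn, *args):
    """Run fn, capturing exc_info instead of letting the thread die."""
    try:
        return ('ok', fn(*args))
    except Exception:
        return ('err', sys.exc_info())

status, payload = run_replica(lambda: 1 / 0)
if status == 'err':
    exc_type, exc_value, exc_tb = payload
    msg = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
    # same exception type, but the message now carries the worker traceback
    raise exc_type("Caught exception in replica 0. "
                   "Original traceback and message:\n" + msg)
```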
Pull Request resolved: https://github.com/pytorch/pytorch/pull/18055
Differential Revision: D16444972
Pulled By: zhangguanheng66
fbshipit-source-id: ec436c9d4677fad18106a8046cfa835a20a101ce
Summary:
This is a modified version of https://github.com/pytorch/pytorch/pull/14705, since the commit structure for that PR is quite messy.
1. Add `IterableDataset`.
2. So we have 2 data loader modes: `Iterable` and `Map`.
    1. `Iterable` if the `dataset` is an instance of `IterableDataset`
    2. `Map` otherwise
3. Add better support for non-batch loading (i.e., `batch_size=None` and `batch_sampler=None`). This is useful for doing things like bulk loading.
4. Refactor `DataLoaderIter` into two classes, `_SingleProcessDataLoaderIter` and `_MultiProcessingDataLoaderIter`. Rename some methods to be more generic, e.g., `get_batch` -> `get_data`.
5. Add `torch.utils.data.get_worker_info`, which returns worker information in a worker process (e.g., worker id, dataset obj copy, etc.) and can be used in `IterableDataset.__iter__` and `worker_init_fn` to do per-worker configuration.
6. Add `ChainDataset`, which is the analog of `ConcatDataset` for `IterableDataset`.
7. Import torch.utils.data in `torch/__init__.py`.
8. Add data loader examples and documentation.
9. Use `get_worker_info` to detect whether we are in a worker process in `default_collate`.
Closes https://github.com/pytorch/pytorch/issues/17909, https://github.com/pytorch/pytorch/issues/18096, https://github.com/pytorch/pytorch/issues/19946, and some of https://github.com/pytorch/pytorch/issues/13023
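A small usage sketch of the new pieces (illustrative only):
```py
import torch
from torch.utils.data import ChainDataset, DataLoader, IterableDataset

class Stream(IterableDataset):
    def __init__(self, n):
        self.n = n

    def __iter__(self):
        # samples are produced lazily, stream-style
        return (torch.randn(2) for _ in range(self.n))

# ChainDataset is to IterableDataset what ConcatDataset is to map-style datasets
chained = ChainDataset([Stream(3), Stream(5)])
# batch_size=None enables the non-batch ("bulk") loading mode
for sample in DataLoader(chained, batch_size=None):
    print(sample.shape)  # torch.Size([2]), eight times
```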
Pull Request resolved: https://github.com/pytorch/pytorch/pull/19228
Reviewed By: bddppq
Differential Revision: D15058152
fbshipit-source-id: 9e081a901a071d7e4502b88054a34b450ab5ddde
Summary:
Resubmit of #20698, which got messed up.
The idea is that when PyTorch is used in a custom build environment (e.g., Facebook), it's useful to track usage of various APIs centrally. This PR introduces a simple, very lightweight mechanism to do so - only the first invocation of a trigger point is logged. This is significantly more lightweight than #18235, and thus we can allow putting logging in e.g. TensorImpl.
Also adds an initial list of trigger points. Trigger points are added in such a way that no static initialization triggers them, i.e. just linking with libtorch.so will not cause any logging. Further suggestions of what to log are welcome.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/20745
Differential Revision: D15429196
Pulled By: dzhulgakov
fbshipit-source-id: a5e41a709a65b7ebccc6b95f93854e583cf20aca
Summary:
It's been hard to understand how workers are launched and what code runs in the worker vs. the main process, especially on Windows, which leads to many of our samples failing. This explains when workers run and how to make code work on Windows as well.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/18091
Differential Revision: D15083766
Pulled By: soumith
fbshipit-source-id: 8a7e60defc8a72ec63874f657d7d5267d951dccf
Summary:
Also
1. Bump the multiprocessing test timeout, following the Python core tests
2. Fix one type of flakiness in `test_proper_exit`.
3. Add trace reporting via `faulthandler` when the loader process hangs in `test_proper_exit`.
4. Give `test_proper_exit` another try.
I'll heavily retest this.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/19421
Differential Revision: D15063728
Pulled By: ezyang
fbshipit-source-id: 4e0d992622e11053c44a9ec237b88b9a28a4472c
Summary:
Renewed attempt at https://github.com/pytorch/pytorch/pull/14171
From the original PR:
> Currently, the pin_memory_batch function in the dataloader will return a batch comprised of any unrecognized type without pinning the data, because it doesn't know how.
>
> This behavior was preventing us from overlapping data prefetching in Mask-RCNN, whose custom collate_fn returns a custom batch type.
The old PR allowed the user to implement batch pinning for custom batch and data types by passing a custom pin function to the dataloader. slayton58 suggested a cleaner approach: allow the user to define a `pin_memory` method on their custom types, and have `pin_memory_batch` [check for the presence of that method](https://github.com/pytorch/pytorch/pull/16743/files#diff-9f154cbd884fe654066b1621fad654f3R56) in the incoming batch as a fallback. I've updated the test and docstrings accordingly.
The old PR was merged but then reverted due to weird CUDA OOM errors on Windows that may or may not have been related. I have no idea why my changes would cause such errors (then or now), but it's something to keep an eye out for.
cc fmassa and yf225, who were my POCs on the old PR.
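A sketch of the pattern this enables, close to the example that went into the docs (the custom batch type here is illustrative; actual pinning needs a CUDA build):
```py
import torch
from torch.utils.data import DataLoader, TensorDataset

class SimpleCustomBatch:
    def __init__(self, data):
        transposed = list(zip(*data))
        self.inp = torch.stack(transposed[0], 0)
        self.tgt = torch.stack(transposed[1], 0)

    def pin_memory(self):
        # pin_memory_batch falls back to this method on unrecognized types
        self.inp = self.inp.pin_memory()
        self.tgt = self.tgt.pin_memory()
        return self

ds = TensorDataset(torch.randn(10, 3), torch.arange(10))
loader = DataLoader(ds, batch_size=2, pin_memory=True,
                    collate_fn=lambda batch: SimpleCustomBatch(batch))
```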
Pull Request resolved: https://github.com/pytorch/pytorch/pull/16743
Differential Revision: D13991745
Pulled By: ezyang
fbshipit-source-id: 74e71f62a03be453b4caa9f5524e9bc53467fa17
Summary:
1. Improve error message for better debugging info
2. Increase timeout
3. Also apply the windows worker failure detection mechanism on non-Windows platforms, for better robustness
Attempt to fix #14501.
cc ezyang
Pull Request resolved: https://github.com/pytorch/pytorch/pull/16249
Differential Revision: D13784702
Pulled By: ezyang
fbshipit-source-id: 09a7cff83ab9edce561ed69f9fb555ab35d1275f
Summary:
Same as #14668, and was approved there.
ailzhang, please apply this patch to Horizon's `data_streamer.py`: https://gist.github.com/SsnL/020fdb3d6b7016d81b6ba1d04cc41459 Thank you!
Below is the original description at #14668:
As I am working on tasks in https://github.com/pytorch/pytorch/issues/13023, I realized how unreadable the code is because all functions to be run in multiprocessing must be at top global level. Adding more functionalities to `dataloader.py` will only make things worse.
So in this PR, I refactor `dataloader.py` and move much of it into `data._utils`. E.g., the `_worker_loop` and related methods are now in `data._utils.worker`, signal handling code in `data._utils.signal_handling`, collating code in `data._utils.collate`, etc. This split, IMHO, makes code much clearer. I will base my future changes to DataLoader on top of this.
No functionality is changed, except that I added `torch._six.queue`.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/15331
Reviewed By: yf225
Differential Revision: D13503120
Pulled By: ailzhang
fbshipit-source-id: 94df16b4d80ad1102c437cde0d5a2e62cffe1f8e
Summary:
As I am working on tasks in https://github.com/pytorch/pytorch/issues/13023, I realized how unreadable the code is because all functions to be run in multiprocessing must be at top global level. Adding more functionalities to `dataloader.py` will only make things worse.
So in this PR, I refactor `dataloader.py` and move much of it into `data._utils`. E.g., the `_worker_loop` and related methods are now in `data._utils.worker`, signal handling code in `data._utils.signal_handling`, collating code in `data._utils.collate`, etc. This split, IMHO, makes code much clearer. I will base my future changes to DataLoader on top of this.
No functionality is changed, except that I added `torch._six.queue`.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/14668
Reviewed By: soumith
Differential Revision: D13289919
Pulled By: ailzhang
fbshipit-source-id: d701bc7bb48f5dd7b163b5be941a9d27eb277a4c
Summary:
Currently, the `pin_memory_batch` function in the dataloader will return a batch comprised of any unrecognized type without pinning the data, because it doesn't know how.
This behavior was preventing us from overlapping data prefetching in Mask-RCNN, whose custom `collate_fn` returns a custom batch type.
The present PR adds the ability for the user to pass a `pin_fn` alongside any custom `collate_fn` to handle such custom types.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/14171
Differential Revision: D13166669
Pulled By: soumith
fbshipit-source-id: ca965f9841d4a259b3ca4413c8bd0d8743d433ab
Summary:
I struggled with yet another DataLoader hang for the entire evening. After numerous experiments, I realized that it is unsafe to do anything when Python is shutting down. We also, unfortunately, implement our DataLoader cleanup logic in `__del__`, a function that may or may not be called during shutdown, and, if called, may or may not be called before core library resources are freed.
Fortunately, we are already setting all our workers and pin_memory_thread as daemonic. So in case of Python shutting down, we can just do a no-op in `__del__` and rely on the automatic termination of daemonic children.
An `atexit` hook is used to detect Python exit.
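A sketch of the exit-detection hook (assumed shape; the real flag name may differ):
```py
import atexit

_python_exit_status = False

def _set_python_exit_flag():
    global _python_exit_status
    _python_exit_status = True

atexit.register(_set_python_exit_flag)

class _Iter:
    def __del__(self):
        if _python_exit_status:
            # Python is shutting down: no-op, and rely on the daemonic
            # workers and pin_memory_thread being terminated automatically
            return
        # ... normal _shutdown_workers() path ...
```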
Pull Request resolved: https://github.com/pytorch/pytorch/pull/12700
Differential Revision: D10419027
Pulled By: SsnL
fbshipit-source-id: 5753e70d03e69eb1c9ec4ae2154252d51e2f79b0
Summary:
Current behavior is that each process (main and workers) will print a traceback from `KeyboardInterrupt`. And the main process will also print
```
RuntimeError: DataLoader worker (pid 46045) exited unexpectedly with exit code 1. Details are lost due to multiprocessing. Rerunning with num_workers=0 may give better error trace.
```
due to our SIGCHLD handler.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/11718
Differential Revision: D9840844
Pulled By: SsnL
fbshipit-source-id: 1a05060bb02907fef5aac3f274d2c84f9f42d187
Summary:
`Process.start()` actually takes some time, as it needs to start a
process and pass the arguments over via a pipe. Therefore, we
only add a worker to the `self.workers` list after it has started, so
that we do not call `.join()` on a process that was never started: if
the program dies before `start()` returns, `__del__` would otherwise
try to join it and fail with:
AssertionError: can only join a started process.
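A sketch of the reordering (simplified; the worker function is a stand-in):
```py
import multiprocessing
import time

def _worker_loop():
    time.sleep(0.1)  # stand-in for the real worker loop

if __name__ == '__main__':
    workers = []
    for _ in range(4):
        w = multiprocessing.Process(target=_worker_loop)
        w.daemon = True
        w.start()          # may take a while (fork + argument pipe)
        workers.append(w)  # record only after start() returns, so cleanup
                           # never joins an unstarted process
    for w in workers:
        w.join()
```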
Example trace when such error happens:
```py
[unrelated]
File "/private/home/ssnl/miniconda3/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 500, in __iter__
return _DataLoaderIter(self)
File "/private/home/ssnl/miniconda3/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 292, in __init__
w.start()
File "/private/home/ssnl/miniconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/private/home/ssnl/miniconda3/lib/python3.7/multiprocessing/context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/private/home/ssnl/miniconda3/lib/python3.7/multiprocessing/context.py", line 277, in _Popen
return Popen(process_obj)
File "/private/home/ssnl/miniconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/private/home/ssnl/miniconda3/lib/python3.7/multiprocessing/popen_fork.py", line 70, in _launch
self.pid = os.fork()
KeyboardInterrupt
Exception ignored in: <function _DataLoaderIter.__del__ at 0x7fa704d5aa60>
Traceback (most recent call last):
File "/private/home/ssnl/miniconda3/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 398, in __del__
self._shutdown_workers()
File "/private/home/ssnl/miniconda3/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 392, in _shutdown_workers
w.join()
File "/private/home/ssnl/miniconda3/lib/python3.7/multiprocessing/process.py", line 139, in join
assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process
```
No test, because this is hard to trigger reliably.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/11432
Reviewed By: ezyang
Differential Revision: D9735430
Pulled By: SsnL
fbshipit-source-id: a8912d9bb4063f210d6236267b178173810e2351