This adds lazy initialization support to ProcessGroupGloo via `TORCH_GLOO_LAZY_INIT` or via `create_device(..., lazy_init=True)`
This is still a draft PR as there's one race condition when doing coalesced operations that needs to be fixed upstream in Gloo first. Depends on https://github.com/facebookincubator/gloo/pull/427 landing first
This also updates the gloo submodule to include the required changes.
Test plan:
added lazy init test variants
```
pytest -v test/distributed/test_c10d_gloo.py -k Lazy
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/150801
Approved by: https://github.com/fduwjj
This adds a `reduce_scatter` implementation for ProcessGroupGloo. This is a pretty naive implementation as it does 1 allreduce per rank but may be useful for testing in FSDP etc. There was an existing implementation of reduce_scatter_tensor/reduce_scatter_tensor_coalesed that has a very similar implementation but requires a fixed tensor size per rank.
If users find these functions to be too slow we can address them as issues arise.
Gloo now supports all major distributed operations. Quite a few of these were added by @rohan-varma and @yifuwang but they didn't update the support chart. We also have `CUDAWork` variants of most operations so those were also added to the chart.
Test plan:
```
pytest -v test/distributed/test_c10d_gloo.py -k reduce_scatter
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/149869
Approved by: https://github.com/fduwjj
### Motivation:
As design illustrated in Intel distributed support RFC https://github.com/pytorch/pytorch/issues/141741, two sections are needed to enable intel distributed backend (`XCCL`) support in PyTorch.
1. Intel GPU distributed Backend integration in PyTorch `torch-xpu-ops`.
2. **Intel distributed Backend register in PyTorch distributed package**. This PR is to contribute section 2 change.
### Example:
Here is a simple example of using spawn to launch XCCL backend and perform allreduce on XPU tensors.
```
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def run_allreduce(rank, world_size):
setup(rank, world_size)
device = torch.device('xpu:{}'.format(rank))
x = torch.randn([2, 2], device=device)
dist.all_reduce(x)
cleanup()
if __name__ == '__main__':
world_size = 2
mp.spawn(run_allreduce, args=(world_size,), nprocs=world_size, join=True)
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/141856
Approved by: https://github.com/kwen2501, https://github.com/gujinghui, https://github.com/albanD
Doc updates:
* This adds documentation for the object oriented ProcessGroup APIs that are being used in torchft as well as https://github.com/pytorch/rfcs/pull/71 .
* It also does some general cleanups to simplify the distributed.rst by using `:methods`.
* It adds `__init__` definitions for the Stores
* I've reordered things so the collective APIs are before the Store/PG apis
Test plan:
```
lintrunner -a
cd docs && sphinx-autobuild source build/ -j auto -WT --keep-going
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/140853
Approved by: https://github.com/kwen2501
reland of https://github.com/pytorch/pytorch/pull/133113
I have to create a new PR because the previous reverted PR could not either be rebased, or imported successfully :(
----
Moving DTensor to be in the public namespace, to formally add the documentation page that includes all the public APIs. This includes:
* many path renames and path import fixes
* a dedicated doc page without too much content yet (adding in the next PRs)
* To preserve the BC for users still using the torch.distributed._tensor, I added a shim script to redirect old path calls to the new module
The BC preserving is evidented by the fact that all DTensor tests are still working without changing the public imports. So it's safe to land the changes
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134203
Approved by: https://github.com/tianyu-l
Moving DTensor to be in the public namespace, to formally add the
documentation page that includes all the public APIs. This includes:
* many path renames and path import fixes
* a dedicated doc page without too much content yet (adding in the next
PRs)
* To preserve the BC for users still using the `torch.distributed._tensor`,
I added a shim script to redirect old path calls to the new module
The BC preserving is evidented by the fact that all DTensor tests are still
working without changing the public imports. So it's safe to land the
changes
Pull Request resolved: https://github.com/pytorch/pytorch/pull/133113
Approved by: https://github.com/XilunWu
ghstack dependencies: #133305, #133306
Loss parallel is the last piece of sequence parallelism to enable. It enables efficient distributed cross entropy computation when the input is sharded on the class dimension (in a classification problem with many classes). The implementation is via a context manager `loss_parallel`, after enabling which users can directly use `torch.nn.functional.cross_entropy` or `torch.nn.CrossEntropyLoss` without modifying other parts of their code.
Here are the underlying rationales why we are going through these op replacements:
1. `nn.functional.cross_entropy` is the common method that OSS user is using for things like transformer training, to avoid changing user code, we want user to still use this function for loss calculation if they are already using it.
2. `nn.functional.cross_entropy` boils down into `aten.log_softmax` and `aten.nll_loss_foward/backward`, and DTensor now supports those ops already (#117723#119255#118917#119256). They are doing computation with input *replicated* on the class dimension.
3. However when the input of this loss calculation is **sharded on the class dimension**, to run sharded computation efficiently, we need to run both `aten.log_softmax` and `aten.nll_loss_foward` with multiple all-reduce collectives **in the middle of** those aten ops. This is not possible if we are just overriding these two ops, so we need to have some way to **decompose** these two ops into smaller ops to have collectives run in the middle of these two ops.
4. We explored the existing decompositions (#118950). It seems working, except that `log_softmax_backward` and `nll_loss_backward` combined together in aten are implemented in a inefficient way, which would trigger an additional expensive collective. Recently some user also reported similar issues https://github.com/pytorch/pytorch/issues/119261.
5. Therefore, currently we are doing our own decomposition inside a context manager for sequence parallelism specifically. Once we have a better decomposition in core, we can possibly take that instead of reinventing the wheels here.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/119877
Approved by: https://github.com/wanchaol
Summary:
Rename _device_mesh.py to device_mesh.py, update all callsites, add documentation.
We created stubs for public class and methods in torch.distributed.device_mesh so that torch.distributed.device_mesh can be imported with or without distributed is available().
Original diff reverted: D51629761
Original PR reverted: https://github.com/pytorch/pytorch/pull/115099
Prior to landing, CI signals are all passed. Shipit added the "ci/trunk" label to the PR and DID NOT wait for it and went ahead committing. More context can be found in the reverted PR above.
Test Plan: CI.
Differential Revision: D51861018
Pull Request resolved: https://github.com/pytorch/pytorch/pull/115193
Approved by: https://github.com/fegin
Adds a useful high level wrapper for calling `dist.save/load` with the correct storage readers and writers.
Instead of doing:
```
DCP.save(
state_dict={...},
storage_writer=StorageWriter(...)
)
DCP.load(
state_dict={...},
storage_reader=StorageReader(...)
)
```
We can now do:
```
checkpointer = Checkpointer(...)
checkpointer.save(state_dict={...})
checkpointer.load(state_dict={...})
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/114603
Approved by: https://github.com/fegin, https://github.com/wz337
Summary:
Documenting the `Work` object
For a collective (broadcast, all_reduce, etc.) when async_op=True we return a `Work` object to which users can call `.wait()`, `.is_success()`, among other things but this class is not documented
Test Plan: Preview the docs build in OSS
Differential Revision: D51854974
Pull Request resolved: https://github.com/pytorch/pytorch/pull/115172
Approved by: https://github.com/wconstab
Summary:
Rename _device_mesh.py to device_mesh.py, update all callsites, adds documentation.
Original diff reverted: D51629761
Original PR reverted: https://github.com/pytorch/pytorch/pull/114991
It was failing because failing a public module binding tests in MacOS, and this is due to the change in import order for torch/distributed/fsdp/_common_utils.py. Since this original import would still work, we remove the changes in this file.
Test Plan: CI.
Differential Revision: D51825114
Pull Request resolved: https://github.com/pytorch/pytorch/pull/115099
Approved by: https://github.com/wanchaol, https://github.com/fegin
Add non-package python modules to the public API checks.
The original change is to remove the `ispkg` check in this line
https://github.com/pytorch/pytorch/blob/main/docs/source/conf.py#L518
Everything else is to add the appropriate modules to the rst files, make sure every module we provide can be imported (fixed by either making optional dependencies optional or just deleting files that have been un-importable for 3 years), make API that are both modules and functions (like torch.autograd.gradcheck) properly rendered on the docs website without confusion and add every non-documented API to the allow list (~3k of them).
Next steps will be to try and fix these missing docs
Pull Request resolved: https://github.com/pytorch/pytorch/pull/110568
Approved by: https://github.com/zou3519
We have a plethora of error types for various errors raised from c10d. These include `RuntimeError`, `TimeoutError`, `SocketError`, `DistBackendError` etc.
This results in messy code during error handling somewhat like this:
```
if "NCCL" in exception_str:
...
if "Timed out initializing process group in store based barrier on rank" in exception_str:
...
if "The client socket has timed out after" in exception_str:
...
if "Broken pipe" in exception_str:
...
if "Connection reset by peer" in exception_str:
...
```
To address this issue, in this PR I've ensured added these error types:
1. **DistError** - the base type of all distributed errors
2. **DistBackendError** - this already existed and referred to PG backend errors
3. **DistStoreError** - for errors originating from the store
4. **DistNetworkError** - for general network errors coming from the socket library
Pull Request resolved: https://github.com/pytorch/pytorch/pull/108191
Approved by: https://github.com/H-Huang
We have a plethora of error types for various errors raised from c10d. These include `RuntimeError`, `TimeoutError`, `SocketError`, `DistBackendError` etc.
This results in messy code during error handling somewhat like this:
```
if "NCCL" in exception_str:
...
if "Timed out initializing process group in store based barrier on rank" in exception_str:
...
if "The client socket has timed out after" in exception_str:
...
if "Broken pipe" in exception_str:
...
if "Connection reset by peer" in exception_str:
...
```
To address this issue, in this PR I've ensured added these error types:
1. **DistError** - the base type of all distributed errors
2. **DistBackendError** - this already existed and referred to PG backend errors
3. **DistStoreError** - for errors originating from the store
4. **DistNetworkError** - for general network errors coming from the socket library
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107651
Approved by: https://github.com/H-Huang
Currently all of the distributed errors are thrown from the `TORCH_CHECK` macro which throws a generic `RuntimeError`. This change introduced a new error type `DistBackendError` which derives from `RuntimeError` to signify there was an error with the backend communication library. This allows for better error handling and analysis at higher levels in the stack. Motivation: https://docs.google.com/document/d/1j6VPOkC6znscliFuiDWMuMV1_fH4Abgdq7TCHMcXai4/edit#heading=h.a9rc38misyx8
Changes:
- introduce new error type
- Update `C10D_NCCL_CHECK`
Sample script to demonstrate new error type
```python
# python -m torch.distributed.run --nproc_per_node=2 <script>.py
import torch
import torch.distributed as dist
if __name__ == "__main__":
dist.init_process_group("nccl")
dist.broadcast(torch.tensor([1, 2, 3]).cuda(), 0)
```
Differential Revision: [D40998803](https://our.internmc.facebook.com/intern/diff/D40998803)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/88134
Approved by: https://github.com/rohan-varma
### Deprecation reasons:
- For most users training is on one GPU per process so these APIs are rarely used
- They added one more API dimension
- They can be expressed in a composed manner
- They are not abstracted – specific to GPU
- They caused backend APIs and implementations to have nested `std::vector<std::vector<Tensor>>`, which is hard to read or maintain
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85961
Approved by: https://github.com/XilunWu, https://github.com/H-Huang
### Description
- This PR renames `_all_gather_base` to `all_gather_into_tensor` so that it is clearer in meaning.
- The `all_gather_into_tensor` API differs from the `all_gather` API in the output it accepts -- a single, large tensor instead of a list of tensors.
- This PR also adds deprecation warning to `_all_gather_base`.
### Issue
`_all_gather_base` was implemented in https://github.com/pytorch/pytorch/pull/33924 to avoid unnecessary flattening. There was previous effort (#82639) to merge `_all_gather_base` with the existing `all_gather` API by detecting the parameter type passed in for the output.
There are, however, two "blockers" that make the merge difficult:
(i) The merge leads to backward compatibility break. We would need to change the parameter name `tensor_list` in `all_gather` to a general name `output` that can cover both tensor and tensor list.
(ii) Recently, the `all_gather` API has added uneven tensor support, utilizing the tensor boundaries implied by the list. We are, however, not sure to add such support to the `_all_gather_base` function, because that would require users to pass in additional tensor boundary information.
In view of the above, we decided to productize `_all_gather_base` as a separate function, but with a clearer name.
### Testing
Added tests:
- `test_all_gather_into_cat_tensor_cuda` -- output form as with `torch.cat`. For example:
```
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
```
- `test_all_gather_into_stack_tensor_cuda` -- output form as with `torch.stack`. For example:
```
>>> tensor_out2
tensor([[1, 2],
[3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
[3, 4]], device='cuda:1') # Rank 1
```
The output form is determined by the shape of the output tensor passed by the user, no flag used.
Cc @rohan-varma @mrshenli @crcrpar @ptrblck @H-Huang
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85686
Approved by: https://github.com/rohan-varma, https://github.com/crcrpar
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/74006
updated recommendations about environment variables to use during debug
and performance tuning
Test Plan: `make html`
Reviewed By: rohan-varma
Differential Revision: D34767454
fbshipit-source-id: 08cd58469bf72b58702e50e82020fa19b43b5911
(cherry picked from commit ac7e6630f8043f85d3d16be17c6a8ad1ebb2990c)
Summary:
Working towards https://docs.google.com/document/d/10yx2-4gs0gTMOimVS403MnoAWkqitS8TUHX73PN8EjE/edit?pli=1#
This PR:
- Ensure that all the submodules are listed in a rst file (that ensure they are considered by the coverage tool)
- Remove some long deprecated code that just error out on import
- Remove the allow list altogether to ensure nothing gets added back there
Pull Request resolved: https://github.com/pytorch/pytorch/pull/73983
Reviewed By: anjali411
Differential Revision: D34787908
Pulled By: albanD
fbshipit-source-id: 163ce61e133b12b2f2e1cbe374f979e3d6858db7
(cherry picked from commit c9edfead7a01dc45bfc24eaf7220d2a84ab1f62e)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/73361
This PR adds the documentation for the newly introduced `TORCH_CPP_LOG_LEVEL` and how it can be used along with `TORCH_DISTRIBUTED_DEBUG` to adjust the log level of c10d.
ghstack-source-id: 149874995
Test Plan: Locally rendered and checked the documentation.
Reviewed By: rohan-varma
Differential Revision: D34452352
fbshipit-source-id: ecb54590f3030ddef9921a7152ca9f7fc9438345
(cherry picked from commit f4c7c6f3b27dbd3006686cf26a6e9e53cd2c8f09)