Summary:
Currently, whether split should be used depends on the size of subgroup.
It's possible that default PG is not eagerly initialized yet, but split is still
called.
This PR fixes this issue by removing split's dependency on subgroup size
Test Plan:
Modified UT
Reviewers:
Subscribers:
Tasks:
Tags:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/138781
Approved by: https://github.com/kwen2501
Fixes https://github.com/pytorch/pytorch/issues/137856.
### Issue 1
Today under `ProcessGroupNCCL::Options`, color is declared as:
```
int64_t split_color{0};
```
When passing this variable to `ncclCommSplit` which accepts `int`, the value may overflow and become negative, as in #137856. But NCCL API only accepts non-negative colors (or `NCCL_SPLIT_NOCOLOR`).
But that's not all.
### Issue 2
`split_color` is pybind'ed to python frontend. If we just change from `int64_t` to `int` in C++, pybind will complain:
```
[rank0]: TypeError: (): incompatible function arguments. The following argument types are supported:
[rank0]: 1. (self: torch._C._distributed_c10d.ProcessGroupNCCL.Options, arg0: int) -> None
```
This is because python `int` represents a wider range than C++ `int`. So we cannot pass hash values -- which are potentially big ints -- from python to C++. The PR modulo the hash value with `c_int`'s max value.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/137855
Approved by: https://github.com/wconstab
Thanks @eqy for reminding me of this RFC: https://github.com/pytorch/pytorch/issues/119797
This PR is meant to:
- provide a way to abort multiple PGs without deadlocking each other.
- provide a possibility to manually handle comm errors or timeouts (and potentially recovery of such).
One can find an example from: https://github.com/NVIDIA/nccl/issues/1013
## How is it different from `destroy_process_group`?
`destroy_process_group` is meant for normal exit, while `_abort_process_group` is meant for bailout upon hangs or failures. Similar to `ncclCommDestroy` vs `ncclCommAbort`.
## What's new in `_abort_process_group`?
It added support for "group abort" semantic. The "group abort" semantic is capable of aborting multiple NCCL comms concurrently, avoiding deadlock in otherwise serialized `ncclCommAbort` executions. Details are in the [RFC](https://github.com/pytorch/pytorch/issues/119797) targeting [the hang issue in multi-comm case](https://github.com/NVIDIA/nccl/issues/1013). `Group abort` semantic is added in NCCL 2.22.
## What's next?
Ideally, the watchdog's behavior should support "group abort" too. But this is hard to implement today due to a lack of "global view" by each PG's individual watchdog. A big semi-big refactor may be needed to "uplift" the watchdogs to a global level or consolidate them into one (i.e. one dog watching multiple PGs).
In any case, it may not be a bad idea to experiment the "group abort" feature with a manual API first and then extend to the automatic mode (watchdog).
Pull Request resolved: https://github.com/pytorch/pytorch/pull/132291
Approved by: https://github.com/eqy
synchronization
Summary:
Barrier is essentially intended to block CPU thread (instead of GPU
streams). Before we used 2 stream synchronizations (1. current stream
blocked by nccl stream end event, 2. CPU thread blocked on current
stream). This is unnecessary as we already have CPU thread blocking
logic in wait(). Also, adding barrier specific code block in the general
GPU synchronize() API is intrusive and confusing.
This PR cleans this.
Test Plan:
CI
Tags:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/137516
Approved by: https://github.com/fduwjj, https://github.com/kwen2501
Function `_get_pg_default_device` is being used outside of `distributed_c10d.py`.
A concern is that people may not be aware of what it actually does, due to bad naming of this function:
`Return the device to use with ``group`` for control flow usage (object collectives, barrier).`
The remediation is as follows:
- Added a deprecation warning to `_get_pg_default_device`;
- Added a private function `_get_object_coll_device` to undertake what it does;
- Added a `_device_capability` function for users who want to query the device support of a PG -- it returns a plain list, no more "default" choice.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/136790
Approved by: https://github.com/H-Huang
We introduced the dispatchable backend for a ProcessGroup and collective in https://github.com/pytorch/pytorch/issues/86225. This PR is a follow-up cleanup to clean up the option of a ProcessGroup and ask users to either set timeout or backend later on or directly create backend after creating a PG.
Also PGNCCL is using option class from ProcessGroup but we actually should use Option from backend class. So this PR is to make the type or name to be aligned with what we are doing in cpp side. I don't change the signature for the public API, so they still use args named "pg_options"
We need to make changes to the test to make it aligned with the change.
This is try to reland D62008954 by fixing internal errors.
Differential Revision: [D62483294](https://our.internmc.facebook.com/intern/diff/D62483294/)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/135653
Approved by: https://github.com/wz337, https://github.com/H-Huang
We introduced the dispatchable backend for a ProcessGroup and collective in https://github.com/pytorch/pytorch/issues/86225. This PR is a follow-up cleanup to clean up the option of a ProcessGroup and ask users to either set timeout or backend later on or directly create backend after creating a PG.
Also PGNCCL is using option class from ProcessGroup but we actually should use Option from backend class. So this PR is to make the type or name to be aligned with what we are doing in cpp side. I don't change the signature for the public API, so they still use args named "pg_options"
We need to make changes to the test to make it aligned with the change.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/132931
Approved by: https://github.com/H-Huang
**What does this PR achieve**
1. This PR rewrite ring attention backward algorithm to fuse the alltoall and overlap the gradient communication with computation.
2. Enables memory efficient attention with CP by templating the ring attention backward to verify the accuracy as fp32 gives us higher confident about the implementation correctness.
3. Provides some experimental APIs to enable context parallelism.
4. Ensures CP work with torch.compiler. The combination of causal masking and torch.compiler has not
yet worked.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/131351
Approved by: https://github.com/wanchaol
Addresses a common misconception about safety of using multiple NCCL
process groups from PyTorch.
Notably, it IS safe to use multiple process groups, so long as
communication operations from different groups are not allowed to
overlap. (Overlap of communication operations from one group with
compute operations IS ok).
TODO: after getting feedback on the text, update other copies of the warning on other APIs
Pull Request resolved: https://github.com/pytorch/pytorch/pull/131895
Approved by: https://github.com/fduwjj
We provide an API for user to add ephemeral timeout across all PGs within one rank and the timeout will reset when the first collective issued after the timeout added finishes.
Each extension only covers collectives after the issue and before the first collective finished. The diagram below shows how the timeout changes:
<img width="1174" alt="image" src="https://github.com/user-attachments/assets/354923b7-581c-40de-ae0f-1cd3da273ccc">
While this feature provides flexibility in specific scenarios, it introduces statefulness to timeout setting. Therefore, it is advisable to use this API sparingly and consider alternative approaches, such as directly setting the timeout or utilizing a barrier collective (one can set any timeout to the barrier), whenever feasible.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/130905
Approved by: https://github.com/ezyang
This is the implementation following the RFC: https://github.com/pytorch/pytorch/issues/130407
ncclCommSplit
Summary:
In current Pytorch/c10d, the new_group API is used to create a new
process group from the default pg. When device_id is specified in
init_process_group and nccl is used as the backend, the new_group call
will use ncclCommSplit to create the nccl communicators to save
communicator resources. It has a few drawbacks:
Redundant calls
Suppose the default group has 256 ranks, we need to have 32 children PGs
and each child PG has 8 ranks. in this case, each rank needs to call
new_group and ncclCommSplit 32 times because of how we implement
new_group API and the collective requirement of ncclCommSplit. For a
specific global rank, 31 calls of ncclCommSplit would be no_color split,
and only 1 of them is colored split. With the proposed new split_group
API, we expect only 1 call of split_group/ncclCommSplit is needed per
rank in the above example case
new_group can only split from default_pg
Ideally, a new pg should be able to be split from any pg
With the new split_group API, users can create new PGs using
ncclCommSplit with less number of calls and initialize the PG eagerly.
This is also useful in the cases of creating many P2P communicators.
Test Plan:
New UTs:
e.g., python test/distributed/test_c10d_nccl.py -k
test_comm_split_group_larger_scale
Tags:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/130507
Approved by: https://github.com/wconstab
This PR removes `ProcessGroupCudaP2P` and changes async-TP to use `SymmetricMemory`. The async-TP implementation is still workspace-based, but it now doesn't require a buffer size to be specified upfront.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/128762
Approved by: https://github.com/wanchaol
If users pass `device_id` to init_process_group, they enable eager init
for the default group. Then if they subsequently call `new_group`, the
device_id argument is not required as it should be assumed to match the
one used for init_process_group.
However, both `init_process_group` and `new_group` apis share a helper
function, which expects a `device_id` value that defaults to None. When
it's None, eager initialization is disabled.
This PR ensures that if a device_id was passed to init_process_group,
the same device_id will automatically be fed into the helper function
for any new_group calls that follow.
**Test plan**
I found an existing test in CI `test_comm_split_subgroup` that failed after my change, because it was asserting that backend comm_split counter did not increment eagerly, and its behavior had changed to increment eagerly. I updated the test in the PR to pass with my change.
I also tested locally via simple program with TORCH_CPP_LOG_LEVEL=INFO and
observed eager initialization of the 'lows' and 'highs' PGs before the
'Here' print.
```
import torch
import torch.distributed as dist
dist.init_process_group(backend="nccl", device_id =torch.device(f"cuda:{torch.distributed.get_node_local_rank(0)}"))
dist.new_group([0, 1], group_desc="lows")
dist.new_group([2, 3], group_desc="highs")
print("Here")
torch.distributed.destroy_process_group()
```
Output:
https://gist.github.com/wconstab/88a5ba0b970244ca1f79133f989e0349
Pull Request resolved: https://github.com/pytorch/pytorch/pull/129284
Approved by: https://github.com/pavanbalaji, https://github.com/fduwjj, https://github.com/d4l3k, https://github.com/nvcastet
Use `typing_extensions.deprecated` for deprecation annotation if possible. Otherwise, add `category=FutureWarning` to `warnings.warn("message")` if the category is missing.
Note that only warnings that their messages contain `[Dd]eprecat(ed|ion)` are updated in this PR.
Resolves#126888
- #126888
This PR is split from PR #126898.
- #126898
------
Pull Request resolved: https://github.com/pytorch/pytorch/pull/127689
Approved by: https://github.com/Skylion007
Use `typing_extensions.deprecated` for deprecation annotation if possible. Otherwise, add `category=FutureWarning` to `warnings.warn("message")` if the category is missing.
Note that only warnings that their messages contain `[Dd]eprecat(ed|ion)` are updated in this PR.
UPDATE: Use `FutureWarning` instead of `DeprecationWarning`.
Resolves#126888
- #126888
Pull Request resolved: https://github.com/pytorch/pytorch/pull/126898
Approved by: https://github.com/albanD
As for now, the document of distributed.new_group() says that it returns `None` when current ranks is not in the new created process group. However, it actually returns `GroupMember.NON_GROUP_MEMBER`. I have check the code and think it is more appropriate that we fix the document.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/122703
Approved by: https://github.com/wconstab, https://github.com/kwen2501
Summary:
ProcessGroupNCCL set up group_name/desc in c10d log and NCCL when initializing nccl communicator. In eager initialization mode, pg_name and pg_desc is set after communicator initialization so the information won't be available in pytorch log or NCCL communicator.
This PR fix this by setting pg_name/desc earlier
Differential Revision: D57759816
Pull Request resolved: https://github.com/pytorch/pytorch/pull/127053
Approved by: https://github.com/wconstab, https://github.com/kwen2501
## Context
This stack prototypes automatic micro-pipelining of `all-gather -> matmul` and `matmul -> reduce-scatter` via Inductor. The idea originates from the paper [Overlap Communication with Dependent Computation via
Decomposition in Large Deep Learning Models](https://dl.acm.org/doi/pdf/10.1145/3567955.3567959). The implementation and some key optimizations are heavily influenced by @lw's implementation in xformers.
The stack contains several components:
- `ProcessGroupCudaP2P` - a thin wrapper around `ProcessGroupNCCL`. It in addition maintains a P2P workspace that enables SM-free, one-sided P2P communication which is needed for optimal micro-pipelining.
- `fused_all_gather_matmul` and `fused_matmul_reduce_scatter` dispatcher ops.
- Post-grad fx pass that detects `all-gather -> matmul` and `matmul -> reduce-scatter` and replaces them with the fused dispatcher ops.
To enable the prototype feature:
- Set the distributed backend to `cuda_p2p`.
- Set `torch._inductor.config._micro_pipeline_tp` to `True`.
*NOTE: the prototype sets nothing in stone w.r.t to each component's design. The purpose is to have a performant baseline with reasonable design on which each component can be further improved.*
## Benchmark
Setup:
- 8 x H100 (500W) + 3rd gen NVSwitch.
- Llama3 8B training w/ torchtitan.
- 8-way TP. Reduced the number of layers from 32 to 8 for benchmarking purpose.
Trace (baseline): https://interncache-all.fbcdn.net/manifold/perfetto-artifacts/tree/ui/index.html#!/?url=https://interncache-all.fbcdn.net/manifold/perfetto_internal_traces/tree/shared_trace/yifu_tmpjaz8zgx0
<img width="832" alt="image" src="https://github.com/pytorch/pytorch/assets/4156752/4addba77-5abc-4d2e-93ea-f68078587fe1">
Trace (w/ micro pipelining): https://interncache-all.fbcdn.net/manifold/perfetto-artifacts/tree/ui/index.html#!/?url=https://interncache-all.fbcdn.net/manifold/perfetto_internal_traces/tree/shared_trace/yifu_tmpn073b4wn
<img width="963" alt="image" src="https://github.com/pytorch/pytorch/assets/4156752/4f44e78d-8196-43ab-a1ea-27390f07e9d2">
## This PR
`ProcessGroupCudaP2P` is a thin wrapper around `ProcessGroupNCCL`. By default, it routes all collectives to the underlying `ProcessGroupNCCL`. In addition, `ProcessGroupCudaP2P` initializes a P2P workspace that allows direct GPU memory access among the members. The workspace can be used in Python to optimize intra-node communication patterns or to create custom intra-node collectives in CUDA.
`ProcessGroupCudaP2P` aims to bridge the gap where certain important patterns can be better optimized via fine-grained P2P memory access than with collectives in the latest version of NCCL. It is meant to complement NCCL rather than replacing it.
Usage:
```
# Using ProcessGroupCudaP2P
dist.init_process_group(backend="cuda_p2p", ...)
# Using ProcessGroupCudaP2P while specifying ProcessGroupCudaP2P.Options
pg_options = ProcessGroupCudaP2P.Options()
dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)
# Using ProcessGroupCudaP2P while specifying ProcessGroupNCCL.Options
pg_options = ProcessGroupNCCL.Options()
dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)
# Using ProcessGroupCudaP2P while specifying both
# ProcessGroupCudaP2P.Options and ProcessGroupNCCL.Options
pg_options = ProcessGroupCudaP2P.Options()
pg_options.nccl_options = ProcessGroupNCCL.Options()
dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)
# Down-casting the backend to access p2p buffers for cuda_p2p specific
# optimizations
if is_cuda_p2p_group(group):
backend = get_cuda_p2p_backend(group)
if required_p2p_buffer_size > backend.get_buffer_size():
# fallback
p2p_buffer = backend.get_p2p_buffer(...)
else:
# fallback
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/122163
Approved by: https://github.com/wanchaol
## Context
This stack prototypes automatic micro-pipelining of `all-gather -> matmul` and `matmul -> reduce-scatter` via Inductor. The idea originates from the paper [Overlap Communication with Dependent Computation via
Decomposition in Large Deep Learning Models](https://dl.acm.org/doi/pdf/10.1145/3567955.3567959). The implementation and some key optimizations are heavily influenced by @lw's implementation in xformers.
The stack contains several components:
- `ProcessGroupCudaP2P` - a thin wrapper around `ProcessGroupNCCL`. It in addition maintains a P2P workspace that enables SM-free, one-sided P2P communication which is needed for optimal micro-pipelining.
- `fused_all_gather_matmul` and `fused_matmul_reduce_scatter` dispatcher ops.
- Post-grad fx pass that detects `all-gather -> matmul` and `matmul -> reduce-scatter` and replaces them with the fused dispatcher ops.
To enable the prototype feature:
- Set the distributed backend to `cuda_p2p`.
- Set `torch._inductor.config._micro_pipeline_tp` to `True`.
*NOTE: the prototype sets nothing in stone w.r.t to each component's design. The purpose is to have a performant baseline with reasonable design on which each component can be further improved.*
## Benchmark
Setup:
- 8 x H100 (500W) + 3rd gen NVSwitch.
- Llama3 8B training w/ torchtitan.
- 8-way TP. Reduced the number of layers from 32 to 8 for benchmarking purpose.
Trace (baseline): https://interncache-all.fbcdn.net/manifold/perfetto-artifacts/tree/ui/index.html#!/?url=https://interncache-all.fbcdn.net/manifold/perfetto_internal_traces/tree/shared_trace/yifu_tmpjaz8zgx0
<img width="832" alt="image" src="https://github.com/pytorch/pytorch/assets/4156752/4addba77-5abc-4d2e-93ea-f68078587fe1">
Trace (w/ micro pipelining): https://interncache-all.fbcdn.net/manifold/perfetto-artifacts/tree/ui/index.html#!/?url=https://interncache-all.fbcdn.net/manifold/perfetto_internal_traces/tree/shared_trace/yifu_tmpn073b4wn
<img width="963" alt="image" src="https://github.com/pytorch/pytorch/assets/4156752/4f44e78d-8196-43ab-a1ea-27390f07e9d2">
## This PR
`ProcessGroupCudaP2P` is a thin wrapper around `ProcessGroupNCCL`. By default, it routes all collectives to the underlying `ProcessGroupNCCL`. In addition, `ProcessGroupCudaP2P` initializes a P2P workspace that allows direct GPU memory access among the members. The workspace can be used in Python to optimize intra-node communication patterns or to create custom intra-node collectives in CUDA.
`ProcessGroupCudaP2P` aims to bridge the gap where certain important patterns can be better optimized via fine-grained P2P memory access than with collectives in the latest version of NCCL. It is meant to complement NCCL rather than replacing it.
Usage:
```
# Using ProcessGroupCudaP2P
dist.init_process_group(backend="cuda_p2p", ...)
# Using ProcessGroupCudaP2P while specifying ProcessGroupCudaP2P.Options
pg_options = ProcessGroupCudaP2P.Options()
dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)
# Using ProcessGroupCudaP2P while specifying ProcessGroupNCCL.Options
pg_options = ProcessGroupNCCL.Options()
dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)
# Using ProcessGroupCudaP2P while specifying both
# ProcessGroupCudaP2P.Options and ProcessGroupNCCL.Options
pg_options = ProcessGroupCudaP2P.Options()
pg_options.nccl_options = ProcessGroupNCCL.Options()
dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)
# Down-casting the backend to access p2p buffers for cuda_p2p specific
# optimizations
if is_cuda_p2p_group(group):
backend = get_cuda_p2p_backend(group)
if required_p2p_buffer_size > backend.get_buffer_size():
# fallback
p2p_buffer = backend.get_p2p_buffer(...)
else:
# fallback
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/122163
Approved by: https://github.com/wanchaol
fixes#126379
This is the easy fix. An additional fix that I did not do is to
deregister the excepthook (or rather, restore the orignal one) when
calling dist.destroy_process_group. This might be a bit complicated in
practice, so landing as is for now.
Also, couldn't figure out a clean way to test this. assertRaisesRegex
wasn't getting a string value, probably becuase of the stderr
redirection done via the excepthook in the first place.
Output from the original repro is cleaned up with the fix:
```
[rank0]: Traceback (most recent call last):
[rank0]: File "/data/users/whc/pytorch/except.py", line 6, in <module>
[rank0]: raise ZeroDivisionError
[rank0]: ZeroDivisionError
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/126739
Approved by: https://github.com/yf225
Addresses follow up comments on #123992 and allows the use case of
writing code that checks `get_node_local_rank(fallback_rank=0)` and
runs correctly whether in the presence of a launcher (e.g. torchrun),
or run locally on a single device.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/126737
Approved by: https://github.com/shuqiangzhang
The current call passes in `['/actual/path']` to os.walk which is a string pointing to no path and thus silently leads to and empty traversal.
There is an unused function just above that handles that, so I guess this is what was supposed to be called.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/126103
Approved by: https://github.com/suo
Update ruff to 0.4.1 .
This version fixes a lot false negatives/false positives, is 20-40% faster, and has various other bug fixes.
Below is a before and after table showing the execution time of ruff lint and ruff format in milliseconds courtesy of https://astral.sh/blog/ruff-v0.4.0
| Repository | Linter (v0.3) | Linter (v0.4) | Formatter (v0.3) | Formatter (v0.4) |
|----------------------------------------------------|---------------|---------------|------------------|------------------|
| [pytorch/pytorch](https://github.com/pytorch/pytorch) | 328.7 | 251.8 | 351.1 | 274.9 |
Pull Request resolved: https://github.com/pytorch/pytorch/pull/124549
Approved by: https://github.com/ezyang