Commit Graph

445 Commits

Author SHA1 Message Date
Shuqiang Zhang
a1b22e369b [c10d] add an API to get the future result(success or failure) of a collective and customize error handling (#137799)
Summary:
This PR is trying to let users to know what exact collective call from the python thread is failing, and
customize their own error handling function, instead of watchdog thread crashing everything.

This is potentially very useful in fault tolerant training, in which we can have in-process restart.
E.g., when an nccl error is detected, users can potentially abort comms, re-init comms and go back to the previous check pointed step and try again, instead of crashing the whole job.

This is to allow users to check the status of each collective call,
using the ivalue::future libs in PT core. This also allows users to
attach its customized failure handling functions by:
work.get_future_result().then(erro_handling_func)

Note that the above call is also non-blocking for CPU thread
Test Plan:
Added a new test: test_get_future_result to verify the workResutl is
correctly propagated to the users

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137799
Approved by: https://github.com/fduwjj, https://github.com/wconstab
2024-10-16 00:20:09 +00:00
Yifu Wang
91bc9dc2c9 [SymmetricMemory] implement timeout for barrier(), put_signal() and wait_signal() (#137643)
Suggested by @lw for better safety/reliability.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137643
Approved by: https://github.com/weifengpy, https://github.com/lw
2024-10-15 21:35:14 +00:00
Andrew Gu
e269a5cb09 [TCPStore] Throw value error if passing world_size=0 to TCPStore (#137792)
This fixes https://github.com/pytorch/pytorch/issues/137577.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137792
Approved by: https://github.com/fegin, https://github.com/H-Huang
ghstack dependencies: #137713, #137721
2024-10-11 23:42:57 +00:00
Ke Wen
fe148024fe [c10d][experimental] Add _abort_process_group (#132291)
Thanks @eqy for reminding me of this RFC: https://github.com/pytorch/pytorch/issues/119797

This PR is meant to:
- provide a way to abort multiple PGs without deadlocking each other.
- provide a possibility to manually handle comm errors or timeouts (and potentially recovery of such).
One can find an example from: https://github.com/NVIDIA/nccl/issues/1013

## How is it different from `destroy_process_group`?
`destroy_process_group` is meant for normal exit, while `_abort_process_group` is meant for bailout upon hangs or failures. Similar to `ncclCommDestroy` vs `ncclCommAbort`.

## What's new in `_abort_process_group`?
It added support for "group abort" semantic. The "group abort" semantic is capable of aborting multiple NCCL comms concurrently, avoiding deadlock in otherwise serialized `ncclCommAbort` executions. Details are in the [RFC](https://github.com/pytorch/pytorch/issues/119797) targeting [the hang issue in multi-comm case](https://github.com/NVIDIA/nccl/issues/1013). `Group abort` semantic is added in NCCL 2.22.

## What's next?
Ideally, the watchdog's behavior should support "group abort" too. But this is hard to implement today due to a lack of "global view" by each PG's individual watchdog. A big semi-big refactor may be needed to "uplift" the watchdogs to a global level or consolidate them into one (i.e. one dog watching multiple PGs).

In any case, it may not be a bad idea to experiment the "group abort" feature with a manual API first and then extend to the automatic mode (watchdog).

Pull Request resolved: https://github.com/pytorch/pytorch/pull/132291
Approved by: https://github.com/eqy
2024-10-11 05:04:17 +00:00
Yifu Wang
39c5122a4f [IntraNodeComm] replace all-reduce kernels with corresponding symm_mem ops (#137475)
## This Stack

Implement custom all-reduce algos available in `IntraNodeComm` as `symm_mem` ops and replace the existing `IntraNodeComm` kernels with them.

## This PR
- Replaces one-shot all-reduce with `symm_mem::one_shot_all_reduce_out`
- Replaces two-shot all-reduce with `symm_mem::two_shot_all_reduce_`
- Removes HCM all-reduce (at least for now). Due to the nature of its accumulation order, we can't guarantee the numerical consistency across all ranks.
- Removes the `IntraNodeComm` python binding (its original purpose is superceded by `SymmetricMemory`).
- Removes methods that were made for the python binding.
- Replaces nvlink detection logic with `DMAConnectivityDetector`.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137475
Approved by: https://github.com/Chillee
ghstack dependencies: #137471, #137472, #137473, #137474
2024-10-09 23:30:16 +00:00
Shuqiang Zhang
76dca1fef3 [c10d] separate the codes for GPU stream synchronization and CPU thread synchronization (#137295)
code
Summary:
This PR should not change the existing behavior of work.wait(), just
separate the stream synchronization code from the CPU busy wait code.

Also, remove the need of a private synchronization function.

In a longer term, we would like to give user the flexibility of bypassing the watchdog thread and handle the collective error by themselves.

Test Plan:
python test/distributed/test_c10d_nccl.py NcclErrorHandlingTest

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137295
Approved by: https://github.com/kwen2501
2024-10-08 08:53:47 +00:00
Howard Huang
0ccd39a64b Fix prefix store seg fault (#136872)
fixes https://github.com/pytorch/pytorch/issues/136723

Do not allow `None` to be passed into `PrefixStore`

Pull Request resolved: https://github.com/pytorch/pytorch/pull/136872
Approved by: https://github.com/kwen2501
2024-09-30 20:43:08 +00:00
Yifu Wang
6e70ec9aa5 [SymmetricMemory] expose the multicast_ptr (#136840)
This allows writing triton kernels using the `multimem` ptx instructions.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/136840
Approved by: https://github.com/Chillee
2024-09-27 20:41:33 +00:00
Yifu Wang
da1560c49f [SymmetricMemory] add support for cuStreamWriteValue32 (#136488)
cuStreamWriteValue efficiently combines the issuing of a system-level fence with the update of a single memory location. It is highly suitable for inter-stream progress sharing (e.g., all_gather_with_progress).

Exposing it via SymmetricMemory allows users to more easily implement efficient progress-aware matmuls in triton ([xformers example](https://github.com/facebookresearch/xformers/blob/main/xformers/ops/_triton/sequence_parallel_fused_kernels.py)).

Pull Request resolved: https://github.com/pytorch/pytorch/pull/136488
Approved by: https://github.com/eqy, https://github.com/Chillee
2024-09-24 20:56:29 +00:00
fduwjj
a0c7029a75 [c10d][Reland] Remove Option for ProcessGroup and Expose backend Options to reflect the correct code structure (#132931) (#135653)
We introduced the dispatchable backend for a ProcessGroup and collective in https://github.com/pytorch/pytorch/issues/86225. This PR is a follow-up cleanup to clean up the option of a ProcessGroup and ask users to either set timeout or backend later on or directly create backend after creating a PG.

Also PGNCCL is using option class from ProcessGroup but we actually should use Option from backend class. So this PR is to make the type or name to be aligned with what we are doing in cpp side. I don't change the signature for the public API, so they still use args named "pg_options"

We need to make changes to the test to make it aligned with the change.

This is try to reland D62008954 by fixing internal errors.

Differential Revision: [D62483294](https://our.internmc.facebook.com/intern/diff/D62483294/)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/135653
Approved by: https://github.com/wz337, https://github.com/H-Huang
2024-09-16 19:56:42 +00:00
Tristan Rice
3ce433aef2 [TCPStore] use wait counters (#135283)
This replaces the existing TCPStore counters with the new shared wait counters. There's no users of the tcpstore counters so should be completely safe to remove.

Test plan:

Existing tests + build

There's no OSS backend for wait counters so can't write any tests with them currently.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/135283
Approved by: https://github.com/c-p-i-o
2024-09-06 19:54:25 +00:00
PyTorch MergeBot
351ba3e67c Revert "[c10d] Remove Option for ProcessGroup and Expose backend Options to reflect the correct code structure (#132931)"
This reverts commit 65864d0134.

Reverted https://github.com/pytorch/pytorch/pull/132931 on behalf of https://github.com/ZainRizvi due to This PR is breaking builds internally due to the removal of ProcessGroup::Options ([comment](https://github.com/pytorch/pytorch/pull/132931#issuecomment-2321862402))
2024-08-30 16:27:40 +00:00
Shuqiang Zhang
d13ce2e2b5 [c10d] release gil lock during eager init (#134779)
Summary:
We found that if we init the pG in a background thread, it would block
the main thread till init is complete. This is because in the pybinding
we never release the GIL lock
Test Plan:
existing CI on eager init

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/134779
Approved by: https://github.com/c-p-i-o
2024-08-29 23:25:33 +00:00
fduwjj
65864d0134 [c10d] Remove Option for ProcessGroup and Expose backend Options to reflect the correct code structure (#132931)
We introduced the dispatchable backend for a ProcessGroup and collective in https://github.com/pytorch/pytorch/issues/86225. This PR is a follow-up cleanup to clean up the option of a ProcessGroup and ask users to either set timeout or backend later on or directly create backend after creating a PG.

Also PGNCCL is using option class from ProcessGroup but we actually should use Option from backend class. So this PR is to make the type or name to be aligned with what we are doing in cpp side. I don't change the signature for the public API, so they still use args named "pg_options"

We need to make changes to the test to make it aligned with the change.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/132931
Approved by: https://github.com/H-Huang
2024-08-29 22:40:12 +00:00
Yifu Wang
78d69bfe11 [SymmetricMemory] introduce multicast support, multimem_all_reduce_ and multimem_one_shot_all_reduce (#133424)
### Summary
- Added multicast support to SymmetricMemory. If the cuda runtime and cuda driver have multicast support, SymmetricMemory associate all peer buffers with a multicast object and exposes the multicast virtual address.
- Implemented `multimem_all_reduce_` and `multimem_one_shot_all_reduce` based on the multicast support. The two variants shows different performance characteristic for different message size. We plan to use Inductor for collective algo selection (and required symmetric memory buffer allocation).

### Benchmark

8xH100 (non-standard version with HBM2e at 650W). NVSwitch V3 with NVLS support.

![image](https://github.com/user-attachments/assets/4998a16b-c2c0-4797-9dd0-1da2303df947)

![image](https://github.com/user-attachments/assets/278ad361-52cb-4864-82c6-bb67e8d0a3fe)

Differential Revision: [D61682507](https://our.internmc.facebook.com/intern/diff/D61682507)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/133424
Approved by: https://github.com/yf225, https://github.com/weifengpy
2024-08-23 20:09:20 +00:00
PyTorch MergeBot
cedfac20c7 Revert "[SymmetricMemory] introduce multicast support, multimem_all_reduce_ and multimem_one_shot_all_reduce (#133424)"
This reverts commit 66d3eb783c.

Reverted https://github.com/pytorch/pytorch/pull/133424 on behalf of https://github.com/jeanschmidt due to Broke internal ADS builds, see D61611517 ([comment](https://github.com/pytorch/pytorch/pull/133424#issuecomment-2304676328))
2024-08-22 13:29:27 +00:00
Yifu Wang
66d3eb783c [SymmetricMemory] introduce multicast support, multimem_all_reduce_ and multimem_one_shot_all_reduce (#133424)
### Summary
- Added multicast support to SymmetricMemory. If the cuda runtime and cuda driver have multicast support, SymmetricMemory associate all peer buffers with a multicast object and exposes the multicast virtual address.
- Implemented `multimem_all_reduce_` and `multimem_one_shot_all_reduce` based on the multicast support. The two variants shows different performance characteristic for different message size. We plan to use Inductor for collective algo selection (and required symmetric memory buffer allocation).

### Benchmark

8xH100 (non-standard version with HBM2e at 650W). NVSwitch V3 with NVLS support.

![image](https://github.com/user-attachments/assets/4998a16b-c2c0-4797-9dd0-1da2303df947)

![image](https://github.com/user-attachments/assets/278ad361-52cb-4864-82c6-bb67e8d0a3fe)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/133424
Approved by: https://github.com/yf225, https://github.com/weifengpy
2024-08-21 05:11:21 +00:00
Yifu Wang
11af423eca [SymmetricMemory] make buffer_ptrs_dev, signal_pad_ptrs_dev, buffer_size, and signal_pad_size accessible in python (#133680)
These allows us to experiment with creative applications with triton.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/133680
Approved by: https://github.com/Chillee
2024-08-20 10:15:35 +00:00
Howard Huang
0f90ffe94a Remove ProcessGroupRoundRobin (#132888)
`_round_robin_process_groups` is deprecated and should be removed.

258f47fc0b/torch/csrc/distributed/c10d/ProcessGroupRoundRobin.cpp (L10-L12)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/132888
Approved by: https://github.com/Skylion007, https://github.com/wanchaol, https://github.com/c-p-i-o, https://github.com/fduwjj
2024-08-08 01:07:40 +00:00
fduwjj
4e610924d4 [c10d] Add a new API for adding ephemeral timeout for one local rank and the timeout will reset when the first collective finishes (#130905)
We provide an API for user to add ephemeral timeout across all PGs within one rank and the timeout will reset when the first collective issued after the timeout added finishes.

Each extension only covers collectives after the issue and before the first collective finished. The diagram below shows how the timeout changes:

<img width="1174" alt="image" src="https://github.com/user-attachments/assets/354923b7-581c-40de-ae0f-1cd3da273ccc">

While this feature provides flexibility in specific scenarios, it introduces statefulness to timeout setting. Therefore, it is advisable to use this API sparingly and consider alternative approaches, such as directly setting the timeout or utilizing a barrier collective (one can set any timeout to the barrier), whenever feasible.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/130905
Approved by: https://github.com/ezyang
2024-08-06 03:47:58 +00:00
Tristan Rice
9027db1ab8 TCPStore: fix remote address (#131773) (#131913)
Summary:
This fixes corrupt remote address logs caused by dangling pointers to addrinfo_storage inside of addrinfo.

This relands it since it got reverted due to a fmt::format issue internally.

Original Pull Request: https://github.com/pytorch/pytorch/pull/131773
Approved by: https://github.com/kurman

Test Plan:
Enable debug logs and verify addresses are correct

```
TORCH_CPP_LOG_LEVEL=INFO TORCH_DISABLE_SHARE_RDZV_TCP_STORE=1 TORCH_DISTRIBUTED_DEBUG=DETAIL LOGLEVEL=INFO python test/distributed/test_store.py -v
buck2 test @//mode/dev-nosan //caffe2/test/distributed:store
```

Differential Revision: D60296583

Pull Request resolved: https://github.com/pytorch/pytorch/pull/131913
Approved by: https://github.com/kurman, https://github.com/rsdcastro, https://github.com/Skylion007
2024-07-30 17:27:33 +00:00
PyTorch MergeBot
696e83a1da Revert "TCPStore: fix remote address (#131773)"
This reverts commit 9039131a89.

Reverted https://github.com/pytorch/pytorch/pull/131773 on behalf of https://github.com/clee2000 due to broke internal builds D60265883, something about formatter ([comment](https://github.com/pytorch/pytorch/pull/131773#issuecomment-2253123800))
2024-07-26 16:47:57 +00:00
Tristan Rice
9039131a89 TCPStore: fix remote address (#131773)
This fixes corrupt remote address logs caused by dangling pointers to addrinfo_storage inside of addrinfo.

Test plan:

Enable debug logs and verify addresses are correct

```
TORCH_CPP_LOG_LEVEL=INFO TORCH_DISABLE_SHARE_RDZV_TCP_STORE=1 TORCH_DISTRIBUTED_DEBUG=DETAIL LOGLEVEL=INFO python test/distributed/test_store.py -v
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/131773
Approved by: https://github.com/kurman
2024-07-25 21:55:25 +00:00
Yifu Wang
a136a7d623 [Functional Collective] enable custom work registration from python (#130354)
This PR does two things:
- Allow tensor -> work registration in Python via `torch._C._distributed_c10d.register_work`. Calling `torch.ops._c10d_functional.wait_tensor` on a tensor would trigger `.wait()` on the registered work object.
- Allow user-defined work object in Python to work with functional collectives.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/130354
Approved by: https://github.com/wanchaol, https://github.com/fegin, https://github.com/wconstab
2024-07-22 21:45:19 +00:00
Shuqiang Zhang
4aef5a1134 [c10] add an option to pg_config split share (#130877)
Summary:
context is: #129865
We want to give users an option to not share comms resouces so that
comm opts can overlap
Test Plan:
Augmentd UT

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/130877
Approved by: https://github.com/fduwjj
2024-07-19 21:11:26 +00:00
PyTorch MergeBot
5f3d8b8788 Revert "[c10] add an option to pg_config split share (#130877)"
This reverts commit 367213a608.

Reverted https://github.com/pytorch/pytorch/pull/130877 on behalf of https://github.com/atalman due to breaks internal build ([comment](https://github.com/pytorch/pytorch/pull/130877#issuecomment-2239298810))
2024-07-19 14:24:50 +00:00
Shuqiang Zhang
367213a608 [c10] add an option to pg_config split share (#130877)
Summary:
context is: #129865
We want to give users an option to not share comms resouces so that
comm opts can overlap
Test Plan:
Augmentd UT

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/130877
Approved by: https://github.com/fduwjj
2024-07-18 19:03:00 +00:00
Chirag Pandya
83c95c48f7 Flight recoder data as JSON (#129505)
Summary:
Provide a new API to retrieve flight recorder data as JSON.
The one minor difference between flight recorder as Pickle v/s JSON is
that the JSON API does not retrieve stack traces at the moment.
This ends up being far too much data.

Test Plan:
unit test

Differential Revision: [D59536460](https://our.internmc.facebook.com/intern/diff/D59536460)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/129505
Approved by: https://github.com/wconstab, https://github.com/d4l3k
2024-07-10 21:50:27 +00:00
Tristan Rice
f85bda8bdd c10d/Handlers: expose running handlers from Python (#130149)
This adds a `_run_handler` method that will invoke a specific handler.

Test plan:

```
python test/distributed/elastic/test_control_plane.py
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/130149
Approved by: https://github.com/kurman, https://github.com/c-p-i-o
2024-07-09 20:20:59 +00:00
cyy
f4dcf2ae93 [1/N] Change #include <c10/util/Optional.h> to #include <optional> (#128301)
Fixes #ISSUE_NUMBER

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128301
Approved by: https://github.com/ezyang, https://github.com/r-barnes
2024-07-08 07:03:53 +00:00
Yifu Wang
67416a2996 [c10d] Introduce a util for detecting DMA connectivity among devices (#129510)
This PR introduces `_detect_dma_connectivity` - a utility for detecting DMA connectivity among devices.

The "DMA connectivity" in this context is more stringent than the ability to perform memory copy without CPU involvement. We define it as the ability for a device to issue load/store instructions and perform atomic operations on memory that resides on connected devices. The ability translates to the ability to run most aten GPU operations with operands backed by remote memory. `_detect_dma_connectivity` can help PyTorch and its users to determine whether certain DMA-based optimizations are possible.

`_detect_dma_connectivity` takes a `(device_type, connection_type)` pair and returns a matrix describing the connectivity. Connectivity detectors are statically registered on a `(device_type, connection_type)` basis. This PR implements the detector for `(CUDA, "nvlink")`. Later, detectors for pairs such as `(ROCM, "infinity_fabric")` can be introduced.

Example:

```python3
>>> from torch._C._autograd import DeviceType
>>> from torch._C._distributed_c10d import _detect_dma_connectivity
>>> connectivity = _detect_dma_connectivity(DeviceType.CUDA, "nvlink")
>>> for row in connectivity.matrix:
...     print(row)
...
[0, 18, 18, 18, 18, 18, 18, 18]
[18, 0, 18, 18, 18, 18, 18, 18]
[18, 18, 0, 18, 18, 18, 18, 18]
[18, 18, 18, 0, 18, 18, 18, 18]
[18, 18, 18, 18, 0, 18, 18, 18]
[18, 18, 18, 18, 18, 0, 18, 18]
[18, 18, 18, 18, 18, 18, 0, 18]
[18, 18, 18, 18, 18, 18, 18, 0]
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/129510
Approved by: https://github.com/weifengpy
2024-06-27 23:02:07 +00:00
Yifu Wang
bbd47f7b2f Remove ProcessGroupCudaP2P and change async-TP to use SymmetricMemory (#128762)
This PR removes `ProcessGroupCudaP2P` and changes async-TP to use `SymmetricMemory`. The async-TP implementation is still workspace-based, but it now doesn't require a buffer size to be specified upfront.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128762
Approved by: https://github.com/wanchaol
2024-06-25 22:32:21 +00:00
Yifu Wang
217aac96d7 Introduce a prototype for SymmetricMemory (#128582)
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom):

This PR introduces a prototype for `SymmetricMemory` (including a CUDA implementation) - a remote-memory access-based communication primitive. It allows for user-defined communication patterns/kernels and is designed to be torch.compile-friendly. It addresses the major limitations of `IntraNodeComm` and `ProcessGroupCudaP2p` and serves as a replacement for them.

### SymmetricMemory

`SymmetricMemory` represents symmetric allocations across a group of devices. The allocations represented by a `SymmetricMemory` object are accessible by all devices in the group. The class can be used for **op-level custom communication patterns** (via the get_buffer APIs and the synchronization primitives), as well as **custom communication kernels** (via the buffer and signal_pad device pointers).

### Python API Example

```python
from torch._C.distributed_c10d import _SymmetricMemory

# Set a store for rendezvousing symmetric allocations on a group of devices
# identified by group_name. The concept of groups is logical; users can
# utilize predefined groups (e.g., a group of device identified by a
# ProcessGroup) or create custom ones. Note that a SymmetricMemoryAllocator
# backends might employ a more efficient communication channel for the actual
# rendezvous process and only use the store for bootstrapping purposes.
_SymmetricMemory.set_group_info(group_name, rank, world_size, store)

# Identical to empty_strided, but allows symmetric memory access to be
# established for the allocated tensor via _SymmetricMemory.rendezvous().
# This function itself is not a collective operation.
t = _SymmetricMemory.empty_strided_p2p((64, 64), (64, 1), torch.float32, group_name)

# Users can write Python custom ops that leverages the symmetric memory access.
# Below are examples of things users can do (assuming the group's world_size is 2).

# Establishes symmetric memory access on tensors allocated via
# _SymmetricMemory.empty_strided_p2p(). rendezvous() is a one-time process,
# and the mapping between a local memory region and the associated SymmetricMemory
# object is unique. Subsequent calls to rendezvous() with the same tensor will receive
# the cached SymmetricMemory object.
#
# The function has a collective semantic and must be invoked simultaneously
# from all rendezvous participants.
symm_mem = _SymmetricMemory.rendezvous(t)

# This represents the allocation on rank 0 and is accessible from all devices.
buf = symm_mem.get_buffer(0, (64, 64), torch.float32)

if symm_mem.rank == 0:
    symm_mem.wait_signal(src_rank=1)
    assert buf.eq(42).all()
else:
    # The remote buffer can be used as a regular tensor
    buf.fill_(42)
    symm_mem.put_signal(dst_rank=0)

symm_mem.barrier()

if symm_mem.rank == 0:
    symm_mem.barrier()
    assert buf.eq(43).all()
else:
    new_val = torch.empty_like(buf)
    new_val.fill_(43)
    # Contiguous copies to/from a remote buffer utilize copy engines
    # which bypasses SMs (i.e. no need to load the data into registers)
    buf.copy_(new_val)
    symm_mem.barrier()
```

### Custom CUDA Comm Kernels

Given a tensor, users can access the associated `SymmetricMemory` which provides pointer to remote buffers/signal_pads needed for custom communication kernels.

```cpp
TORCH_API c10::intrusive_ptr<SymmetricMemory> get_symmetric_memory(
    const at::Tensor& tensor);

class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
 public:
  ...
  virtual std::vector<void*> get_buffer_ptrs() = 0;
  virtual std::vector<void*> get_signal_pad_ptrs() = 0;
  virtual void** get_buffer_ptrs_dev() = 0;
  virtual void** get_signal_pad_ptrs_dev() = 0;
  virtual size_t get_buffer_size() = 0;
  virtual size_t get_signal_pad_size() = 0;
  virtual int get_rank() = 0;
  virtual int get_world_size() = 0;
  ...
};
```

### Limitations of IntraNodeComm and ProcessGroupCudaP2p
Both `IntraNodeComm` (used by `ProcessGroupCudaP2p`) manages a single fixed-size workspace. This approach:
- Leads to awkward UX in which the required workspace needs to be specified upfront.
- Can not avoid extra copies for some algorithms in eager mode (e.g., custom/multimem all-reduce, reduce-scatter, all-gather).
- Prevents torch.compile from eliminating all copies.

In addition, they only offer out-of-the-box communication kernels and don't expose required pointers for user-defined, custom CUDA comm kernels.

* __->__ #128582

Differential Revision: [D58849033](https://our.internmc.facebook.com/intern/diff/D58849033)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/128582
Approved by: https://github.com/wanchaol
2024-06-21 08:49:11 +00:00
PyTorch MergeBot
63a724d8e1 Revert "Introduce a prototype for SymmetricMemory (#128582)"
This reverts commit 8771e3429c.

Reverted https://github.com/pytorch/pytorch/pull/128582 on behalf of https://github.com/fbgheith due to breaking internal builds ([comment](https://github.com/pytorch/pytorch/pull/128582#issuecomment-2181656181))
2024-06-20 22:31:29 +00:00
Yifu Wang
8771e3429c Introduce a prototype for SymmetricMemory (#128582)
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom):

This PR introduces a prototype for `SymmetricMemory` (including a CUDA implementation) - a remote-memory access-based communication primitive. It allows for user-defined communication patterns/kernels and is designed to be torch.compile-friendly. It addresses the major limitations of `IntraNodeComm` and `ProcessGroupCudaP2p` and serves as a replacement for them.

### SymmetricMemory

`SymmetricMemory` represents symmetric allocations across a group of devices. The allocations represented by a `SymmetricMemory` object are accessible by all devices in the group. The class can be used for **op-level custom communication patterns** (via the get_buffer APIs and the synchronization primitives), as well as **custom communication kernels** (via the buffer and signal_pad device pointers).

### Python API Example

```python
from torch._C.distributed_c10d import _SymmetricMemory

# Set a store for rendezvousing symmetric allocations on a group of devices
# identified by group_name. The concept of groups is logical; users can
# utilize predefined groups (e.g., a group of device identified by a
# ProcessGroup) or create custom ones. Note that a SymmetricMemoryAllocator
# backends might employ a more efficient communication channel for the actual
# rendezvous process and only use the store for bootstrapping purposes.
_SymmetricMemory.set_group_info(group_name, rank, world_size, store)

# Identical to empty_strided, but allows symmetric memory access to be
# established for the allocated tensor via _SymmetricMemory.rendezvous().
# This function itself is not a collective operation.
t = _SymmetricMemory.empty_strided_p2p((64, 64), (64, 1), torch.float32, group_name)

# Users can write Python custom ops that leverages the symmetric memory access.
# Below are examples of things users can do (assuming the group's world_size is 2).

# Establishes symmetric memory access on tensors allocated via
# _SymmetricMemory.empty_strided_p2p(). rendezvous() is a one-time process,
# and the mapping between a local memory region and the associated SymmetricMemory
# object is unique. Subsequent calls to rendezvous() with the same tensor will receive
# the cached SymmetricMemory object.
#
# The function has a collective semantic and must be invoked simultaneously
# from all rendezvous participants.
symm_mem = _SymmetricMemory.rendezvous(t)

# This represents the allocation on rank 0 and is accessible from all devices.
buf = symm_mem.get_buffer(0, (64, 64), torch.float32)

if symm_mem.rank == 0:
    symm_mem.wait_signal(src_rank=1)
    assert buf.eq(42).all()
else:
    # The remote buffer can be used as a regular tensor
    buf.fill_(42)
    symm_mem.put_signal(dst_rank=0)

symm_mem.barrier()

if symm_mem.rank == 0:
    symm_mem.barrier()
    assert buf.eq(43).all()
else:
    new_val = torch.empty_like(buf)
    new_val.fill_(43)
    # Contiguous copies to/from a remote buffer utilize copy engines
    # which bypasses SMs (i.e. no need to load the data into registers)
    buf.copy_(new_val)
    symm_mem.barrier()
```

### Custom CUDA Comm Kernels

Given a tensor, users can access the associated `SymmetricMemory` which provides pointer to remote buffers/signal_pads needed for custom communication kernels.

```cpp
TORCH_API c10::intrusive_ptr<SymmetricMemory> get_symmetric_memory(
    const at::Tensor& tensor);

class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
 public:
  ...
  virtual std::vector<void*> get_buffer_ptrs() = 0;
  virtual std::vector<void*> get_signal_pad_ptrs() = 0;
  virtual void** get_buffer_ptrs_dev() = 0;
  virtual void** get_signal_pad_ptrs_dev() = 0;
  virtual size_t get_buffer_size() = 0;
  virtual size_t get_signal_pad_size() = 0;
  virtual int get_rank() = 0;
  virtual int get_world_size() = 0;
  ...
};
```

### Limitations of IntraNodeComm and ProcessGroupCudaP2p
Both `IntraNodeComm` (used by `ProcessGroupCudaP2p`) manages a single fixed-size workspace. This approach:
- Leads to awkward UX in which the required workspace needs to be specified upfront.
- Can not avoid extra copies for some algorithms in eager mode (e.g., custom/multimem all-reduce, reduce-scatter, all-gather).
- Prevents torch.compile from eliminating all copies.

In addition, they only offer out-of-the-box communication kernels and don't expose required pointers for user-defined, custom CUDA comm kernels.

* __->__ #128582

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128582
Approved by: https://github.com/wanchaol
2024-06-19 03:38:58 +00:00
PyTorch MergeBot
77830d509f Revert "Introduce a prototype for SymmetricMemory (#128582)"
This reverts commit 7a39755da2.

Reverted https://github.com/pytorch/pytorch/pull/128582 on behalf of https://github.com/fbgheith due to breaking internal builds ([comment](https://github.com/pytorch/pytorch/pull/128582#issuecomment-2176685232))
2024-06-18 18:11:43 +00:00
Yifu Wang
7a39755da2 Introduce a prototype for SymmetricMemory (#128582)
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom):

This PR introduces a prototype for `SymmetricMemory` (including a CUDA implementation) - a remote-memory access-based communication primitive. It allows for user-defined communication patterns/kernels and is designed to be torch.compile-friendly. It addresses the major limitations of `IntraNodeComm` and `ProcessGroupCudaP2p` and serves as a replacement for them.

### SymmetricMemory

`SymmetricMemory` represents symmetric allocations across a group of devices. The allocations represented by a `SymmetricMemory` object are accessible by all devices in the group. The class can be used for **op-level custom communication patterns** (via the get_buffer APIs and the synchronization primitives), as well as **custom communication kernels** (via the buffer and signal_pad device pointers).

### Python API Example

```python
from torch._C.distributed_c10d import _SymmetricMemory

# Set a store for rendezvousing symmetric allocations on a group of devices
# identified by group_name. The concept of groups is logical; users can
# utilize predefined groups (e.g., a group of device identified by a
# ProcessGroup) or create custom ones. Note that a SymmetricMemoryAllocator
# backends might employ a more efficient communication channel for the actual
# rendezvous process and only use the store for bootstrapping purposes.
_SymmetricMemory.set_group_info(group_name, rank, world_size, store)

# Identical to empty_strided, but allows symmetric memory access to be
# established for the allocated tensor via _SymmetricMemory.rendezvous().
# This function itself is not a collective operation.
t = _SymmetricMemory.empty_strided_p2p((64, 64), (64, 1), torch.float32, group_name)

# Users can write Python custom ops that leverages the symmetric memory access.
# Below are examples of things users can do (assuming the group's world_size is 2).

# Establishes symmetric memory access on tensors allocated via
# _SymmetricMemory.empty_strided_p2p(). rendezvous() is a one-time process,
# and the mapping between a local memory region and the associated SymmetricMemory
# object is unique. Subsequent calls to rendezvous() with the same tensor will receive
# the cached SymmetricMemory object.
#
# The function has a collective semantic and must be invoked simultaneously
# from all rendezvous participants.
symm_mem = _SymmetricMemory.rendezvous(t)

# This represents the allocation on rank 0 and is accessible from all devices.
buf = symm_mem.get_buffer(0, (64, 64), torch.float32)

if symm_mem.rank == 0:
    symm_mem.wait_signal(src_rank=1)
    assert buf.eq(42).all()
else:
    # The remote buffer can be used as a regular tensor
    buf.fill_(42)
    symm_mem.put_signal(dst_rank=0)

symm_mem.barrier()

if symm_mem.rank == 0:
    symm_mem.barrier()
    assert buf.eq(43).all()
else:
    new_val = torch.empty_like(buf)
    new_val.fill_(43)
    # Contiguous copies to/from a remote buffer utilize copy engines
    # which bypasses SMs (i.e. no need to load the data into registers)
    buf.copy_(new_val)
    symm_mem.barrier()
```

### Custom CUDA Comm Kernels

Given a tensor, users can access the associated `SymmetricMemory` which provides pointer to remote buffers/signal_pads needed for custom communication kernels.

```cpp
TORCH_API c10::intrusive_ptr<SymmetricMemory> get_symmetric_memory(
    const at::Tensor& tensor);

class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
 public:
  ...
  virtual std::vector<void*> get_buffer_ptrs() = 0;
  virtual std::vector<void*> get_signal_pad_ptrs() = 0;
  virtual void** get_buffer_ptrs_dev() = 0;
  virtual void** get_signal_pad_ptrs_dev() = 0;
  virtual size_t get_buffer_size() = 0;
  virtual size_t get_signal_pad_size() = 0;
  virtual int get_rank() = 0;
  virtual int get_world_size() = 0;
  ...
};
```

### Limitations of IntraNodeComm and ProcessGroupCudaP2p
Both `IntraNodeComm` (used by `ProcessGroupCudaP2p`) manages a single fixed-size workspace. This approach:
- Leads to awkward UX in which the required workspace needs to be specified upfront.
- Can not avoid extra copies for some algorithms in eager mode (e.g., custom/multimem all-reduce, reduce-scatter, all-gather).
- Prevents torch.compile from eliminating all copies.

In addition, they only offer out-of-the-box communication kernels and don't expose required pointers for user-defined, custom CUDA comm kernels.

* __->__ #128582

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128582
Approved by: https://github.com/wanchaol
2024-06-15 10:20:21 +00:00
PyTorch MergeBot
846bb30e13 Revert "[1/N] Change #include <c10/util/Optional.h> to #include <optional> (#128301)"
This reverts commit bd72e28314.

Reverted https://github.com/pytorch/pytorch/pull/128301 on behalf of https://github.com/huydhn due to Sorry for reverting your change but it fails XLA build bd72e28314. Please rebase your PR before relanding because I think the failure is hidden by an unrelated broken trunk XLA failure from your current base commit ([comment](https://github.com/pytorch/pytorch/pull/128301#issuecomment-2169035822))
2024-06-15 01:58:20 +00:00
cyy
bd72e28314 [1/N] Change #include <c10/util/Optional.h> to #include <optional> (#128301)
Fixes #ISSUE_NUMBER

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128301
Approved by: https://github.com/ezyang
2024-06-14 23:21:01 +00:00
Chirag Pandya
0bf2fe522a [RFC] Provide optional switches to _dump_nccl_trace (#127651)
Summary:
Data from PyTorch distributed is mostly useful during initial stages of model development.
Provide options to reduce data sent/dumped.
`_dump_nccl_trace` takes 3 optional switches. Default as before returns everything
- `includeCollectives`: option to also include collectives: Default is True.
- `includeStacktraces`: option to include stack traces in collectives. Default is True.
- `onlyActive`: option to only send active collective work - i.e. not completed. Default is
    False (i.e. send everything)

Test Plan:
Unit tests

Reviewers:

Subscribers:

Tasks:

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/127651
Approved by: https://github.com/wconstab
2024-06-09 14:00:57 +00:00
PyTorch MergeBot
02a901f1e9 Revert "[RFC] Provide optional switches to _dump_nccl_trace (#127651)"
This reverts commit 0a761f0627.

Reverted https://github.com/pytorch/pytorch/pull/127651 on behalf of https://github.com/atalman due to Breaks internal CI ([comment](https://github.com/pytorch/pytorch/pull/127651#issuecomment-2156076838))
2024-06-08 15:30:04 +00:00
Will Constable
8ca4cefc7d [C10D] Ensure gil is not released when calling toPyBytes (#128212)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/128212
Approved by: https://github.com/Skylion007, https://github.com/XilunWu
2024-06-07 18:24:10 +00:00
Xilun Wu
85758fa5ae [c10d][TCPStore] make TCPStore server use libuv by default (#127957)
**Summary**
This PR switches the default TCPStore server backend to a new implementation that utilizes [`libuv`](https://github.com/libuv/libuv) for significantly lower initialization time and better scalability:
<img width="714" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/18503011-da5d-4104-8ba9-abc456438b02">

We hope this improvement would benefit users from a much shorter startup time in large-scale jobs. Eventually, we hope to fully replace the old TCPStore backend implementation with the libuv one.

**What it changes**
This PR changes the underlying TCPStore server backend to `libuv` if users don't explicitly specify to use the old TCPStore server. This change is not supposed to cause any user notice except significant faster TCPStore startup for large-scale jobs.

One thing to note is, we do not support the initialization approach where user passes in a socket for libuv backend. We plan to support it as a next step but we choose to disable it before fully testing. If you are initializing TCPStore in this approach, you can see the next section to remain using the old TCPStore server.

**Fallback/Remain using the old TCPStore server**
For users who want to stay with the old TCPStore backend, there're 3 ways:

1. If user is directly instantiating TCPStore object, user can pass in argument `use_libuv=False` to use the old TCPStore server backend e.g. `store = torch.distributed.TCPStore(..., use_libuv=False)`.
2. Or, specify the TCPStore backend option in `init_method` when calling default ProcessGroup init, e.g. `torch.distributed.init_process_group(..., init_method="{YOUR_RENDEZVOUS_METHOD}://{YOUR_HOSTNAME}:{YOUR_PORT}?use_libuv=0")`
3. Or, user can set environment variable `USE_LIBUV` to `"0"` when launching.

These 3 approach are in order of precedence. That being said, if user specifies `use_libuv=0` in `init_method` and also sets environment var `USE_LIBUV="1"`, the former will take effect and the TCPStore backend instantiated will be the old one instead of the one using libuv.

**Operating Systems Compatibility**
From the CI signals, we believe the new implementation has the same behavior as the old TCPStore server on all supported platforms. If you notice any behavior discrepancy, please file an issue with `oncall: distributed` label.

**Test Plan**
`pytest test/distributed/test_store.py`
<img width="2548" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/dc0aebeb-6d5a-4daa-b98c-e56bd39aa588">
note: `TestMultiThreadedWait::test_wait` is a broken test that has been there for some time.

`test/distributed/elastic/utils/distributed_test.py`
<img width="2558" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/a6a3266d-b798-41c4-94d2-152056a034f6">

**TODO**
1. Update the doc at

- https://pytorch.org/docs/stable/distributed.html#distributed-key-value-store
- https://pytorch.org/docs/stable/distributed.html#tcp-initialization

2. Make torch elastic rendezvous to use libuv TCPStore as well. See `torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py` cc @mrshenli @pritamdamania87 @zhaojuanmao @satgera @gqchen @aazzolini @osalpekar @jiayisuse @H-Huang @kwen2501 @awgu @penguinwu @fegin @wanchaol @fduwjj @wz337 @tianyu-l @wconstab @yf225 @chauhang @d4l3k @kurman
3. Test if libuv backend is okay with initialization with socket. Change `LibUvTCPStoreTest::test_take_over_listen_socket`.

**Test Plan**
`pytest test/distributed/test_store.py`
<img width="2548" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/dc0aebeb-6d5a-4daa-b98c-e56bd39aa588">
note: `TestMultiThreadedWait::test_wait` is a broken test that has been there for some time.

`test/distributed/elastic/utils/distributed_test.py`
<img width="2558" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/a6a3266d-b798-41c4-94d2-152056a034f6">

Differential Revision: [D58259591](https://our.internmc.facebook.com/intern/diff/D58259591)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/127957
Approved by: https://github.com/kurman
ghstack dependencies: #127956
2024-06-07 16:53:01 +00:00
Chirag Pandya
0a761f0627 [RFC] Provide optional switches to _dump_nccl_trace (#127651)
Summary:
Data from PyTorch distributed is mostly useful during initial stages of model development.
Provide options to reduce data sent/dumped.
`_dump_nccl_trace` takes 3 optional switches. Default as before returns everything
- `includeCollectives`: option to also include collectives: Default is True.
- `includeStacktraces`: option to include stack traces in collectives. Default is True.
- `onlyActive`: option to only send active collective work - i.e. not completed. Default is
    False (i.e. send everything)

Test Plan:
Unit tests

Reviewers:

Subscribers:

Tasks:

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/127651
Approved by: https://github.com/wconstab
2024-06-06 21:59:09 +00:00
Tristan Rice
ffaea656b5 WorkerServer: add support for binding to TCP (#127986)
This adds support for the WorkerServer binding to TCP as well as the existing unix socket support.

```py
server = _WorkerServer("", 1234)
```

Test plan:

Added unit test

```
python test/distributed/elastic/test_control_plane.py
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/127986
Approved by: https://github.com/c-p-i-o
2024-06-05 22:56:32 +00:00
Tristan Rice
597922ba21 Reapply "distributed debug handlers (#126601)" (#127805)
This reverts commit 7646825c3e.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/127805
Approved by: https://github.com/PaliC
2024-06-04 19:44:30 +00:00
PyTorch MergeBot
7646825c3e Revert "distributed debug handlers (#126601)"
This reverts commit 3d541835d5.

Reverted https://github.com/pytorch/pytorch/pull/126601 on behalf of https://github.com/PaliC due to breaking internal typechecking tests ([comment](https://github.com/pytorch/pytorch/pull/126601#issuecomment-2141076987))
2024-05-31 01:21:24 +00:00
cyy
be7be9fa16 [Distributed] [8/N] Fix clang-tidy warnings in torch/csrc/distributed/c10d (#125102)
This PR continues to clean clang-tidy warnings in torch/csrc/distributed/c10d, following https://github.com/pytorch/pytorch/pull/124987.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/125102
Approved by: https://github.com/ezyang
2024-05-30 16:19:53 +00:00
Tristan Rice
3d541835d5 distributed debug handlers (#126601)
This adds debug handlers as described in:
* https://gist.github.com/d4l3k/828b7be585c7615e85b2c448b308d925 (public copy)
* https://docs.google.com/document/d/1la68szcS6wUYElUUX-P6zXgkPA8lnfzpagMTPys3aQ8/edit (internal copy)

This is only adding the C++ pieces that will be used from the main process. The Python and torchrun pieces will be added in a follow up PR.

This adds 2 handlers out of the box:

* `/handler/ping` for testing purposes
* `/handler/dump_nccl_trace_pickle` as a POC integration with Flight Recorder

Test plan:

```
python test/distributed/elastic/test_control_plane.py
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/126601
Approved by: https://github.com/kurman, https://github.com/c-p-i-o
2024-05-30 02:21:08 +00:00
Yifu Wang
4a09117d16 Introduce ProcessGroupCudaP2P (#122163)
## Context
This stack prototypes automatic micro-pipelining of `all-gather -> matmul` and `matmul -> reduce-scatter` via Inductor. The idea originates from the paper [Overlap Communication with Dependent Computation via
Decomposition in Large Deep Learning Models](https://dl.acm.org/doi/pdf/10.1145/3567955.3567959). The implementation and some key optimizations are heavily influenced by @lw's implementation in xformers.

The stack contains several components:
- `ProcessGroupCudaP2P` - a thin wrapper around `ProcessGroupNCCL`. It in addition maintains a P2P workspace that enables SM-free, one-sided P2P communication which is needed for optimal micro-pipelining.
- `fused_all_gather_matmul` and `fused_matmul_reduce_scatter` dispatcher ops.
- Post-grad fx pass that detects `all-gather -> matmul` and `matmul -> reduce-scatter` and replaces them with the fused dispatcher ops.

To enable the prototype feature:
- Set the distributed backend to `cuda_p2p`.
- Set `torch._inductor.config._micro_pipeline_tp` to `True`.

*NOTE: the prototype sets nothing in stone w.r.t to each component's design. The purpose is to have a performant baseline with reasonable design on which each component can be further improved.*

## Benchmark
Setup:
- 8 x H100 (500W) + 3rd gen NVSwitch.
- Llama3 8B training w/ torchtitan.
- 8-way TP. Reduced the number of layers from 32 to 8 for benchmarking purpose.

Trace (baseline): https://interncache-all.fbcdn.net/manifold/perfetto-artifacts/tree/ui/index.html#!/?url=https://interncache-all.fbcdn.net/manifold/perfetto_internal_traces/tree/shared_trace/yifu_tmpjaz8zgx0
<img width="832" alt="image" src="https://github.com/pytorch/pytorch/assets/4156752/4addba77-5abc-4d2e-93ea-f68078587fe1">

Trace (w/ micro pipelining): https://interncache-all.fbcdn.net/manifold/perfetto-artifacts/tree/ui/index.html#!/?url=https://interncache-all.fbcdn.net/manifold/perfetto_internal_traces/tree/shared_trace/yifu_tmpn073b4wn
<img width="963" alt="image" src="https://github.com/pytorch/pytorch/assets/4156752/4f44e78d-8196-43ab-a1ea-27390f07e9d2">

## This PR
`ProcessGroupCudaP2P` is a thin wrapper around `ProcessGroupNCCL`. By default, it routes all collectives to the underlying `ProcessGroupNCCL`. In addition, `ProcessGroupCudaP2P` initializes a P2P workspace that allows direct GPU memory access among the members. The workspace can be used in Python to optimize intra-node communication patterns or to create custom intra-node collectives in CUDA.

`ProcessGroupCudaP2P` aims to bridge the gap where certain important patterns can be better optimized via fine-grained P2P memory access than with collectives in the latest version of NCCL. It is meant to complement NCCL rather than replacing it.
Usage:
```
    # Using ProcessGroupCudaP2P
    dist.init_process_group(backend="cuda_p2p", ...)

    # Using ProcessGroupCudaP2P while specifying ProcessGroupCudaP2P.Options
    pg_options = ProcessGroupCudaP2P.Options()
    dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)

    # Using ProcessGroupCudaP2P while specifying ProcessGroupNCCL.Options
    pg_options = ProcessGroupNCCL.Options()
    dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)

    # Using ProcessGroupCudaP2P while specifying both
    # ProcessGroupCudaP2P.Options and ProcessGroupNCCL.Options
    pg_options = ProcessGroupCudaP2P.Options()
    pg_options.nccl_options = ProcessGroupNCCL.Options()
    dist.init_process_group(backend="cuda_p2p", pg_options=pg_options, ...)

    # Down-casting the backend to access p2p buffers for cuda_p2p specific
    # optimizations
    if is_cuda_p2p_group(group):
        backend = get_cuda_p2p_backend(group)
        if required_p2p_buffer_size > backend.get_buffer_size():
            # fallback
        p2p_buffer = backend.get_p2p_buffer(...)
    else:
        # fallback
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/122163
Approved by: https://github.com/wanchaol
2024-05-24 18:33:18 +00:00