This implements a new `wait_stream` API in Work that matches how `wait` works for ProcessGroupNCCL for CPU based backends such as Gloo.
The idea is to support Gloo communication overlap in FSDPv2/HSDP with minimal changes to FSDP.
There was a previous attempt to make FSDPv2 use Work.wait but given the extensive stream semantics used it doesn't play nicely. https://github.com/pytorch/pytorch/pull/148780
This uses a "Baton" CUDA kernel which spinlocks on a pinned CPU tensor waiting for it to be set.
Test plan:
```
pytest test/distributed/test_c10d_gloo.py -v -k wait_stream
pytest test/distributed/test_c10d_nccl.py -v -k wait_stream
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/156883
Approved by: https://github.com/kwen2501, https://github.com/fduwjj
Summary:
Our watchdog does not differentiate timeout from NCCL errors clearly in terms of both log and code paths.
It's important for c10d to differentiate different reasons of watchdog
failures. E.g, timeout vs nccl errors, and possibly let users to handle the
errors differently depends on the type of errors
Test Plan:
UT
Subscribers:
Tasks:
Tags:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/138240
Approved by: https://github.com/Skylion007
Summary:
This PR is trying to let users to know what exact collective call from the python thread is failing, and
customize their own error handling function, instead of watchdog thread crashing everything.
This is potentially very useful in fault tolerant training, in which we can have in-process restart.
E.g., when an nccl error is detected, users can potentially abort comms, re-init comms and go back to the previous check pointed step and try again, instead of crashing the whole job.
This is to allow users to check the status of each collective call,
using the ivalue::future libs in PT core. This also allows users to
attach its customized failure handling functions by:
work.get_future_result().then(erro_handling_func)
Note that the above call is also non-blocking for CPU thread
Test Plan:
Added a new test: test_get_future_result to verify the workResutl is
correctly propagated to the users
Tags:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/137799
Approved by: https://github.com/fduwjj, https://github.com/wconstab
This adds logs if we can't acquire locks in NCCLUtils and ProcessGroupNCCL for 30s.
This is motivated by some deadlocks were seeing and it's unclear if it's in NCCL or on the PyTorch side of things.
This required replacing most `std::mutex` with `std::timed_mutex` and `std::condition_variable_any` as appropriate.
Test plan:
existing CI for regressions
will add unit tests on `C10D_LOCK_GUARD`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134131
Approved by: https://github.com/c-p-i-o, https://github.com/fduwjj
This adds logs if we can't acquire locks in NCCLUtils and ProcessGroupNCCL for 30s.
This is motivated by some deadlocks were seeing and it's unclear if it's in NCCL or on the PyTorch side of things.
This required replacing most `std::mutex` with `std::timed_mutex` and `std::condition_variable_any` as appropriate.
Test plan:
existing CI for regressions
will add unit tests on `C10D_LOCK_GUARD`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134131
Approved by: https://github.com/c-p-i-o, https://github.com/fduwjj
Summary: Expose sequence number to work info. The number can help applications identify a NCCL work more precisely.
Test Plan:
1. pytest test/distributed/test_c10d_nccl.py::WorkHookTest::test_on_completion_hook_seq
2. pytest test/distributed/test_c10d_nccl.py::WorkHookTest
Differential Revision: D54180050
Pull Request resolved: https://github.com/pytorch/pytorch/pull/120596
Approved by: https://github.com/kwen2501
This reverts commit 314a502eb0.
Changes since original PR:
Reland 1
* rename torch.distributed.hooks to torch.distributed._hooks
Reland 2
* make _hooks importable even if !distributed.is_available()
* handle cuda driver exit intermittent failure caused by new cuda api usage in callback caller (see prev PR in stack)
(original PR https://github.com/pytorch/pytorch/pull/108815 desc copied below)
Expose a set of observability hooks into C10D such that our users can
detect collectives failure both faster and more easily.
The design is similar to NCCL desync debug that it minimized the
overhead by doing most of the work out of the main thread.
This PR introduces a new module torch.distributed.hooks that exposes the following set of methods:
register_collective_start_hook
register_collective_end_hook
register_process_group_hook
The process group hook exposes PG creation on the member ranks and call them inline from the
the PG creation code. This is fine since this happens during initialization and a limited number of times.
The collective start/end hooks are fired from a single background thread. It reads
events from a C++ queue and dispatches over.
Queue notification is oddly done using a pipe, this is needed so python can abort the thread on shutdown
and have it as background thread. This is not possible with more reasonable choices like a condvar.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/111072
Approved by: https://github.com/malfet
ghstack dependencies: #111061
This reverts commit ff0358b038.
(original PR https://github.com/pytorch/pytorch/pull/108815 desc copied below)
Expose a set of observability hooks into C10D such that our users can
detect collectives failure both faster and more easily.
The design is similar to NCCL desync debug that it minimized the
overhead by doing most of the work out of the main thread.
This PR introduces a new module torch.distributed.hooks that exposes the following set of methods:
register_collective_start_hook
register_collective_end_hook
register_process_group_hook
The process group hook exposes PG creation on the member ranks and call them inline from the
the PG creation code. This is fine since this happens during initialization and a limited number of times.
The collective start/end hooks are fired from a single background thread. It reads
events from a C++ queue and dispatches over.
Queue notification is oddly done using a pipe, this is needed so python can abort the thread on shutdown
and have it as background thread. This is not possible with more reasonable choices like a condvar.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/110907
Approved by: https://github.com/fduwjj
Expose a set of observability hooks into C10D such that our users can
detect collectives failure both faster and more easily.
The design is similar to NCCL desync debug that it minimized the
overhead by doing most of the work out of the main thread.
This PR introduces a new module torch.distributed.hooks that exposes the following set of methods:
register_collective_start_hook
register_collective_end_hook
register_process_group_hook
The process group hook exposes PG creation on the member ranks and call them inline from the
the PG creation code. This is fine since this happens during initialization and a limited number of times.
The collective start/end hooks are fired from a single background thread. It reads
events from a C++ queue and dispatches over.
Queue notification is oddly done using a pipe, this is needed so python can abort the thread on shutdown
and have it as background thread. This is not possible with more reasonable choices like a condvar.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/108815
Approved by: https://github.com/wconstab, https://github.com/fduwjj
Sequence numbers must be associated with a Work object
if we want to use it as a way to report collective progress.
The API surface change is introducing Work::getSequenceNumber, which
should eventually be exposed to python.
The bulk of this change is changing gloo to make the sequence number
be always in use and weave it to the dozens subclasses of Work.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/109136
Approved by: https://github.com/fduwjj
This allows infra/trainers to get detailed stats about communication
efficiencies without know anything about what model or distributed
training paradigms have been used. This is helpful as infra/trainer
package usually prefers to be as model/algorithm agnostic as possible.
Therefore, we cannot assume that infra/trainer can have access to all
collectives used by the model authors.
This commit adds an `OnCompletion` hook to `ProcessGroupNCCL` which
will be fired on every work completion event.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107233
Approved by: https://github.com/kumpera
This allows infra/trainers to get detailed stats about communication
efficiencies without know anything about what model or distributed
training paradigms have been used. This is helpful as infra/trainer
package usually prefers to be as model/algorithm agnostic as possible.
Therefore, we cannot assume that infra/trainer can have access to all
collectives used by the model authors.
This commit adds an `OnCompletion` hook to `ProcessGroupNCCL` which
will be fired on every work completion event.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/106988
Approved by: https://github.com/kumpera, https://github.com/H-Huang
ghstack dependencies: #107140, #107141, #107160
### Description
The PR aims at reducing CPU overhead of context manager style coalescing.
By "context manager style coalescing", we mean:
Sync style:
```
with _coalescing_manager():
for i in range(num_coll):
dist.all_reduce(tensors[i])
```
Async style:
```
with _coalescing_manager(async_ops=True) as cm:
for i in range(num_coll):
dist.all_reduce(tensors[i])
cm.wait()
```
In previous implementation, each collective in the `num_coll` loop actually calls into the C++ backend, accumulating pybind overhead.
In the new implementation, we capture the collectives at Python level, and only fire towards C++ at the exit of the coalescing manager.
### Tests
In current PR, the "fast path" only applies to all-reduce.
- Flattened 512M: 16.38 ms, including CPU time 131.21 us
- Old _coalescing_manager 64 x 8M: 22.19 ms, including CPU time 2865 us
- New _coalescing_manager 64 x 8M: 16.93 ms, including CPU time 635 us
Hence a 4x reduction in CPU overhead (dependent on `num_coll`).
Cc @mrshenli @kumpera @wanchaol @fegin
Pull Request resolved: https://github.com/pytorch/pytorch/pull/98793
Approved by: https://github.com/kumpera
### Description
The PR aims at reducing CPU overhead of context manager style coalescing.
By "context manager style coalescing", we mean:
Sync style:
```
with _coalescing_manager():
for i in range(num_coll):
dist.all_reduce(tensors[i])
```
Async style:
```
with _coalescing_manager(async_ops=True) as cm:
for i in range(num_coll):
dist.all_reduce(tensors[i])
cm.wait()
```
In previous implementation, each collective in the `num_coll` loop actually calls into the C++ backend, accumulating pybind overhead.
In the new implementation, we capture the collectives at Python level, and only fire towards C++ at the exit of the coalescing manager.
### Tests
In current PR, the "fast path" only applies to all-reduce.
- Flattened 512M: 16.38 ms, including CPU time 131.21 us
- Old _coalescing_manager 64 x 8M: 22.19 ms, including CPU time 2865 us
- New _coalescing_manager 64 x 8M: 16.93 ms, including CPU time 635 us
Hence a 4x reduction in CPU overhead (dependent on `num_coll`).
Cc @mrshenli @kumpera @wanchaol @fegin
Pull Request resolved: https://github.com/pytorch/pytorch/pull/98793
Approved by: https://github.com/kumpera
### Changes
- Move ProcessGroup::Work into its own class and update all the references to it / header includes.
#### Motivation
In the future PRs we will repurpose ProcessGroup to instead contain a list of Backends (ProcessGroupNCCL/Gloo/UCC) and perform dispatching to them based on tensor type. This change is prevent a circular dependency with ProcessGroup depending on Backend and Backend depending on ProcessGroup::Work.
Differential Revision: [D38839212](https://our.internmc.facebook.com/intern/diff/D38839212)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83680
Approved by: https://github.com/kwen2501