Commit Graph

38 Commits

Author SHA1 Message Date
Yuanyuan Chen
36871622f1 [2/N] Mark unused parameters in C++ code (#165121)
This is follow-up of #164912 to mark unused C++ parameters to improve code readability.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/165121
Approved by: https://github.com/Skylion007
2025-10-15 03:04:39 +00:00
Jeffro
d89189f289 Fix inconsistent clock types in ProcessGroupNCCL::runHookLoop (#162543)
## Summary
This PR fixes an inconsistency in `ProcessGroupNCCL::runHookLoop` when computing `timeStarted`. Both `timeFinished` and `timeStarted` in `WorkInfo` are expected to use `std::chrono::system_clock`, but previously the code was casting a duration from `steady_clock`.

Reviewers suggested using `steady_clock` consistently for time measurement since it is appropriate for durations (see #153135 ). This PR updates both `timeStarted` and `timeFinished` in `WorkInfo`, and corresponding code in `runHookLoop`, to use `std::chrono::steady_clock`.

## Error message:
```
libcxx/include/__memory/allocator_traits.h:302:5: error: no matching function for call to '__construct_at'
  302 |     std::__construct_at(__p, std::forward<_Args>(__args)...);
      |     ^~~~~~~~~~~~~~~~~~~
libcxx/include/__memory/shared_ptr.h:162:33: note: in instantiation of function template specialization 'std::allocator_traits<std::allocator<c10d::WorkInfo>>::construct<c10d::WorkInfo, c10d::OpType, unsigned long, std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<long long, std::ratio<1, 1000000000>>> &, std::chrono::time_point<std::chrono::system_clock> &, std::chrono::duration<float, std::ratio<1, 1000>>, 0>' requested here
  162 |     allocator_traits<_TpAlloc>::construct(__tmp, __get_elem(), std::forward<_Args>(__args)...);
      |                                 ^
libcxx/include/__memory/shared_ptr.h:736:51: note: in instantiation of function template specialization 'std::__shared_ptr_emplace<c10d::WorkInfo, std::allocator<c10d::WorkInfo>>::__shared_ptr_emplace<c10d::OpType, unsigned long, std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<long long, std::ratio<1, 1000000000>>> &, std::chrono::time_point<std::chrono::system_clock> &, std::chrono::duration<float, std::ratio<1, 1000>>, std::allocator<c10d::WorkInfo>, 0>' requested here
  736 |   ::new ((void*)std::addressof(*__guard.__get())) _ControlBlock(__a, std::forward<_Args>(__args)...);
      |                                                   ^
libcxx/include/__memory/shared_ptr.h:744:15: note: in instantiation of function template specialization 'std::allocate_shared<c10d::WorkInfo, std::allocator<c10d::WorkInfo>, c10d::OpType, unsigned long, std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<long long, std::ratio<1, 1000000000>>> &, std::chrono::time_point<std::chrono::system_clock> &, std::chrono::duration<float, std::ratio<1, 1000>>, 0>' requested here
  744 |   return std::allocate_shared<_Tp>(allocator<__remove_cv_t<_Tp> >(), std::forward<_Args>(__args)...);
      |               ^
torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp:2674:32: note: in instantiation of function template specialization 'std::make_shared<c10d::WorkInfo, c10d::OpType, unsigned long, std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<long long, std::ratio<1, 1000000000>>> &, std::chrono::time_point<std::chrono::system_clock> &, std::chrono::duration<float, std::ratio<1, 1000>>, 0>' requested here
 2674 |         onCompletionHook_(std::make_shared<WorkInfo>(
      |                                ^
libcxx/include/__memory/construct_at.h:44:58: note: candidate template ignored: substitution failure [with _Tp = c10d::WorkInfo, _Args = <c10d::OpType, unsigned long, std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<long long, std::ratio<1, 1000000000>>> &, std::chrono::time_point<std::chrono::system_clock> &, std::chrono::duration<float, std::ratio<1, 1000>>>]: no matching constructor for initialization of 'c10d::WorkInfo'
   43 | template <class _Tp, class... _Args, class = decltype(::new(std::declval<void*>()) _Tp(std::declval<_Args>()...))>
      |                                                                                    ~~~
   44 | _LIBCPP_HIDE_FROM_ABI _LIBCPP_CONSTEXPR_SINCE_CXX20 _Tp* __construct_at(_Tp* __location, _Args&&... __args) {
      |                                                          ^
1 error generated.

```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/162543
Approved by: https://github.com/cyyever, https://github.com/Skylion007
2025-09-12 16:50:42 +00:00
Tristan Rice
1b3d69b59f Work: block_current_stream API (#156883)
This implements a new `wait_stream` API in Work that matches how `wait` works for ProcessGroupNCCL for CPU based backends such as Gloo.

The idea is to support Gloo communication overlap in FSDPv2/HSDP with minimal changes to FSDP.

There was a previous attempt to make FSDPv2 use Work.wait but given the extensive stream semantics used it doesn't play nicely. https://github.com/pytorch/pytorch/pull/148780

This uses a "Baton" CUDA kernel which spinlocks on a pinned CPU tensor waiting for it to be set.

Test plan:

```
pytest test/distributed/test_c10d_gloo.py -v -k wait_stream
pytest test/distributed/test_c10d_nccl.py -v -k wait_stream
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/156883
Approved by: https://github.com/kwen2501, https://github.com/fduwjj
2025-07-08 23:55:46 +00:00
Xuehai Pan
d55dc00f84 [BE][11/16] fix typos in torch/ (torch/csrc/distributed/) (#156321)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/156321
Approved by: https://github.com/jingsh
ghstack dependencies: #156313, #156314, #156315, #156316, #156317, #156319
2025-06-23 02:57:50 +00:00
PyTorch MergeBot
4b55871e06 Revert "[BE][11/16] fix typos in torch/ (torch/csrc/distributed/) (#156321)"
This reverts commit c95f7fa874.

Reverted https://github.com/pytorch/pytorch/pull/156321 on behalf of https://github.com/atalman due to export/test_torchbind.py::TestCompileTorchbind::test_compile_error_on_input_aliasing_contents_backend_aot_eager [GH job link](https://github.com/pytorch/pytorch/actions/runs/15804799771/job/44548489912) [HUD commit link](c95f7fa874) ([comment](https://github.com/pytorch/pytorch/pull/156321#issuecomment-2994163667))
2025-06-22 12:27:36 +00:00
Xuehai Pan
c95f7fa874 [BE][11/16] fix typos in torch/ (torch/csrc/distributed/) (#156321)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/156321
Approved by: https://github.com/jingsh
ghstack dependencies: #156313, #156314, #156315, #156316, #156317, #156319
2025-06-22 08:43:49 +00:00
Shuqiang Zhang
21a9c06ca9 [c10d] differentiate timeout errors from nccl errors (#138240)
Summary:
Our watchdog does not differentiate timeout from NCCL errors clearly in terms of both log and code paths.
It's important for c10d to differentiate different reasons of watchdog
failures. E.g, timeout vs nccl errors, and possibly let users to handle the
errors differently depends on the type of errors
Test Plan:
UT
Subscribers:

Tasks:

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/138240
Approved by: https://github.com/Skylion007
2024-10-18 01:36:32 +00:00
Shuqiang Zhang
a1b22e369b [c10d] add an API to get the future result(success or failure) of a collective and customize error handling (#137799)
Summary:
This PR is trying to let users to know what exact collective call from the python thread is failing, and
customize their own error handling function, instead of watchdog thread crashing everything.

This is potentially very useful in fault tolerant training, in which we can have in-process restart.
E.g., when an nccl error is detected, users can potentially abort comms, re-init comms and go back to the previous check pointed step and try again, instead of crashing the whole job.

This is to allow users to check the status of each collective call,
using the ivalue::future libs in PT core. This also allows users to
attach its customized failure handling functions by:
work.get_future_result().then(erro_handling_func)

Note that the above call is also non-blocking for CPU thread
Test Plan:
Added a new test: test_get_future_result to verify the workResutl is
correctly propagated to the users

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137799
Approved by: https://github.com/fduwjj, https://github.com/wconstab
2024-10-16 00:20:09 +00:00
PyTorch MergeBot
c044deb9ce Revert "c10d/logging: add C10D_LOCK_GUARD (#134131)"
This reverts commit f33bcbe5fd.

Reverted https://github.com/pytorch/pytorch/pull/134131 on behalf of https://github.com/kit1980 due to See D61985186 ([comment](https://github.com/pytorch/pytorch/pull/134131#issuecomment-2327556381))
2024-09-03 22:35:14 +00:00
Tristan Rice
f33bcbe5fd c10d/logging: add C10D_LOCK_GUARD (#134131)
This adds logs if we can't acquire locks in NCCLUtils and ProcessGroupNCCL for 30s.

This is motivated by some deadlocks were seeing and it's unclear if it's in NCCL or on the PyTorch side of things.

This required replacing most `std::mutex` with `std::timed_mutex` and `std::condition_variable_any` as appropriate.

Test plan:

existing CI for regressions

will add unit tests on `C10D_LOCK_GUARD`

Pull Request resolved: https://github.com/pytorch/pytorch/pull/134131
Approved by: https://github.com/c-p-i-o, https://github.com/fduwjj
2024-08-28 01:40:42 +00:00
PyTorch MergeBot
1c4780e69a Revert "c10d/logging: add C10D_LOCK_GUARD (#134131)"
This reverts commit 4c28a0eb0b.

Reverted https://github.com/pytorch/pytorch/pull/134131 on behalf of https://github.com/ZainRizvi due to Sorry but this causes formatting errors internally which make it fail to build. See D61759282 ([comment](https://github.com/pytorch/pytorch/pull/134131#issuecomment-2310455878))
2024-08-26 15:19:27 +00:00
Tristan Rice
4c28a0eb0b c10d/logging: add C10D_LOCK_GUARD (#134131)
This adds logs if we can't acquire locks in NCCLUtils and ProcessGroupNCCL for 30s.

This is motivated by some deadlocks were seeing and it's unclear if it's in NCCL or on the PyTorch side of things.

This required replacing most `std::mutex` with `std::timed_mutex` and `std::condition_variable_any` as appropriate.

Test plan:

existing CI for regressions

will add unit tests on `C10D_LOCK_GUARD`

Pull Request resolved: https://github.com/pytorch/pytorch/pull/134131
Approved by: https://github.com/c-p-i-o, https://github.com/fduwjj
2024-08-24 00:27:39 +00:00
cyy
f4dcf2ae93 [1/N] Change #include <c10/util/Optional.h> to #include <optional> (#128301)
Fixes #ISSUE_NUMBER

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128301
Approved by: https://github.com/ezyang, https://github.com/r-barnes
2024-07-08 07:03:53 +00:00
PyTorch MergeBot
846bb30e13 Revert "[1/N] Change #include <c10/util/Optional.h> to #include <optional> (#128301)"
This reverts commit bd72e28314.

Reverted https://github.com/pytorch/pytorch/pull/128301 on behalf of https://github.com/huydhn due to Sorry for reverting your change but it fails XLA build bd72e28314. Please rebase your PR before relanding because I think the failure is hidden by an unrelated broken trunk XLA failure from your current base commit ([comment](https://github.com/pytorch/pytorch/pull/128301#issuecomment-2169035822))
2024-06-15 01:58:20 +00:00
cyy
bd72e28314 [1/N] Change #include <c10/util/Optional.h> to #include <optional> (#128301)
Fixes #ISSUE_NUMBER

Pull Request resolved: https://github.com/pytorch/pytorch/pull/128301
Approved by: https://github.com/ezyang
2024-06-14 23:21:01 +00:00
Richard Barnes
ed327876f5 [codemod] c10:optional -> std::optional (#126135)
Generated by running the following from PyTorch root:
```
find . -regex ".*\.\(cpp\|h\|cu\|hpp\|cc\|cxx\)$" | grep -v "build/" | xargs -n 50 -P 4 perl -pi -e 's/c10::optional/std::optional/'
```

`c10::optional` is just an alias for `std::optional`. This removes usages of that alias in preparation for eliminating it entirely.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/126135
Approved by: https://github.com/Skylion007, https://github.com/malfet, https://github.com/albanD, https://github.com/aaronenyeshi
2024-05-14 19:35:51 +00:00
cyy
ea61c9cb29 [Distributed] [5/N] Fix clang-tidy warnings in torch/csrc/distributed/c10d (#124043)
This PR continues to fix some clang-tidy warnings in distributed/c10d code, following https://github.com/pytorch/pytorch/pull/124032.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/124043
Approved by: https://github.com/ezyang
2024-04-23 00:43:50 +00:00
Chao Zhou
a11a49af58 Add NCCL work sequence number to work info (#120596)
Summary: Expose sequence number to work info. The number can help applications identify a NCCL work more precisely.

Test Plan:
1. pytest test/distributed/test_c10d_nccl.py::WorkHookTest::test_on_completion_hook_seq
2. pytest test/distributed/test_c10d_nccl.py::WorkHookTest

Differential Revision: D54180050

Pull Request resolved: https://github.com/pytorch/pytorch/pull/120596
Approved by: https://github.com/kwen2501
2024-02-28 07:54:37 +00:00
PyTorch MergeBot
1e70f4d02c Revert "Reland #2 "[C10] PG observability hooks. (#108815, #110907)" (#111072)"
This reverts commit bb1424d46e.

Reverted https://github.com/pytorch/pytorch/pull/111072 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally ([comment](https://github.com/pytorch/pytorch/pull/111072#issuecomment-1765399829))
2023-10-16 23:03:26 +00:00
Will Constable
bb1424d46e Reland #2 "[C10] PG observability hooks. (#108815, #110907)" (#111072)
This reverts commit 314a502eb0.

Changes since original PR:
Reland 1
 *  rename torch.distributed.hooks to torch.distributed._hooks

Reland 2
 * make _hooks importable even if !distributed.is_available()
 * handle cuda driver exit intermittent failure caused by new cuda api usage in callback caller (see prev PR in stack)

(original PR https://github.com/pytorch/pytorch/pull/108815 desc copied below)

Expose a set of observability hooks into C10D such that our users can
detect collectives failure both faster and more easily.

The design is similar to NCCL desync debug that it minimized the
overhead by doing most of the work out of the main thread.

This PR introduces a new module torch.distributed.hooks that exposes the following set of methods:

    register_collective_start_hook
    register_collective_end_hook
    register_process_group_hook

The process group hook exposes PG creation on the member ranks and call them inline from the
the PG creation code. This is fine since this happens during initialization and a limited number of times.

The collective start/end hooks are fired from a single background thread. It reads
events from a C++ queue and dispatches over.

Queue notification is oddly done using a pipe, this is needed so python can abort the thread on shutdown
and have it as background thread. This is not possible with more reasonable choices like a condvar.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/111072
Approved by: https://github.com/malfet
ghstack dependencies: #111061
2023-10-12 16:59:23 +00:00
PyTorch MergeBot
314a502eb0 Revert "Reland "[C10] PG observability hooks. (#108815)" (#110907)"
This reverts commit 7678cd22af.

Reverted https://github.com/pytorch/pytorch/pull/110907 on behalf of https://github.com/huydhn due to Sorry for reverting this, but macos job in trunk starts failing after this 7678cd22af ([comment](https://github.com/pytorch/pytorch/pull/110907#issuecomment-1756497387))
2023-10-11 00:23:42 +00:00
Will Constable
7678cd22af Reland "[C10] PG observability hooks. (#108815)" (#110907)
This reverts commit ff0358b038.

(original PR https://github.com/pytorch/pytorch/pull/108815 desc copied below)

Expose a set of observability hooks into C10D such that our users can
detect collectives failure both faster and more easily.

The design is similar to NCCL desync debug that it minimized the
overhead by doing most of the work out of the main thread.

This PR introduces a new module torch.distributed.hooks that exposes the following set of methods:

    register_collective_start_hook
    register_collective_end_hook
    register_process_group_hook

The process group hook exposes PG creation on the member ranks and call them inline from the
the PG creation code. This is fine since this happens during initialization and a limited number of times.

The collective start/end hooks are fired from a single background thread. It reads
events from a C++ queue and dispatches over.

Queue notification is oddly done using a pipe, this is needed so python can abort the thread on shutdown
and have it as background thread. This is not possible with more reasonable choices like a condvar.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/110907
Approved by: https://github.com/fduwjj
2023-10-10 20:09:40 +00:00
PyTorch MergeBot
ff0358b038 Revert "[C10] PG observability hooks. (#108815)"
This reverts commit 0c7a877745.

Reverted https://github.com/pytorch/pytorch/pull/108815 on behalf of https://github.com/albanD due to Add a new torch.distributed.hooks namespace but does not document it, test was added this morning ([comment](https://github.com/pytorch/pytorch/pull/108815#issuecomment-1751327751))
2023-10-06 19:49:49 +00:00
Rodrigo Kumpera
0c7a877745 [C10] PG observability hooks. (#108815)
Expose a set of observability hooks into C10D such that our users can
detect collectives failure both faster and more easily.

The design is similar to NCCL desync debug that it minimized the
overhead by doing most of the work out of the main thread.

This PR introduces a new module torch.distributed.hooks that exposes the following set of methods:

    register_collective_start_hook
    register_collective_end_hook
    register_process_group_hook

The process group hook exposes PG creation on the member ranks and call them inline from the
the PG creation code. This is fine since this happens during initialization and a limited number of times.

The collective start/end hooks are fired from a single background thread. It reads
events from a C++ queue and dispatches over.

Queue notification is oddly done using a pipe, this is needed so python can abort the thread on shutdown
and have it as background thread. This is not possible with more reasonable choices like a condvar.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/108815
Approved by: https://github.com/wconstab, https://github.com/fduwjj
2023-10-06 18:52:46 +00:00
Rodrigo Kumpera
317e39a8ad [C10d] Cleanup collective sequence number. (#109136)
Sequence numbers must be associated with a Work object
if we want to use it as a way to report collective progress.

The API surface change is introducing Work::getSequenceNumber, which
should eventually be exposed to python.

The bulk of this change is changing gloo to make the sequence number
be always in use and weave it to the dozens subclasses of Work.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/109136
Approved by: https://github.com/fduwjj
2023-09-26 17:17:04 +00:00
Shen Li
45128ab67c [Reland] Add OnCompletion Hook to ProcessGroup (#106988) (#107233)
This allows infra/trainers to get detailed stats about communication
efficiencies without know anything about what model or distributed
training paradigms have been used. This is helpful as infra/trainer
package usually prefers to be as model/algorithm agnostic as possible.
Therefore, we cannot assume that infra/trainer can have access to all
collectives used by the model authors.

This commit adds an `OnCompletion` hook to `ProcessGroupNCCL` which
will be fired on every work completion event.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107233
Approved by: https://github.com/kumpera
2023-08-15 17:35:14 +00:00
PyTorch MergeBot
fd214aa8be Revert "Add OnCompletion Hook to ProcessGroup (#106988)"
This reverts commit ba1da47e8f.

Reverted https://github.com/pytorch/pytorch/pull/106988 on behalf of https://github.com/huydhn due to Sorry for reverting you change, but it is failing Windows build with some linker error.  The Windows failures on PR looks legit ([comment](https://github.com/pytorch/pytorch/pull/106988#issuecomment-1678580899))
2023-08-15 08:24:33 +00:00
Shen Li
ba1da47e8f Add OnCompletion Hook to ProcessGroup (#106988)
This allows infra/trainers to get detailed stats about communication
efficiencies without know anything about what model or distributed
training paradigms have been used. This is helpful as infra/trainer
package usually prefers to be as model/algorithm agnostic as possible.
Therefore, we cannot assume that infra/trainer can have access to all
collectives used by the model authors.

This commit adds an `OnCompletion` hook to `ProcessGroupNCCL` which
will be fired on every work completion event.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/106988
Approved by: https://github.com/kumpera, https://github.com/H-Huang
ghstack dependencies: #107140, #107141, #107160
2023-08-15 04:32:23 +00:00
Shen Li
facadc6c97 [Easy] Make Work::retrieveOpType a const function (#107141)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107141
Approved by: https://github.com/awgu
ghstack dependencies: #107140
2023-08-14 23:16:40 +00:00
cyy
1157b4393b Add const reference and std::move in opportunities detected by clang-tidy (#105815)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/105815
Approved by: https://github.com/Skylion007
2023-07-25 12:28:14 +00:00
Howard Huang
9165d46b89 DDP + C10D sparse all_reduce changes (#103916) (#104256)
Summary:

reland of https://github.com/pytorch/pytorch/pull/103916

## Changes

prototyping sparse allreduce using the sparse dispatch key. When passing in sparse tensors into `dist.allreduce()` we can execute our dispatched function.

prior to this change, passing a sparse tensor into `allreduce()` will error out with `Tensor must be dense...`

## Example script

```python
# python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 this_script.py

import torch
import torch.distributed as dist

def main():
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    a = torch.tensor([[0, 2.], [3, 0]]).to(rank)
    a = a.to_sparse()
    print(f"rank {rank} - a: {a}")
    dist.all_reduce(a)

if __name__ == "__main__":
    main()
```

output:
```
rank 1 - a: tensor(indices=tensor([[0, 1],
                       [1, 0]]),
       values=tensor([2., 3.]),
       device='cuda:1', size=(2, 2), nnz=2, layout=torch.sparse_coo)
allreduce_sparse_cuda_
tensor.is_sparse() = 1
in ProcessGroupNCCL::allreduceSparse
rank 0 - a: tensor(indices=tensor([[0, 1],
                       [1, 0]]),
       values=tensor([2., 3.]),
       device='cuda:0', size=(2, 2), nnz=2, layout=torch.sparse_coo)
allreduce_sparse_cuda_
tensor.is_sparse() = 1
in ProcessGroupNCCL::allreduceSparse
```

Test Plan:
Testing commands (OSS):

```
# python
pytest test/distributed/test_c10d_nccl.py -vsk test_sparse_allreduce_ops

# c++
build/bin/ProcessGroupNCCLTest --gtest_filter=ProcessGroupNCCLTest.testSparseAllreduce
```

Testing commands (internal, ondemand GPU):
ddp tests:
```
buck build mode/opt -c hpc_comms.use_ncclexp=default //caffe2/test/distributed:c10d --show-full-output

# Get the .par file from the previous command and use it below
TORCH_SHOW_CPP_STACKTRACE=1 /data/sandcastle/boxes/fbsource/buck-out/v2/gen/fbcode/c8344b52091f4f7f/caffe2/test/distributed/__c10d__/c10d.par -r test_ddp_set_sparse_metadata
```

c10d tests:
```
# build tests and run with log output (python)
buck build mode/opt -c hpc_comms.use_ncclexp=default //caffe2/test/distributed:c10d --show-full-output
NCCL_DEBUG=WARN /data/sandcastle/boxes/fbsource/buck-out/v2/gen/fbcode/c8344b52091f4f7f/caffe2/test/distributed/__c10d__/c10d.par -r test_sparse_allreduce_ops

# python
NCCL_DEBUG=WARN buck test mode/opt -c hpc_comms.use_ncclexp=default //caffe2/test/distributed:c10d -- --exact 'caffe2/test/distributed:c10d - test_sparse_allreduce_ops (test_c10d_nccl.ProcessGroupNCCLTest)'

# c++
NCCL_DEBUG=WARN buck run mode/opt -c hpc_comms.use_ncclexp=default //caffe2/test/cpp/c10d:ProcessGroupNCCLTest -- --gtest_filter=ProcessGroupNCCLTest.testSparseAllreduce
```

Differential Revision: D47056695

Pulled By: H-Huang

Pull Request resolved: https://github.com/pytorch/pytorch/pull/104256
Approved by: https://github.com/rohan-varma
2023-06-28 00:37:52 +00:00
PyTorch MergeBot
436d035dc7 Revert "DDP + C10D sparse all_reduce changes (#103916)"
This reverts commit fed5fba6e4.

Reverted https://github.com/pytorch/pytorch/pull/103916 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally ([comment](https://github.com/pytorch/pytorch/pull/103916#issuecomment-1608412325))
2023-06-26 22:37:58 +00:00
Howard Huang
fed5fba6e4 DDP + C10D sparse all_reduce changes (#103916)
Summary:
## Changes

prototyping sparse allreduce using the sparse dispatch key. When passing in sparse tensors into `dist.allreduce()` we can execute our dispatched function.

prior to this change, passing a sparse tensor into `allreduce()` will error out with `Tensor must be dense...`

## Example script

```python
# python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 this_script.py

import torch
import torch.distributed as dist

def main():
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    a = torch.tensor([[0, 2.], [3, 0]]).to(rank)
    a = a.to_sparse()
    print(f"rank {rank} - a: {a}")
    dist.all_reduce(a)

if __name__ == "__main__":
    main()
```

output:
```
rank 1 - a: tensor(indices=tensor([[0, 1],
                       [1, 0]]),
       values=tensor([2., 3.]),
       device='cuda:1', size=(2, 2), nnz=2, layout=torch.sparse_coo)
allreduce_sparse_cuda_
tensor.is_sparse() = 1
in ProcessGroupNCCL::allreduceSparse
rank 0 - a: tensor(indices=tensor([[0, 1],
                       [1, 0]]),
       values=tensor([2., 3.]),
       device='cuda:0', size=(2, 2), nnz=2, layout=torch.sparse_coo)
allreduce_sparse_cuda_
tensor.is_sparse() = 1
in ProcessGroupNCCL::allreduceSparse
```

Test Plan:
Testing commands (OSS):

```
# python
pytest test/distributed/test_c10d_nccl.py -vsk test_sparse_allreduce_ops

# c++
build/bin/ProcessGroupNCCLTest --gtest_filter=ProcessGroupNCCLTest.testSparseAllreduce
```

Testing commands (internal, ondemand GPU):
ddp tests:
```
buck build mode/opt -c hpc_comms.use_nccl=exp //caffe2/test/distributed:c10d --show-full-output

# Get the .par file from the previous command and use it below
TORCH_SHOW_CPP_STACKTRACE=1 /data/sandcastle/boxes/fbsource/buck-out/v2/gen/fbcode/c8344b52091f4f7f/caffe2/test/distributed/__c10d__/c10d.par -r test_ddp_set_sparse_metadata
```

c10d tests:
```
# build tests and run with log output (python)
buck build mode/opt -c hpc_comms.use_nccl=exp //caffe2/test/distributed:c10d --show-full-output
NCCL_DEBUG=WARN /data/sandcastle/boxes/fbsource/buck-out/v2/gen/fbcode/c8344b52091f4f7f/caffe2/test/distributed/__c10d__/c10d.par -r test_sparse_allreduce_ops

# python
NCCL_DEBUG=WARN buck test mode/opt -c hpc_comms.use_nccl=exp //caffe2/test/distributed:c10d -- --exact 'caffe2/test/distributed:c10d - test_sparse_allreduce_ops (test_c10d_nccl.ProcessGroupNCCLTest)'

# c++
NCCL_DEBUG=WARN buck run mode/opt -c hpc_comms.use_nccl=exp //caffe2/test/cpp/c10d:ProcessGroupNCCLTest -- --gtest_filter=ProcessGroupNCCLTest.testSparseAllreduce
```

Differential Revision: D46724856

Pulled By: H-Huang

Pull Request resolved: https://github.com/pytorch/pytorch/pull/103916
Approved by: https://github.com/rohan-varma
2023-06-26 20:42:17 +00:00
Ke Wen
3a09aa5977 [c10d] Faster coalescing (#98793)
### Description
The PR aims at reducing CPU overhead of context manager style coalescing.

By "context manager style coalescing", we mean:
Sync style:
```
with _coalescing_manager():
     for i in range(num_coll):
         dist.all_reduce(tensors[i])
```
Async style:
```
with _coalescing_manager(async_ops=True) as cm:
     for i in range(num_coll):
         dist.all_reduce(tensors[i])
cm.wait()
```
In previous implementation, each collective in the `num_coll` loop actually calls into the C++ backend, accumulating pybind overhead.

In the new implementation, we capture the collectives at Python level, and only fire towards C++ at the exit of the coalescing manager.

### Tests
In current PR, the "fast path" only applies to all-reduce.
- Flattened 512M: 16.38 ms, including CPU time 131.21 us
- Old _coalescing_manager 64 x 8M: 22.19 ms, including CPU time 2865 us
- New _coalescing_manager 64 x 8M: 16.93 ms, including CPU time 635 us

Hence a 4x reduction in CPU overhead (dependent on `num_coll`).

Cc @mrshenli @kumpera @wanchaol @fegin
Pull Request resolved: https://github.com/pytorch/pytorch/pull/98793
Approved by: https://github.com/kumpera
2023-04-24 21:27:26 +00:00
PyTorch MergeBot
9861ec9785 Revert "[c10d] Faster coalescing (#98793)"
This reverts commit db456ab83d.

Reverted https://github.com/pytorch/pytorch/pull/98793 on behalf of https://github.com/DanilBaibak due to Break internal build
2023-04-21 09:15:04 +00:00
Ke Wen
db456ab83d [c10d] Faster coalescing (#98793)
### Description
The PR aims at reducing CPU overhead of context manager style coalescing.

By "context manager style coalescing", we mean:
Sync style:
```
with _coalescing_manager():
     for i in range(num_coll):
         dist.all_reduce(tensors[i])
```
Async style:
```
with _coalescing_manager(async_ops=True) as cm:
     for i in range(num_coll):
         dist.all_reduce(tensors[i])
cm.wait()
```
In previous implementation, each collective in the `num_coll` loop actually calls into the C++ backend, accumulating pybind overhead.

In the new implementation, we capture the collectives at Python level, and only fire towards C++ at the exit of the coalescing manager.

### Tests
In current PR, the "fast path" only applies to all-reduce.
- Flattened 512M: 16.38 ms, including CPU time 131.21 us
- Old _coalescing_manager 64 x 8M: 22.19 ms, including CPU time 2865 us
- New _coalescing_manager 64 x 8M: 16.93 ms, including CPU time 635 us

Hence a 4x reduction in CPU overhead (dependent on `num_coll`).

Cc @mrshenli @kumpera @wanchaol @fegin
Pull Request resolved: https://github.com/pytorch/pytorch/pull/98793
Approved by: https://github.com/kumpera
2023-04-19 20:17:58 +00:00
cyy
f172feae0d More tidy fixes (#93069)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/93069
Approved by: https://github.com/Skylion007
2023-01-27 06:40:50 +00:00
Howard Huang
74ead61944 [2/N] [Dispatchable Collectives] Extract ProcessGroup::Work into a separate class and update references (#83680)
### Changes
- Move ProcessGroup::Work into its own class and update all the references to it / header includes.

#### Motivation
In the future PRs we will repurpose ProcessGroup to instead contain a list of Backends (ProcessGroupNCCL/Gloo/UCC) and perform dispatching to them based on tensor type. This change is prevent a circular dependency with ProcessGroup depending on Backend and Backend depending on ProcessGroup::Work.

Differential Revision: [D38839212](https://our.internmc.facebook.com/intern/diff/D38839212)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83680
Approved by: https://github.com/kwen2501
2022-09-14 13:05:58 +00:00