Today we can initialize a mixed-backend process group (e.g. "cpu:gloo,cuda:nccl") but we can only pass one set of process group options.
However, when we call `split_group`, we retrieve that set of options from the parent PG and pass it to the ProcessGroup::groupSplit C++ API, which then attempts to propagate that set of options to all backends.
This leads to an assert on some user code, where ProcessGroupGloo::split is expecting gloo options but receives nccl options instead.
Arguably the APIs as currently designed are just broken; we should not ever expect a single set of backend options to apply across multiple backends. However, fixing this would require changing quite a few public APIs.
As a quick fix, since user-provided options really only exist for NCCL, just warn and fall-back to defaulted options for Gloo if non-gloo options are detected.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/162424
Approved by: https://github.com/d4l3k, https://github.com/fduwjj, https://github.com/H-Huang
Today we can initialize a mixed-backend process group (e.g. "cpu:gloo,cuda:nccl") but we can only pass one set of process group options.
However, when we call `split_group`, we retrieve that set of options from the parent PG and pass it to the ProcessGroup::groupSplit C++ API, which then attempts to propagate that set of options to all backends.
This leads to an assert on some user code, where ProcessGroupGloo::split is expecting gloo options but receives nccl options instead.
Arguably the APIs as currently designed are just broken; we should not ever expect a single set of backend options to apply across multiple backends. However, fixing this would require changing quite a few public APIs.
As a quick fix, since user-provided options really only exist for NCCL, just warn and fall-back to defaulted options for Gloo if non-gloo options are detected.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/162424
Approved by: https://github.com/d4l3k, https://github.com/fduwjj, https://github.com/H-Huang
- Enable communication of tensors with Complex datatype in ProcessGroupGloo, similar to how ProcessGroupNCCL handles it.
- Move a function, which checks if Complex datatype is supported by a reduce operation, from ProcessGroupNCCL.cpp into a new file to be shared with ProcessGroupGloo.
Fixes#156632
Pull Request resolved: https://github.com/pytorch/pytorch/pull/156633
Approved by: https://github.com/d4l3k
Summary: We added group split in D78300794 and remote_group_merge in D78450094. We first want to upstream this change to PGNCCLx as well so that NCCLx can use this new API and we can continue our c10d clean up in https://github.com/pytorch/pytorch/pull/158488.
Test Plan:
CI
```
buck test -c hpc_comms.use_ncclx=stable comms/ncclx/pg/tests:test_c10d_ncclx -- test_group_split_and_merge
```
Rollback Plan:
Differential Revision: D78521060
Pull Request resolved: https://github.com/pytorch/pytorch/pull/158790
Approved by: https://github.com/d4l3k
This updates ProcessGroupGloo to support per operation timeouts. Previously the timeouts were ignored even if they were set.
* This checks if the timeout is `kUnsetTimeout` and conditionally uses the provided timeout or the default timeout from the context.
* This exposes `set_timeout` as a standard method on ProcessGroup/Backend so we can test the global timeout.
Test plan:
```
pytest test/distributed/test_c10d_gloo.py -v -k allreduce_timeout
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/158128
Approved by: https://github.com/H-Huang, https://github.com/fduwjj
This enables Gloo CUDA when used with a backend that supports GPUDirect which currently is only the IBVERBS backend.
This requires some changes to Gloo which are in https://github.com/pytorch/gloo/pull/441
Since we're now depending on gloo_cuda we need to split ProcessGroupGloo into two pieces, one with the CPU bits (libtorch_cpu) and one with CUDA kernels in libtorch_cuda. This unfortunately requires some major refactoring as some CPU code is shared across both.
The gloo submodule is updated to depend on the new Gloo changes
Test plan:
```py
import os
import time
transport = "TCP"
#transport = "IBVERBS"
os.environ["GLOO_DEVICE_TRANSPORT"] = transport
rank = int(os.environ["RANK"])
os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)
ibv = "mlx5_0:1,mlx5_3:1,mlx5_4:1,mlx5_5:1,mlx5_6:1,mlx5_9:1,mlx5_10:1,mlx5_11:1".split(",")[rank]
ibv_name, ibv_port = ibv.split(":")
os.environ["TORCH_GLOO_IBV_NAME"] = ibv_name
os.environ["TORCH_GLOO_IBV_PORT"] = ibv_port
os.environ["TORCH_GLOO_IBV_INDEX"] = "3"
import torch
import torch.distributed as dist
dist.init_process_group("gloo")
rank = dist.get_rank()
# initial sanity check
#device = "cpu"
#t = torch.zeros(10, device=device)
#dist.all_reduce(t)
#print("sanity complete")
device = "cpu"
iters = 10
warmup_iters = 2
for nelem in [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000]:
t = torch.zeros(nelem, device=device)
torch.cuda.current_stream().synchronize()
for i in range(warmup_iters):
dist.all_reduce(t)
torch.cuda.current_stream().synchronize()
start = time.perf_counter()
for i in range(iters):
dist.all_reduce(t)
torch.cuda.current_stream().synchronize()
dur = (time.perf_counter() - start)
qps = iters/dur
bandwidth_gb = t.nbytes * iters / dur / 1e9
gb = t.nbytes / 1e9
if rank == 0:
print(f"{transport=} {device=} {iters=} {nelem=} {qps=} {gb=} {bandwidth_gb=}\n", end="")
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/153406
Approved by: https://github.com/fduwjj
This adds lazy initialization support to ProcessGroupGloo via `TORCH_GLOO_LAZY_INIT` or via `create_device(..., lazy_init=True)`
This is still a draft PR as there's one race condition when doing coalesced operations that needs to be fixed upstream in Gloo first. Depends on https://github.com/facebookincubator/gloo/pull/427 landing first
This also updates the gloo submodule to include the required changes.
Test plan:
added lazy init test variants
```
pytest -v test/distributed/test_c10d_gloo.py -k Lazy
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/150801
Approved by: https://github.com/fduwjj
Summary:
X-link: https://github.com/facebookincubator/gloo/pull/423
This modifies `connectFullMesh` to take in a shared_ptr<IStore> instead of a reference. This is an API breaking change but fairly easy to work around.
To have backwards compatibility in PyTorch during the commit phase we add a new ifdef `GLOO_SHARED_STORE` which can provide backwards compatibility until we update the pinned Gloo version in pytorch OSS repo.
This also adds a new `wait_get` method to `IStore` which will allow us to do a more efficient operation in PyTorch TCPStore. PyTorch's `Store::get` automatically waits so we want to make sure we can avoid waiting twice to reduce network traffic.
This change will land simultaneously in PyTorch and Gloo repos.
Test Plan:
```
buck2 test //gloo/... //caffe2/caffe2/contrib/gloo:
```
Differential Revision: D72084111
Pull Request resolved: https://github.com/pytorch/pytorch/pull/150230
Approved by: https://github.com/fduwjj
This adds a `reduce_scatter` implementation for ProcessGroupGloo. This is a pretty naive implementation as it does 1 allreduce per rank but may be useful for testing in FSDP etc. There was an existing implementation of reduce_scatter_tensor/reduce_scatter_tensor_coalesed that has a very similar implementation but requires a fixed tensor size per rank.
If users find these functions to be too slow we can address them as issues arise.
Gloo now supports all major distributed operations. Quite a few of these were added by @rohan-varma and @yifuwang but they didn't update the support chart. We also have `CUDAWork` variants of most operations so those were also added to the chart.
Test plan:
```
pytest -v test/distributed/test_c10d_gloo.py -k reduce_scatter
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/149869
Approved by: https://github.com/fduwjj
This adds AVG support to ProcessGroupGloo to better support FSDP on CPU. I expect there will be more issues but this is easy enough to support in a naive fashion.
This applies to both reduce and allreduce.
This is a simple SUM + division and may not be the most numerically stable but that's expected. FSDP for low precision data types implements pre/post divide and uses SUM instead.
Test plan:
```
pytest -v test/distributed/test_c10d_gloo.py
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/149781
Approved by: https://github.com/fduwjj
This PR aims to support the following use case:
```python
def all_reduce_eager(x):
y = x * x
req = dist.all_reduce(y, op=dist.ReduceOp.SUM, async_op=True)
assert isinstance(req, torch.distributed.Work)
return y
@torch.compile(fullgraph=True)
def all_reduce_wait_compiled(y):
torch.ops.c10d_functional.wait_tensor(y)
return y * y
x = torch.ones(1280, 1280, device="cuda") + self.rank
with allow_inflight_collective_as_graph_input_ctx():
y = all_reduce_eager(x)
z = all_reduce_wait_compiled(y)
```
where the collective is issued in eager (with `async_op=True`) but waited in compiled region.
This is important for internal use cases such as TorchRec, where we issue collectives in eager for SparseArch all_to_all but want to wait for them in compiled region at beginning of OverArch, so that the all_to_all can be overlapped with the DenseArch compute that runs in parallel.
----
**Update**: Did two items to prevent regression to existing use cases:
1. Added memory-stressed test case to test_c10d_nccl.py `test_unwaited` to cover existing user's "not calling work.wait() for non-functional collective" use case
2. Gated all new `register_work()` / `unregister_work()` calls with `c10d::allow_inflight_collective_as_graph_input()` check, which is a new context manager that requires explicit user enablement (i.e. not on by default, so should not affect existing users).
The risk of this new version of PR causing regression should be very low.
------
Test commands:
- `pytest -rA test/distributed/test_inductor_collectives.py::TestCollectivesMultiProc::test_eager_async_allreduce_inductor_wait`
- `pytest -rA test/test_fx.py::TestDCE::test_keep_collectives`
- `pytest -rA test/test_fx.py::TestDCE::test_keep_collectives_no_overload`
- `pytest -rA test/distributed/test_c10d_functional_native.py::TestWithNCCL::test_wait_tensor`
- `pytest -rA test/distributed/test_c10d_functional_native.py::TestWithNCCL::test_unwaited`
- `pytest -rA test/distributed/test_c10d_nccl.py::CommTest::test_wait_tensor`
- `pytest -rA test/distributed/test_c10d_nccl.py::CommTest::test_unwaited`
- `pytest -rA test/distributed/_tensor/test_tensor_ops.py::DistTensorOpsTest::test_equal`
- `pytest -rA test/distributed/_tensor/test_random_ops.py::DistTensorRandomOpTest::test_manual_seed`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_ddp_baseline_aot_eager_multiprocess`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_aot_eager`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_setattr`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_unspecialized_forced_getattr_inline`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_unspecialized_forced_getattr_no_inline`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_asymmetric_compilation`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_automatic_dynamic_scalar`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_automatic_dynamic_speculation_divergence`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_automatic_dynamic_tensor`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_dim_mismatch`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_graph_break_empty_graph_still_collective`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_missing_source`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_scalar_missing_source`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_compiler_collectives_type_mismatch`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_ddp_activation_checkpointing`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_ddp_baseline_aot_eager_multiprocess`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_activation_checkpointing`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_aot_eager`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_inductor`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_setattr`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_unspecialized_forced_getattr_inline`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_fsdp_unspecialized_forced_getattr_no_inline`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_hf_bert_ddp_aot_eager`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_hf_bert_ddp_aot_eager_static_graph`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_hf_bert_ddp_inductor`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_hf_bert_ddp_inductor_static_graph`
- `pytest -rA test/distributed/test_dynamo_distributed.py::TestMultiProc::test_hf_bert_fsdp_activation_checkpointing`
- `pytest -rA test/distributed/_tensor/test_experimental_ops.py::DistOtherOpsTest::test_bernoulli`
- `pytest -rA test/distributed/_tensor/test_dtensor_compile.py::TestDTensorCompileE2E::test_tp_compile_fullgraph_is_seq_parallel_True`
- `pytest -rA test/distributed/test_inductor_collectives.py::TestCollectivesMultiProc::test_allreduce_inductor_cudagraph_trees`
- `python benchmarks/dynamo/torchbench.py --ci --accuracy --timing --explain --inductor --device cuda --inference --bfloat16 --total-partitions 2 --partition-id 1 --output inference_torchbench.csv --only moco`
------
Differential Revision: [D65023311](https://our.internmc.facebook.com/intern/diff/D65023311)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/137763
Approved by: https://github.com/yifuwang
This adds logs if we can't acquire locks in NCCLUtils and ProcessGroupNCCL for 30s.
This is motivated by some deadlocks were seeing and it's unclear if it's in NCCL or on the PyTorch side of things.
This required replacing most `std::mutex` with `std::timed_mutex` and `std::condition_variable_any` as appropriate.
Test plan:
existing CI for regressions
will add unit tests on `C10D_LOCK_GUARD`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134131
Approved by: https://github.com/c-p-i-o, https://github.com/fduwjj
This adds logs if we can't acquire locks in NCCLUtils and ProcessGroupNCCL for 30s.
This is motivated by some deadlocks were seeing and it's unclear if it's in NCCL or on the PyTorch side of things.
This required replacing most `std::mutex` with `std::timed_mutex` and `std::condition_variable_any` as appropriate.
Test plan:
existing CI for regressions
will add unit tests on `C10D_LOCK_GUARD`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134131
Approved by: https://github.com/c-p-i-o, https://github.com/fduwjj