This adds the differentiable collective -- all_to_all_single_grad. This is the initial proof of concept PR and I will be adding the remaining collectives in follow up PRs.
This adds a new function called `all_to_all_single_autograd` which is the autograd variant of `all_to_all_single`. For backwards compatibility + initial testing we wanted to make the autograd variant separate to avoid regressions.
This uses `autograd::Function` to register an Autograd op that calls the original `_c10d_functional::all_to_all_single` via the dispatcher. This works with compile and inductor as opposed to the previous Python implementation that had issues. As this uses the existing `_c10d_functional` ops we don't need to register any meta functions or lowering.
To avoid cudaStream issues this explicitly calls `wait_tensor` in the backward method to ensure it runs under the same stream as the async operation. This hurts performance but can be alleviated potentially using `compile`.
Related work: https://github.com/pytorch/torchrec/blob/main/torchrec/distributed/comm_ops.py
Test plan:
```
pytest test/distributed/test_functional_api.py -k test_all_to_all_single_compile
pytest test/distributed/test_functional_api.py
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/123599
Approved by: https://github.com/yifuwang
The test has been failing sporadically rencetly in CI and the failures
are not reproducible locally, likely due to some nasty race conditional
related a combination of MultiThreadedTestCase, the use of global state
and finalizers, and the recently introduced test decorator for native
funcol migration.
Switching to the test to use MultiProcessTestCase to provide better
isolation.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/121046
Approved by: https://github.com/weifengpy
Additional changes: tests in test_functional_api.py uses multi-threaded pg which is implemented in Python. For the native ops to call into the Python pg implementation, glue code in PyProcessGroup is required for each collective. This PR also adds a few pieces of previously missing glue code, which are necessary for running test_functional_api.py with native funcol.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/119982
Approved by: https://github.com/wanchaol
This PR mimics what we have done to trace ProcessGroup. This allows use to use FakeProcessGroup with torch.compile. FakeProcessGroup allows us to use world_size > 1 without creating multiple processes thus enabling the usage of PDB to debug bucketing DDP allreduce in the Inductor. We can theoretically use GLOO with world_size==1 to achieve the same goal. However, the `wait()` seems to be optimized away when the world_size is 1.
Differential Revision: [D51136463](https://our.internmc.facebook.com/intern/diff/D51136463/)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/113314
Approved by: https://github.com/wanchaol
We cannot use inner tensors for finalizers as they are uncollective until waited.
This PR adds a bunch of tests for the observable behavior we want, including the
necessary scafold for us to test code for their waitiness.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107250
Approved by: https://github.com/wconstab
Inductor codegen is suboptimal when calling all_reduce_coalesced with input args. We need to fix inductor's calling convention for that, or something else.
Might not work if any outputs is unused.
Test code:
```python
import torch
import torch.distributed as dist
import torch.nn.functional as F
from functorch import make_fx
import os
import torch.distributed._functional_collectives as ft_c
from torch.testing._internal.common_distributed import (
spawn_threads_and_init_comms,
)
from torch._inductor.compile_fx import compile_fx_inner
def my_fun(a, b):
c = a * 3
tensors = ft_c.all_reduce_coalesced([a, c, b], "sum", [0])
return ((tensors[1] + tensors[0] + tensors[2]).sum(), )
@spawn_threads_and_init_comms(world_size=1)
def inductor_main(self):
x = torch.arange(4).cuda() * (dist.get_rank() + 1)
y = torch.arange(4).cuda() * (dist.get_rank() + 1)
x = x.to(torch.float)
y = y.to(torch.float) * 0.5
res = make_fx(my_fun)(x, y)
print(f"fx graph:\n{res.graph}")
ind = compile_fx_inner(res, [x, y])
print(f"inductor done:\n{ind}")
os.environ["PROXY_TENSOR_TRACING"] = "1"
os.environ["TORCH_COMPILE_DEBUG"] = "1"
torch._dynamo.config.output_code = True
if __name__ == "__main__":
inductor_main(None)
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/97157
Approved by: https://github.com/fegin