Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45318
When calling `then()` from WorkNCCL, record the input data pointers in futureNCCLCallbackStream_ before the execution of the input callback.
Note that the recording cannot be directly added to the lambda used by addCallback in ProcessGroupNCCL.hpp. This is because the type of future value in that context is pyobject rather than TensorList, but a type casting will require pybind and introduce Python dependency, which should not be allowed in c10d library.
I have considered creating a util function in a separate file to support this type casting, and then placing it under torch/csrc directory where python dependency is allowed. However, torch/csrc has a dependency on c10d, so this will create a circular dependency.
Finally, a `record_stream_cb_` member is added to FutureNCCL, and the default value is nullptr. A default `record_stream_cb_` implementation is added to `PythonFutureWrapper,` where Python dependency is allowed.
In addition, a few lines are reformatted by lint.
caffe2/torch/csrc/distributed/c10d/init.cpp is only reformatted.
#Closes: https://github.com/pytorch/pytorch/issues/44203
Test Plan:
buck test mode/dev-nosan caffe2/test/distributed:c10d -- ProcessGroupNCCLTest
buck test mode/dev-nosan caffe2/test/distributed:c10d -- test_accumulate_gradients_no_sync_allreduce_with_then_hook
buck test mode/dev-nosan caffe2/test/distributed:c10d -- test_ddp_comm_hook_allreduce_with_then_hook_nccl
Reviewed By: pritamdamania87
Differential Revision: D23910257
fbshipit-source-id: 66920746c41f3a27a3689f22e2a2d9709d0faa15
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46501
Gradients in this method will not be modified.
ghstack-source-id: 114851646
Test Plan: waitforbuildbot
Reviewed By: pritamdamania87
Differential Revision: D24374300
fbshipit-source-id: a2941891008f9f197a5234b50260218932d2d37d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46227
Follow up from https://github.com/pytorch/pytorch/issues/45419, in
this PR I've removed as many PyCFunction casts as I could from the codebase.
The only ones I didn't remove were the ones with `METH_VARARGS | METH_KEYWORDS`
which have 3 parameters instead of 2 and had to be casted. Example: `
{"copy_", (PyCFunction)(void(*)(void))THPStorage_(copy_), METH_VARARGS |
METH_KEYWORDS, nullptr},`
ghstack-source-id: 114632704
Test Plan: waitforbuildbot
Reviewed By: albanD
Differential Revision: D24269435
fbshipit-source-id: 025cfd43a9a2a3e59f6b2951c1a78749193d77cf
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46498
The name of the first arg "ddp_model" is misleading, because it is actually the reducer of DDP model rather than entire model.
This method is called in the file caffe2/torch/nn/parallel/distributed.py:
`dist._register_comm_hook(self.reducer, state, hook)`
ghstack-source-id: 114531188
(Note: this ignores all push blocking failures!)
Test Plan: waitforbuildbot
Reviewed By: pritamdamania87
Differential Revision: D24372827
fbshipit-source-id: dacb5a59e87400d93a2f35da43560a591ebc5499
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45942
We only need to keep track of this for traversing the autograd graph
when find_unused_parameters=True. Without that, we populate and keep this
mapping in memory, which occupies sizeof(pointer) * number of grad accumulators
of extra memory.
ghstack-source-id: 114219289
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D24154407
fbshipit-source-id: 220d723e262f36590a03a3fd2dab47cbfdb87d40
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46221
The RPC framework only allowed sending RPCs based on provided
WorkerInfo or name. When using RPC with DDP, sometimes it might just be easier
to refer to everything in terms of ranks since DDP doesn't support names yet.
As a result, support a `to` parameter in the RPC APIs which allow for
specifying a rank as well would be helpful.
ghstack-source-id: 114207172
Test Plan:
1) waitforbuildbot
2) Unit Tests
Reviewed By: mrshenli
Differential Revision: D24264989
fbshipit-source-id: 5edf5d92e2bd2f213471dfe7c74eebfa9efc9f70
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45933
Occasionally users run DDP with models with unused params, in this
case we would like to surface an error message telling them to run with
find_unused_params=True. However, a recent change to rebuild_buckets logic (https://github.com/pytorch/pytorch/pull/44798) made
it so that we raise a size mismatch error when this happens, but the
information about unused parameters is likely to be more useful and likely to
be the most common case of failure. Prefer raising this error over the
subsequent size mismatch errors.
ghstack-source-id: 113914759
Test Plan: Added unittest
Reviewed By: mrshenli
Differential Revision: D24151256
fbshipit-source-id: 5d349a988b4aac7d3e0ef7b3cd84dfdcbe9db675
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45783
After the previous device maps commits, `pipeWrite` might throw. In
this case, if we increment active calls before `pipeWrite` on the
caller, that active call won't be decremented properly when `pipeWrite`
throws. As a result, `shutdown` can silently timeout. I noticed this
as some tests take more than 60s to finish.
This commit extract the tensor device checking logic out of pipeWrite,
and make sure the error is thrown before the active call count is
incremented.
Differential Revision: D24094803
Test Plan: Imported from OSS
Reviewed By: mruberry
Pulled By: mrshenli
fbshipit-source-id: d30316bb23d2afd3ba4f5540c3bd94a2ac10969b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44921
This diff adds support for Process Group point-to-point operations on NCCL backend based on ncclSend/ncclRecv. See https://github.com/pytorch/pytorch/issues/43995 for more context.
ghstack-source-id: 113592785
Test Plan: unittest
Reviewed By: jiayisuse
Differential Revision: D23709848
fbshipit-source-id: cdf38050379ecbb10450f3394631317b41163258
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45543
This PR adds documentation for the c10d Store to the public docs. Previously these docs were missing although we exposed a lightly-used (but potentially useful) Python API for our distributed key-value store.
ghstack-source-id: 113409195
Test Plan: Will verify screenshots by building the docs.
Reviewed By: pritamdamania87
Differential Revision: D24005598
fbshipit-source-id: 45c3600e7c3f220710e99a0483a9ce921d75d044
Summary:
This is an attempt at refactoring `torch.distributed` implementation. Goal is to push Python layer's global states (like _default_pg) to C++ layer such that `torch.distributed` becomes more TorchScript friendly.
This PR adds the skeleton of C++ implementation, at the moment it is not included in any build (and won't be until method implementations are filled in). If you see any test failures related, feel free to revert.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45547
Reviewed By: izdeby
Differential Revision: D24024213
Pulled By: gmagogsfm
fbshipit-source-id: 2762767f63ebef43bf58e17f9447d53cf119f05f
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44826
As described in https://github.com/pytorch/pytorch/issues/43690, there
is a need for DDP to be able to ignore certain parameters in the module (not
install allreduce hooks) for certain use cases. `find_unused_parameters` is
sufficient from a correctness perspective, but we can get better performance
with this upfront list if users know which params are unused, since we won't
have to traverse the autograd graph every iteration.
To enable this, we add a field `parameters_to_ignore` to DDP init and don't
pass in that parameter to reducer if that parameter is in the given list.
ghstack-source-id: 113210109
Test Plan: Added unittest
Reviewed By: xw285cornell, mrshenli
Differential Revision: D23740639
fbshipit-source-id: a0411712a8b0b809b9c9e6da04bef2b955ba5314
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45401
Added a DeleteKey API for the TCP Store
ghstack-source-id: 112997162
Test Plan:
Modified the existing get/set test to use delete. verified that the
correct keys were deleted and that the numKeys API returned the right values
Reviewed By: mrshenli
Differential Revision: D23955730
fbshipit-source-id: 5c9f82be34ff4521c59f56f8d9c1abf775c67f9f
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44967
When enabling profiler on server, if it is a different machine it may
not have CUDA while caller does. In this case, we would crash but now we
fallback to CPU and log a warning.
ghstack-source-id: 112977906
Test Plan: CI
Reviewed By: pritamdamania87
Differential Revision: D23790729
fbshipit-source-id: dc6eba172b7e666842d54553f52a6b9d5f0a5362
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/43963
Added a DeleteKey API for the TCP Store
ghstack-source-id: 112939762
Test Plan:
Modified the existing get/set test to use delete. verified that the
correct keys were deleted and that the numKeys API returned the right values
Reviewed By: jiayisuse
Differential Revision: D23009117
fbshipit-source-id: 1a0d95b43d79e665a69b2befbaa059b2b50a1f66
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/43962
TCPStore needs a getNumKeys API for our logging needs.
ghstack-source-id: 112939761
Test Plan: Adding tests to C++ Store Tests
Reviewed By: pritamdamania87
Differential Revision: D22985085
fbshipit-source-id: 8a0d286fbd6fd314dcc997bae3aad0e62b51af83
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45353
Temporarily removing this feature, will add this back after branch cut.
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D23939865
Pulled By: mrshenli
fbshipit-source-id: 7dceaffea6b9a16512b5ba6036da73e7f8f83a8e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44664
Closes https://github.com/pytorch/pytorch/issues/39971. This PR adds support for functions decorated with `rpc.functions.async_execution` to be profiled over RPC as builtins, jit functions, and blocking python UDFs currently can be. The reasoning for this is to provide complete feature support in terms of RPC profiling and the various types of functions users can run.
To enable this, the PR below this enables calling `disableProfiler()` safely from another thread. We use that functionality to defer disabling the profiler on the server until the future corresponding to the RPC request completes (rather than only the blocking `processRPC` call as was done previously). Since when the future completes we've kicked off the async function and the future corresponding to it has completed, we are able to capture any RPCs the function would have called and the actual work done on the other node.
For example, if the following async function is ran on a server over RPC:
```
def slow_add(x, y):
time.sleep(1)
return torch.add(x, y)
rpc.functions.async_execution
def slow_async_add(to, x, y):
return rpc.rpc_async(to, slow_add, args=(x, y))
```
we expect to see the original RPC profiled, the nested RPC profiled, and the actual torch.add() work. All of these events should be recorded with the correct node id. Here is an example profiling output:
```
------------------------------------------------------------------------------------------------------------------------- --------------- --------------- --------------- --------
------- --------------- --------------- ---------------
Name Self CPU total % Self CPU total CPU total % CPU total CPU time avg Number of Calls Node ID
------------------------------------------------------------------------------------------------------------------------- --------------- --------------- --------------- --------
------- --------------- --------------- --------------- rpc_async#slow_async_add(worker1 -> worker2) 0.00% 0.000us 0 1.012s
1.012s 1 1
aten::empty 7.02% 11.519us 7.02% 11.519us 11.519us 1 1
rpc_async#slow_async_add(worker1 -> worker2)#remote_op: rpc_async#slow_add(worker2 -> worker3) 0.00% 0.000us 0 1.006s
1.006s 1 2 rpc_async#slow_async_add(worker1 -> worker2)#remote_op: aten::empty 7.21% 11.843us 7.21% 11.843us
11.843us 1 2
rpc_async#slow_async_add(worker1 -> worker2)#remote_op: rpc_async#slow_add(worker2 -> worker3)#remote_op: aten::add 71.94% 118.107us 85.77% 140.802us 140.802us 1 3
rpc_async#slow_async_add(worker1 -> worker2)#remote_op: rpc_async#slow_add(worker2 -> worker3)#remote_op: aten::empty 13.82% 22.695us 13.82% 22.695us
22.695us 1 3 ------------------------------------------------------------------------------------------------------------------------- --------------- --------------- --------------- --------
------- --------------- --------------- ---------------
Self CPU time total: 164.164us
```
This PR also moves a bunch of the profiling logic to `rpc/utils.cpp` to declutter `request_callback` code.
ghstack-source-id: 112868470
Test Plan:
```
rvarm1@devbig978:fbcode (52dd34f6)$ buck test mode/no-gpu mode/dev-nosan //caffe2/test/distributed/rpc:process_group_agent -- test_rpc_profiling_async_function --print-passing-details --stress-runs 1
```
Reviewed By: mrshenli
Differential Revision: D23638387
fbshipit-source-id: eedb6d48173a4ecd41d70a9c64048920bd4807c4
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44983
`_all_gather` was converted from `_wait_all_workers` and inherited its
5 seconds fixed timeout. As `_all_gather` meant to support a broader
set of use cases, the timeout configuration should be more flexible.
This PR makes `rpc._all_gather` use the global default RPC timeout.
Test Plan: Imported from OSS
Reviewed By: pritamdamania87
Differential Revision: D23794383
Pulled By: mrshenli
fbshipit-source-id: 382f52c375f0f25c032c5abfc910f72baf4c5ad9
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45065
To preserve backwards compatibility with applications that were passing in some ProcessGroupRpcBackendOptions but were not explicitly setting backend=BackendType.PROCESS_GROUP, we're here now inferring the backend type from the options if only the latter ones are passed. If neither are passed, we'll default to TensorPipe, as before this change.
ghstack-source-id: 112586258
Test Plan: Added new unit tests.
Reviewed By: pritamdamania87
Differential Revision: D23814289
fbshipit-source-id: f4be7919e0817a4f539a50ab12216dc3178cb752
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44655
Since `toHere()` does not execute operations over RPC and simply
transfers the value to the local node, we don't need to enable the profiler
remotely for this message. This causes unnecessary overhead and is not needed.
Since `toHere` is a blocking call, we already profile the call on the local node using `RECORD_USER_SCOPE`, so this does not change the expected profiler results (validated by ensuring all remote profiling tests pass).
ghstack-source-id: 112605610
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D23641466
fbshipit-source-id: 109d9eb10bd7fe76122b2026aaf1c7893ad10588
Summary:
We currently are fetching an allreduced tensor from Python in C++ in, where we are storing the resulting tensor in a struct's parameter. This PR removes extra tensor paratemeter in the function parameter and fetch from a single place.
Fixes https://github.com/pytorch/pytorch/issues/43960
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44914
Reviewed By: rohan-varma
Differential Revision: D23798888
Pulled By: bugra
fbshipit-source-id: ad1b8c31c15e3758a57b17218bbb9dc1f61f1577
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45088Fixes#45082
Found a few problems while working on #44983
1. We deliberately swallow RPC timeouts during shutdown, as we haven't
found a good way to handle those. When we convert `_wait_all_workers`
into `_all_gather`, the same logic was inherited. However, as
`_all_gather` meant to be used in more general scenarios, we should
no longer keep silent about errors. This commit let the error throw
in `_all_gather` and also let `shutdown()` to catch them and log.
2. After fixing (1), I found that `UnpickledPythonCall` needs to
acquire GIL on destruction, and this can lead to deadlock when used
in conjuction with `ProcessGroup`. Because `ProcessGroup` ctor is a
synchronization point which holds GIL. In `init_rpc`, followers
(`rank != 0`) can exit before the leader (`rank == 0`). If the two
happens together, we could get a) on a follower, it exits `init_rpc`
after running `_broadcast_to_followers` and before the reaching dtor
of `UnpickledPythonCall`. Then it runs the ctor of `ProcessGroup`,
which holds the GIL and wait for the leader to join. However, the
leader is waiting for the response from `_broadcast_to_followers`,
which is blocked by the dtor of `UnpickledPythonCall`. And hence
the deadlock. This commit drops the GIL in `ProcessGroup` ctor.
3. After fixing (2), I found that `TensorPipe` backend
nondeterministically fails with `test_local_shutdown`, due to a
similar reason as (2), but this time it is that `shutdown()` on a
follower runs before the leader finishes `init_rpc`. This commit
adds a join for `TensorPipe` backend `init_rpc` after `_all_gather`.
The 3rd one should be able to solve the 2nd one as well. But since
I didn't see a reason to hold GIL during `ProcessGroup` ctor, I
made that change too.
Test Plan: Imported from OSS
Reviewed By: pritamdamania87
Differential Revision: D23825592
Pulled By: mrshenli
fbshipit-source-id: 94920f2ad357746a6b8e4ffaa380dd56a7310976
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45014
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/219
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/212
+ Introduce buffer.h defining the buffer struct(s). The `CpuBuffer`
struct is always defined, while the `CudaBuffer` struct is defined
only when `TENSORPIPE_SUPPORTS_CUDA` is true.
+ Update all channels to take a `CpuBuffer` or `CudaBuffer` for
`send`/`recv` rather than a raw pointer and a length.
+ Make the base `Channel`/`Context` classes templated on `TBuffer`,
effectively creating two channel hierarchies (one for CPU channels,
one for CUDA channels).
+ Update the Pipe and the generic channel tests to use the new API. So
far, generic channel tests are CPU only, and tests for the CUDA IPC
channel are (temporarily) disabled. A subsequent PR will take care of
refactoring tests so that generic tests work for CUDA channels. An
other PR will add support for CUDA tensors in the Pipe.
Differential Revision: D23598033
Test Plan: Imported from OSS
Reviewed By: lw
Pulled By: beauby
fbshipit-source-id: 1d6c3f91e288420858835cd5e7962e8da051b44b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44345
As part of enhancing profiler support for RPC, when executing TorchScript functions over RPC, we would like to be able to support user-defined profiling scopes created by `with record_function(...)`.
Since after https://github.com/pytorch/pytorch/pull/34705, we support `with` statements in TorchScript, this PR adds support for `with torch.autograd.profiler.record_function` to be used within TorchScript.
This can be accomplished via the following without this PR:
```
torch.opts.profiler._record_function_enter(...)
# Script code, such as forward pass
torch.opts.profiler._record_function_exit(....)
```
This is a bit hacky and it would be much cleaner to use the context manager now that we support `with` statements. Also, `_record_function_` type operators are internal operators that are subject to change, this change will help avoid BC issues in the future.
Tested with `python test/test_jit.py TestWith.test_with_record_function -v`
ghstack-source-id: 112320645
Test Plan:
Repro instructions:
1) Change `def script_add_ones_return_any(x) -> Any` to `def script_add_ones_return_any(x) -> Tensor` in `jit/rpc_test.py`
2) `buck test mode/dev-nosan //caffe2/test/distributed/rpc:process_group_agent -- test_record_function_on_caller_rpc_async --print-passing-details`
3) The function which ideally should accept `Future[Any]` is `def _call_end_callbacks_on_future` in `autograd/profiler.py`.
python test/test_jit.py TestWith.test_with_foo -v
Reviewed By: pritamdamania87
Differential Revision: D23332074
fbshipit-source-id: 61b0078578e8b23bfad5eeec3b0b146b6b35a870
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44798
[test all]
Update for relanding: in ddp.join(), moved _rebuild_buckets from end of backward to beginning of forward as well.
Part of relanding PR #41954, this refactoring is to move rebuild_buckets call from end of first iteration to beginning of second iteration
ghstack-source-id: 112279261
ghstack-source-id: 112279261
Test Plan: unit tests
Reviewed By: rohan-varma
Differential Revision: D23735185
fbshipit-source-id: c26e0efeecb3511640120faa1122a2c856cd694e
Summary:
[test all]
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44330
Part of relanding PR #41954, this refactor is to seperate intialize_bucket_views and populate_bucket_views_out, as they are doing different things and called by different callsites as well
ghstack-source-id: 112257271
Test Plan: unit tests
Reviewed By: mrshenli
Differential Revision: D23583347
fbshipit-source-id: a5f2041b2c4f2c2b5faba1af834c7143eaade938
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/43796
This diff adds an option for the process group NCCL backend to pick high priority cuda streams.
Test Plan: waitforsandcastle
Reviewed By: jiayisuse
Differential Revision: D23404286
fbshipit-source-id: b79ae097b7cd945a26e8ba1dd13ad3147ac790eb
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44703
The description of this public function should be in the header file.
Also fix some typos.
Test Plan: N/A.
Reviewed By: pritamdamania87
Differential Revision: D23703661
fbshipit-source-id: 24ae63de9498e321b31dfb2efadb44183c6370df
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44663
The new API returns the type of the data object referenced by this
`RRef`. On the owner, this is same as `type(rref.local_value())`.
On a user, this will trigger an RPC to fetch the `type` object from
the owner. After this function is run once, the `type` object is
cached by the `RRef`, and subsequent invocations no longer trigger
RPC.
closes#33210
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D23691990
Pulled By: mrshenli
fbshipit-source-id: a2d87cd601a691dd75164b6bcd7315245e9cf6bd
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44326
Part of relanding PR #41954, this refactoring is to move rebuild_buckets call from end of first iteration to beginning of second iteration
ghstack-source-id: 112011490
Test Plan: unit tests
Reviewed By: mrshenli
Differential Revision: D23583017
fbshipit-source-id: ef67f79437a820d9b5699b651803622418499a83
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44340
Changed the constructor of GradBucket to pass the input by const
reference and hence avoided unnecessary explicit move semantics. Since
previously the declaration and definition are separated, passing the input
tensor vector by value looks quite bizarre.
Test Plan: buck test caffe2/torch/lib/c10d:ProcessGroupGlooTest
Reviewed By: pritamdamania87
Differential Revision: D23569939
fbshipit-source-id: db761d42e76bf938089a0b38e98e76a05bcf4162
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44339
Moved the inline implementations of GradBucket class to the header for
succinctness and readability. This coding style is also consistent with
reducer.h under the same directory.
Test Plan: buck test caffe2/torch/lib/c10d:ProcessGroupGlooTest
Reviewed By: pritamdamania87
Differential Revision: D23569701
fbshipit-source-id: 237d9e2c5f63a6bcac829d0fcb4a5ba3bede75e5