Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59298
After recent changes, LazyStreamContext had in fact become always eager, making it equivalent to a vector of streams. It therefore makes more sense to remove this abstraction and use a more self-descriptive type.
This PR migrates the RequestCallback internals. The next PR migrates the TensorPipe agent.
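A minimal sketch of the shape of this migration; `Stream` stands in for `c10::Stream`, and both functions are hypothetical, for illustration only:
```cpp
#include <memory>
#include <vector>

// "Stream" is a simplified stand-in for c10::Stream.
struct Stream {
  int device;
};

// Before: callers received an opaque context. It originally created
// streams lazily, but after recent changes it always created them
// eagerly, making it a thin wrapper around a vector.
struct LazyStreamContext {
  std::vector<Stream> streams;
};
void processBefore(std::shared_ptr<LazyStreamContext> ctx) { /* ... */ }

// After: pass the streams directly; the type says exactly what it holds.
void processAfter(std::vector<Stream> streams) { /* ... */ }
```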
ghstack-source-id: 130583774
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28789175
fbshipit-source-id: fa581a50f9a6a1e42c2ad8c808a9b099bea7433e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57636
The "preferred" pointer holder for Future is `intrusive_ptr` (e.g., `then` returns an `intrusive_ptr`, `toFuture` returns `intrusive_ptr`, ...). However in RPC we often wrap it with `shared_ptr`. This probably dates back to when we had a separate Future type, before the merge.
At the boundary between RPC and JIT this difference becomes a bit annoying, as conversions between the pointer types are needed. I think it would be simpler and more consistent to always use `intrusive_ptr`, also in RPC.
This PR was produced mainly by find-and-replace, plus a couple of manual fixes.
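A minimal model of the friction this removes; `Future` and both aliases are simplified stand-ins (both modeled with `shared_ptr` here just so the sketch compiles), not the real c10 types:
```cpp
#include <memory>
#include <utility>

struct Future {};
using IntrusiveFut = std::shared_ptr<Future>;  // models c10::intrusive_ptr
using SharedFut = std::shared_ptr<Future>;

// Before: JIT handed back the intrusive flavor while RPC stored the
// shared flavor, so every boundary crossing needed a conversion hop.
SharedFut fromJit(IntrusiveFut f) { return SharedFut(std::move(f)); }

// After: RPC also holds futures as intrusive_ptr, so values returned by
// `then` / `toFuture` flow through the boundary unchanged.
IntrusiveFut passThrough(IntrusiveFut f) { return f; }
```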
ghstack-source-id: 128296581
Test Plan: CI
Reviewed By: pritamdamania87
Differential Revision: D28187972
fbshipit-source-id: d4609273a1550b4921910e85d2198e02f31c905b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34497
Use a thread_local table to intercept UserRRefs created while deserializing
the user function's arguments, and then wait for confirmations of those
UserRRefs before launching the given user function.
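A minimal sketch of the interception, assuming hypothetical names (`pendingUserRRefs`, `onUserRRefDeserialized`, `waitForConfirmations`) and a simplified UserRRef:
```cpp
#include <future>
#include <vector>

// Simplified stand-in: a UserRRef whose creation must be confirmed by
// the owner before user code may safely use it.
struct UserRRef {
  std::shared_future<void> confirmed;  // resolves on owner confirmation
};

// The thread_local table: deserialization registers every UserRRef it
// materializes while unpickling the user function's arguments.
thread_local std::vector<UserRRef*> pendingUserRRefs;

void onUserRRefDeserialized(UserRRef* rref) {
  pendingUserRRefs.push_back(rref);
}

// Before launching the user function, drain the table and block until
// each UserRRef is confirmed, so the function never observes one that
// the owner has not yet acknowledged.
void waitForConfirmations() {
  for (UserRRef* rref : pendingUserRRefs) {
    rref->confirmed.wait();
  }
  pendingUserRRefs.clear();
}
```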
Differential Revision: D20347464
Test Plan: Imported from OSS
Pulled By: mrshenli
fbshipit-source-id: 087484a2d2f03fbfb156752ab25653f39b412a07
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/30637
The RequestCallback API currently forces work to be synchronous, which, as we
scale, means we will need to throw a large number of (mostly blocked) threads
at the RPC problem. For some activities, such as dependent autograd RPCs,
there is no inherent reason to block in these threads.
In this change, the RequestCallback API is updated to return a
`shared_ptr<FutureMessage>` rather than a Message:
`std::shared_ptr<FutureMessage> operator()(Message& request) const;`
With a futures-style api, RPC ops that wish to be async can then be async,
while short-lived blocking functions (or Python UDFs) can just block.
In this change, we keep all of the current ops synchronous (i.e. we block
and then return a completed FutureMessage). We also update the rpc_agents in
a manner compatible with this sort of parallelism.
Here, we only want to incur overhead when we use the async behavior.
Some modest extra cost seems unavoidable here (e.g. the allocation for the
std::make_shared<>), but we can trivially detect the synchronous/completed
case in the rpc_agent and avoid the extra thread-switches/etc. in that case.
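A minimal sketch of how a synchronous op fits the futures-style API; `Message`, `FutureMessage`, and `handleEcho` are simplified stand-ins, not the actual PyTorch types:
```cpp
#include <memory>
#include <string>
#include <utility>

struct Message {
  std::string payload;
};

struct FutureMessage {
  Message value;
  bool done = false;
  void markCompleted(Message m) {
    value = std::move(m);
    done = true;
  }
  bool completed() const { return done; }
};

// A synchronous op under the futures-style API: do the work inline and
// return an already-completed future. The rpc_agent can test completed()
// and skip the extra thread switch in this common case, so only truly
// async ops pay for the deferral machinery.
std::shared_ptr<FutureMessage> handleEcho(Message& request) {
  auto future = std::make_shared<FutureMessage>();
  future->markCompleted(Message{request.payload});  // blocking path
  return future;
}
```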
ghstack-source-id: 95287026
Test Plan:
- Basic: buck test mode/dev-nosan caffe2/test/...
- Additional testcase in ThriftRpcAgentTest for deferred work.
Differential Revision: D18774322
fbshipit-source-id: cf49922a71707cfb1726de16f93af23b160385d8
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29696
The paths distributed/autograd/context/dist_autograd_context.h and
distributed/autograd/context/dist_autograd_container.h were repetitive,
so they are renamed to distributed/autograd/context/context.h and
distributed/autograd/context/container.h.
ghstack-source-id: 93850266
Test Plan: waitforbuildbot
Differential Revision: D18467624
fbshipit-source-id: bbf3905396f553006851af296c880c1bd106ec47
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29605
Adds a wrapper around the existing createException function that allows
passing an error string instead of a regular C++ exception. This allows us
to create exceptions for errors that aren't necessarily C++ exceptions.
This function is used by
https://github.com/pytorch/pytorch/pull/29601 and
https://github.com/pytorch/pytorch/pull/26336.
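A minimal sketch of the overload, with a simplified stand-in `Message` type (the real signatures may differ):
```cpp
#include <exception>
#include <stdexcept>
#include <string>

struct Message {
  std::string payload;
};

// Existing path: build an error response from a C++ exception.
Message createException(const Message& request, const std::exception& e) {
  return Message{std::string("error: ") + e.what()};
}

// New wrapper: accept a plain error string, for failures that never
// surfaced as a C++ exception (e.g. an error reported by a remote peer).
Message createException(const Message& request, const std::string& what) {
  return createException(request, std::runtime_error(what));
}
```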
ghstack-source-id: 93819039
Test Plan: Unit tests pass
Differential Revision: D18439216
fbshipit-source-id: 70b6a2e4f107304e322cdd2630847ad0071bc0c1
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/28312
1. Currently, if the autograd context is valid, an RPC is sent with autograd
metadata even when the tensors do not require grad and no grad functions are
attached. This is not ideal. This diff ensures that an RPC carries autograd
metadata only if the autograd context is valid and the tensors require grad
(see the sketch after this list).
2. Meanwhile, create a utility to attach autograd info and functions as needed.
3. Add autograd send/recv functions for Python RPC calls.
4. Make changes to support nested Python RPC calls.
5. Disallow nested dist autograd contexts (was landed in #27022).
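A minimal sketch of the gating condition, using simplified stand-in types and a placeholder context check:
```cpp
#include <vector>

struct Tensor {
  bool requires_grad = false;
};

bool hasValidContext() {
  return true;  // placeholder for "current dist autograd context is valid"
}

bool anyRequiresGrad(const std::vector<Tensor>& tensors) {
  for (const auto& t : tensors) {
    if (t.requires_grad) {
      return true;
    }
  }
  return false;
}

// Attach autograd metadata (and send/recv functions) only when both
// conditions hold; otherwise send a plain RPC with no autograd overhead.
bool shouldAttachAutogradMeta(const std::vector<Tensor>& tensors) {
  return hasValidContext() && anyRequiresGrad(tensors);
}
```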
ghstack-source-id: 92240367
Test Plan: unit tests
Differential Revision: D18017554
fbshipit-source-id: dbe79a5171063901a78a9b3322b9b31c159d098d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27576
1. Currently, if the autograd context is valid, an RPC is sent with autograd
metadata even when the tensors do not require grad and no grad functions are
attached. This is not ideal. This diff ensures that an RPC carries autograd
metadata only if the autograd context is valid and the tensors require grad.
2. Meanwhile, create a utility to attach autograd info and functions as needed.
3. Add autograd send/recv functions for Python RPC calls.
4. Make changes to support nested Python RPC calls.
5. Disallow nested dist autograd contexts (was landed in #27022).
ghstack-source-id: 92154535
Test Plan: unit tests
Differential Revision: D17819153
fbshipit-source-id: 37d8a85855bf591f2f2da48d475a06e870a30ea1
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27022
This change implements the "FAST" mode distributed autograd backward
pass as described in https://github.com/pytorch/pytorch/issues/23110.
At a high level the backward pass works as follows:
1. We start by computing dependencies on the node that calls
`torch.distributed.backward`.
2. This node computes the dependencies starting from the root nodes provided in
the backward call and all the 'send' functions present in the current autograd
context. The "FAST" mode assumes all 'send' functions are part of the autograd
computation.
3. Once the dependency computation is done, the distributed autograd engine
calls the local autograd engine to execute the autograd graph. Note that the
autograd graph on a single node is not necessarily connected because of
inter-node communication. As a result, we have special handling to ensure
that the local autograd engine executes the entire graph starting from the
provided roots and all 'send' functions on the node.
4. When the local autograd engine hits a 'recv' function, it performs an async
RPC to send the gradients over to the appropriate node and stores a future in
the autograd context to keep track of this RPC.
5. On the destination node, the appropriate 'send' function is looked up and
enqueued on the local autograd engine. If this is the first time the node is
hearing about this autograd context id on the backward pass, then the node
computes dependencies for the local autograd engine.
6. As part of computing dependencies, the distributed autograd engine discovers
all leaf nodes and ensures those are passed as 'outputs' to the local autograd
engine. This avoids running the 'AccumulateGrad' function.
7. The gradients computed for the leaf nodes are then actually accumulated in
`DistAutogradContext` for the appropriate autograd context id.
8. The distributed autograd engine waits for the local autograd engine to
complete and also waits for all the 'Futures' (stored in step 4) for the
respective RPCs to finish (see the sketch after this list).
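At a very high level, the control flow resembles the following; all types and function names here are heavily simplified stand-ins for the real engine:
```cpp
#include <future>
#include <utility>
#include <vector>

struct Root {};
struct SendFunction {};
struct GraphTask {};

GraphTask computeDependencies(const std::vector<Root>& roots,
                              const std::vector<SendFunction>& sends) {
  // Steps 1-2: dependencies start from the roots plus every 'send'
  // function in the current context ("FAST" mode assumes all sends
  // participate in the backward pass).
  return GraphTask{};
}

std::vector<std::future<void>> runLocalEngine(GraphTask task) {
  // Steps 3-4: the local engine executes the (possibly disconnected)
  // graph; each 'recv' node fires an async RPC whose future is stored
  // in the autograd context.
  return {};
}

void distributedBackward(const std::vector<Root>& roots,
                         const std::vector<SendFunction>& sends) {
  auto task = computeDependencies(roots, sends);
  auto rpcFutures = runLocalEngine(std::move(task));
  // Step 8: wait for the local engine and all outstanding gradient RPCs.
  for (auto& fut : rpcFutures) {
    fut.wait();
  }
}
```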
We have made the following changes to the local autograd engine for this
purpose:
1. Expose GraphTask and NodeTask so that the distributed autograd engine can
use them.
2. Expose an `execute_with_graph_task` API which allows the distributed engine
to build a GraphTask and pass it to the local autograd engine.
3. Expose an `enqueue_on_cpu` API, which allows the distributed engine to build
a `NodeTask` for a 'send' function and enqueue it on the local autograd engine.
In addition to this, a few general improvements were made:
1. Added a `PropagateGradients` RPC call for the 'recv' function to pass
gradients to the appropriate node during the backward pass.
2. Use IValues as much as possible in serialization for RpcWithAutograd.
3. If Future.wait() receives a message of type EXCEPTION, we throw an
appropriate exception instead of just returning the message. This is in line
with what most Future.wait() APIs do.
4. Added a `get_gradients(context_id)` API which allows users to retrieve a map
from Tensor to respective gradient for the provided context_id on the local
node.
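A minimal sketch of per-context gradient accumulation and the lookup that `get_gradients(context_id)` performs; all types here are simplified stand-ins:
```cpp
#include <cstdint>
#include <unordered_map>

struct Tensor {};
using GradMap = std::unordered_map<const Tensor*, Tensor>;

// One context per distributed backward pass. Leaf gradients are
// accumulated here (step 7) rather than via 'AccumulateGrad'.
struct DistAutogradContext {
  GradMap grads;
};

std::unordered_map<int64_t, DistAutogradContext> contexts;

// Models `get_gradients(context_id)`: return the tensor -> gradient map
// recorded on the local node for the given context.
GradMap getGradients(int64_t contextId) {
  return contexts.at(contextId).grads;
}
```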
ghstack-source-id: 91794926
Test Plan: unit tests.
Differential Revision: D17652615
fbshipit-source-id: 96f65c52adb2706ee29f4b49e1655afaa0a3bec3
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/25527
Master GH issue: https://github.com/pytorch/pytorch/issues/23110.
This change builds upon https://github.com/pytorch/pytorch/pull/24876 and
provides all the autograd hooks needed for a forward pass with distributed RPC
for builtin operators. This change does not address distributed RPC for Python
UDFs; that will be addressed in follow-up PRs.
Summary of changes:
1. Attach send autograd functions when a request is sent from the client and
response is sent from the server.
2. Attach receive autograd functions when a request is received on the server
and a response is received on the client.
3. Generate a globally unique autograd_message_id for each send/recv autograd
function pair to uniquely identify them.
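One plausible way to generate such a globally unique id (an assumed encoding for illustration, not necessarily the actual scheme): pack the worker id into the high bits and a local monotonically increasing counter into the low bits, so ids never collide across workers:
```cpp
#include <atomic>
#include <cstdint>

// Hypothetical encoding: high 16 bits hold the worker id, low 48 bits
// hold a per-worker counter, so each worker mints distinct ids without
// any cross-worker coordination.
std::atomic<int64_t> localCounter{0};

int64_t newAutogradMessageId(int64_t workerId) {
  return (workerId << 48) | localCounter.fetch_add(1);
}
```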
ghstack-source-id: 91240466
Test Plan: unit tests.
Differential Revision: D17148077
fbshipit-source-id: 192d8a3f552ed7cc939f55dcca332965c9bd3233