Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59205
Reland of https://github.com/pytorch/pytorch/pull/58422
Similar to Future (which I tackled recently), Message is an ivalue type (a "custom class" one), and the natural way to represent it is inside an intrusive_ptr. However, in the RPC code we had a mix of usages, often passing Message by value. This has undesirable consequences, as it can easily trigger an accidental copy, which I believe is why many places accepted _rvalue references_ to Message in order to force the caller to move. In my experience this is non-idiomatic in C++ (normally a function signature specifies how the function consumes its arguments, and it's up to the caller to decide whether to copy or move).
By moving to intrusive_ptr everywhere, I think we eliminate or simplify many of the problems above.
In this PR I do half of the migration, updating everything except the `toMessageImpl` methods, which will be handled in the next PR.
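To make the ownership change concrete, here is a minimal sketch with a stand-in `Message` type (not the real `torch::distributed::rpc::Message`), assuming only the c10 `intrusive_ptr` header:
```cpp
#include <c10/util/intrusive_ptr.h>
#include <utility>

// Stand-in for rpc::Message; the real class carries payload, tensors, type, id.
struct Message : c10::intrusive_ptr_target {};

// Old style: an rvalue reference forces every caller to std::move.
void sendOld(Message&& /*message*/) {}

// New style: ownership is explicit in the signature, and copying the pointer
// is only a refcount bump, never an accidental deep copy of the Message.
void sendNew(c10::intrusive_ptr<Message> message) {
  (void)message;
}

int main() {
  auto m = c10::make_intrusive<Message>();
  sendNew(std::move(m));  // the caller still decides whether to copy or move
  return 0;
}
```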
ghstack-source-id: 130202849
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28623891
fbshipit-source-id: c9aeea3440679a11741ca78c06b03c57cb815a5e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/58422
Similar to Future (which I tackled recently), Message is an ivalue type (a "custom class" one), and the natural way to represent it is inside an intrusive_ptr. However, in the RPC code we had a mix of usages, often passing Message by value. This has undesirable consequences, as it can easily trigger an accidental copy, which I believe is why many places accepted _rvalue references_ to Message in order to force the caller to move. In my experience this is non-idiomatic in C++ (normally a function signature specifies how the function consumes its arguments, and it's up to the caller to decide whether to copy or move).
By moving to intrusive_ptr everywhere, I think we eliminate or simplify many of the problems above.
In this PR I do half of the migration, updating everything except the `toMessageImpl` methods, which will be handled in the next PR.
ghstack-source-id: 129567053
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28474878
fbshipit-source-id: 5b76d45e05f6fa58c831e369c5c964d126187a6c
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57636
The "preferred" pointer holder for Future is `intrusive_ptr` (e.g., `then` returns an `intrusive_ptr`, `toFuture` returns `intrusive_ptr`, ...). However in RPC we often wrap it with `shared_ptr`. This probably dates back to when we had a separate Future type, before the merge.
At the boundary between RPC and JIT this difference becomes a bit annoying, as conversions between the pointer types are needed. I think it would be simpler and more consistent to always use `intrusive_ptr`, also in RPC.
This PR was produced mainly by find-and-replace, plus a couple of manual fixes.
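As an illustration of the mismatch being removed, here is a hedged sketch that builds the same `ivalue::Future` both ways; the `JitFuture` alias is local to this sketch, and only the ATen/c10 headers are assumed:
```cpp
#include <ATen/core/ivalue.h>
#include <ATen/core/jit_type.h>
#include <memory>

// Local alias for this sketch, not a claim about the real code's naming.
using JitFuture = c10::ivalue::Future;

// Before: RPC code frequently held the future in a shared_ptr...
std::shared_ptr<JitFuture> makeRpcFutureOld() {
  return std::make_shared<JitFuture>(c10::AnyType::get());
}

// ...while the JIT side (e.g. Future::then) deals in intrusive_ptr, so every
// crossing of the RPC/JIT boundary needed a conversion or a re-wrap.
c10::intrusive_ptr<JitFuture> makeRpcFutureNew() {
  return c10::make_intrusive<JitFuture>(c10::AnyType::get());
}

int main() {
  auto fut = makeRpcFutureNew();
  fut->markCompleted(c10::IValue(42));
  return 0;
}
```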
ghstack-source-id: 128296581
Test Plan: CI
Reviewed By: pritamdamania87
Differential Revision: D28187972
fbshipit-source-id: d4609273a1550b4921910e85d2198e02f31c905b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57294
With the advent of CPUs in the device maps, and in order to be more generic (e.g., to support AMD GPUs) and to avoid conversions when passing to Future, RRef, and the like, it's easier to use Devices instead of DeviceIndices. This started as a migration of just the TensorPipe agent, but the RPC layer is quite intertwined, so I had to migrate a lot of other code as well.
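A minimal sketch of the type change, using simplified stand-in aliases rather than the agent's real device-map options:
```cpp
#include <c10/core/Device.h>
#include <unordered_map>

// Before: device maps were keyed on bare indices, which cannot express CPU
// (or non-CUDA backends) without extra conventions and conversions.
using OldDeviceMap = std::unordered_map<c10::DeviceIndex, c10::DeviceIndex>;

// After: full c10::Device values carry the device type as well, so "cpu" and
// "cuda:1" coexist in one map and can be handed to Future/RRef unchanged.
using NewDeviceMap = std::unordered_map<c10::Device, c10::Device>;

int main() {
  NewDeviceMap map;
  map.emplace(c10::Device("cuda:0"), c10::Device("cuda:1"));
  map.emplace(c10::Device("cpu"), c10::Device("cpu"));
  return 0;
}
```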
ghstack-source-id: 127916562
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28092733
fbshipit-source-id: 024dcb3648c5898ab13e770413c43958f04f1a8a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44859
TensorPipe's `set_device_map` option was applied during the forward
pass. However, when running the backward pass for the graph we would not
automatically pick up the reverse device mapping.
As a result, users had to specify both the forward and backward device mappings,
which is very tedious to do.
In this PR, I've added this functionality such that TensorPipe automatically
picks up the reverse device mapping during the backward pass. This is done by
storing the appropriate device mapping in the "recv" autograd function for
distributed autograd.
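As a rough illustration of what "picking up the reverse device mapping" amounts to, here is a hedged sketch that inverts a forward device map (stand-in alias; not the actual agent or autograd code):
```cpp
#include <c10/core/Device.h>
#include <unordered_map>

using DeviceMap = std::unordered_map<c10::Device, c10::Device>;

// Invert the forward device map so that gradients produced during the
// backward pass can be routed back to the devices they came from.
DeviceMap reverseDeviceMap(const DeviceMap& forwardMap) {
  DeviceMap reverseMap;
  for (const auto& entry : forwardMap) {
    reverseMap.emplace(entry.second, entry.first);
  }
  return reverseMap;
}

int main() {
  DeviceMap forwardMap;
  forwardMap.emplace(c10::Device("cuda:0"), c10::Device("cuda:1"));
  DeviceMap backwardMap = reverseDeviceMap(forwardMap);  // cuda:1 -> cuda:0
  (void)backwardMap;
  return 0;
}
```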
Closes: https://github.com/pytorch/pytorch/issues/44170
ghstack-source-id: 119950842
Test Plan:
1) waitforbuildbot
2) Unit test added.
Reviewed By: mrshenli
Differential Revision: D23751975
fbshipit-source-id: 2717d0ef5bde3db029a6172d98aad95734d52140
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44655
Since `toHere()` does not execute operations over RPC and simply
transfers the value to the local node, we don't need to enable the profiler
remotely for this message; doing so only adds unnecessary overhead.
Since `toHere` is a blocking call, we already profile the call on the local node using `RECORD_USER_SCOPE`, so this does not change the expected profiler results (validated by ensuring all remote profiling tests pass).
ghstack-source-id: 112605610
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D23641466
fbshipit-source-id: 109d9eb10bd7fe76122b2026aaf1c7893ad10588
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38748
This diff contains the message scaffolding and profiler changes in order to be able to remotely run the profiler across different nodes and aggregate the results on a single node.
As discussed, we have implemented this by creating new message types that, similar to autograd messages, wrap the profiling information around the original message and send this new message over the wire. On the receiving end, this wrapped message is detected, the original message is extracted from it, and the original message is processed with the profiler enabled. When sending a response with profiling information, we serialize the profiled `Event`s and send them back over RPC. When such a message is received, the events profiled on the remote node are stored (added back to the local profiler).
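A minimal sketch of the wrapping scheme described above, with purely illustrative stand-in types (`RunWithProfilingReq`, `RpcMessage`, and `ProfilerConfig` here are simplified, not the real classes):
```cpp
#include <utility>
#include <vector>

// Stand-ins: the real ProfilerConfig and Message classes have more fields.
struct ProfilerConfig {
  bool profile_cuda = false;
  bool record_shapes = false;
};

struct RpcMessage {
  std::vector<char> payload;
  int type = 0;
};

// The wrapper carries the profiler configuration next to the original
// message, so the receiving node can enable the profiler, dispatch the
// original message, and later ship the collected events back in the response.
struct RunWithProfilingReq {
  ProfilerConfig config;
  RpcMessage wrappedMessage;
};

RunWithProfilingReq wrapForProfiling(RpcMessage original, ProfilerConfig config) {
  return RunWithProfilingReq{config, std::move(original)};
}

RpcMessage unwrap(RunWithProfilingReq req) {
  return std::move(req.wrappedMessage);
}

int main() {
  RpcMessage original;
  original.payload = {'h', 'i'};
  original.type = 1;
  RunWithProfilingReq req = wrapForProfiling(std::move(original), ProfilerConfig{});
  RpcMessage recovered = unwrap(std::move(req));
  (void)recovered;
  return 0;
}
```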
Changes in this PR:
- New message types (run_with_profiling_req, run_with_profiling_resp) to send profiling info over the wire. Message parsing logic is added to handle these wrapped types.
- Handling of sending profiler data over the wire, in particular, the attributes of the `ProfilerConfig` and the serialized profiled `Event`s
- The logic for wrapping RPC messages is deduped with that in `rpc_with_autograd`, and the common payload wrapping/unwrapping logic is moved to helper functions in `rpc/utils.cpp`
- Changes in `autograd/utils.cpp` to detect whether the profiler is enabled when sending an RPC and, if so, use the new message types above
- Changes in request_callback to parse and turn on the profiler in a thread-local fashion
- Serialization and deserialization of profiling `Event`s, and support for adding the remote events to the thread-local profiler
- Introduction of the concept of a `node_id`, which, as discussed with ilia-cher, will be used along with the `Event`'s handle attribute to distinguish between events. When there are events from different nodes, this node information is rendered in the profile output (e.g. when printing tables); otherwise it is not, since it is irrelevant.
- Some changes to profiler.cpp to add useful helper methods/guards
- toHere() is now profiled for RRefs
- Unittests
ghstack-source-id: 106134626
Test Plan: Added unittests, existing profiler unittests.
Differential Revision: D19510010
fbshipit-source-id: 044347af992f19a9e3b357c9567f6fc73e988157
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34650
Resubmit of https://github.com/pytorch/pytorch/pull/33840, which was overly eager in the sense that it deleted a lot of code that we didn't want to get rid of yet (default timeout handling).
This PR adds an optional argument to `rpc_sync`, `rpc_async`, and `RpcAgent::send()` that allows the user to specify a timeout for an RPC, overriding the default. If the user does not specify this argument, the default RPC timeout set in the RPC constructor or via `rpc.set_rpc_timeout()` is used. Otherwise, we use the passed-in timeout.
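A minimal sketch of the timeout-resolution rule described above; the helper and the sentinel constant are illustrative, not the real `RpcAgent` API:
```cpp
#include <chrono>

// Sentinel meaning "no per-RPC timeout was passed" (name and value are
// illustrative, not the real constant).
constexpr std::chrono::milliseconds kUnsetRpcTimeout{-1};

// If the caller supplied a per-RPC timeout, it wins; otherwise fall back to
// the agent-wide default (from the agent constructor or rpc.set_rpc_timeout()).
std::chrono::milliseconds resolveRpcTimeout(
    std::chrono::milliseconds perCallTimeout,
    std::chrono::milliseconds agentDefault) {
  return perCallTimeout == kUnsetRpcTimeout ? agentDefault : perCallTimeout;
}

int main() {
  auto useDefault = resolveRpcTimeout(kUnsetRpcTimeout, std::chrono::milliseconds{60000});   // 60s
  auto useOverride = resolveRpcTimeout(std::chrono::milliseconds{500}, std::chrono::milliseconds{60000});  // 500ms
  (void)useDefault;
  (void)useOverride;
  return 0;
}
```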
This diff does not address:
1) Timeout support when `rpc.rpc_async` is called as a JIT operator. For this to work, we would need to change the logic in `register_distributed_ops` to pass this timeout to `rpcTorchscript`. Another issue is that TorchScript doesn't support the timedelta object. This will be done in a follow-up PR as it requires a fair amount of changes to the argument parsing logic.
2) Per-RPC timeouts for internal messages or `rpc.remote()`. A follow-up diff will address the latter by raising the timeout error to the user at the earliest possible opportunity, such as the next time the RRef is forked or `to_here` is called.
Added unit tests to confirm the current behavior.
ghstack-source-id: 102622601
Test Plan: Added unit tests in rpc_test
Differential Revision: D20376953
fbshipit-source-id: 9fb3f147520588308ab50dd33286255658d76d47
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35055
This is the first step to improving the way RPCs are profiled, as suggested by Ilia. For now, since RPC can return two different types of futures, we have to implement two different code paths, one for the Python eager-mode future and one for the JIT future.
This diff implements the Python eager part. We have defined a method `_call_end_callbacks_on_future` that takes in a future and schedules the `RecordFunction`'s end callbacks to run as a callback on that future.
Once https://github.com/pytorch/pytorch/pull/35039 lands, we can implement the JIT codepath by registering an operator that takes a `Future(t)` as well.
These code paths will be merged once the futures are merged.
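To illustrate the idea behind `_call_end_callbacks_on_future`, here is a hedged sketch using stand-in future and `RecordFunction` types rather than the real profiler API:
```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for the profiler's RecordFunction: starting it begins the timers,
// ending it records the event.
struct RecordFunctionStub {
  void runStartCallbacks() {}
  void runEndCallbacks() {}
};

// Stand-in for the eager-mode RPC future.
struct FutureStub {
  void addCallback(std::function<void()> cb) {
    callbacks_.push_back(std::move(cb));
  }
  void markCompleted() {
    for (auto& cb : callbacks_) {
      cb();
    }
  }
  std::vector<std::function<void()>> callbacks_;
};

// Keep the RecordFunction alive for the lifetime of the RPC and run its end
// callbacks only when the future completes, so the recorded latency covers
// the whole call.
void callEndCallbacksOnFuture(FutureStub& fut, std::shared_ptr<RecordFunctionStub> rec) {
  fut.addCallback([rec]() { rec->runEndCallbacks(); });
}

int main() {
  FutureStub fut;
  auto rec = std::make_shared<RecordFunctionStub>();
  rec->runStartCallbacks();
  callEndCallbacksOnFuture(fut, rec);
  fut.markCompleted();  // end callbacks fire here, not when the send returns
  return 0;
}
```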
ghstack-source-id: 102478180
Test Plan: Added unit tests
Differential Revision: D20452003
fbshipit-source-id: 1acdcb073bd1f63d6fb2e78277ac0be00fd6671d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/31381
This PR adds support for being able to profile both sync and async RPCs, so that users can use the autograd profiler and be able to view metrics such as RPC latency and number of calls in the profiler output.
The way this is implemented is by using the existing `RecordFunction` class provided by the autograd profiler. We create a `RecordFunction` instance when sending an RPC, if autograd profiling is enabled. We also invoke the starting callbacks on this `RecordFunction` instance, which does things such as starting the CPU timer. This instance is then persisted across the lifetime of the RPC by attaching it to the `Future` created by the RPC. When the RPC is finished (i.e. when `future->markComplete()` is called), we run the `RecordFunction` instance's end callbacks, which, among other things, stop the timer so that we get the correct RPC latency.
The `RecordFunction` and relevant callbacks in `profiler.cpp` are modified slightly to support running end callbacks from a different thread (which is needed since futures are marked as completed by a different thread than the main RPC thread). By default, the autograd profiler uses a `thread_local` list of `Events` and `thread_id`. However, since we'd like to run the `RecordFunction`'s callbacks from a different thread, we would like to access the list of `Events` created by the original thread. This is done by attaching the `thread_id` for the event to the `RecordFunction`, and then looking up the event with that thread in `all_event_lists` (see the changes in `profiler.cpp`). To ensure that the original behavior does not change in the profiler, this described behavior is only run when a user calls `setOverrideThreadId()` on the `RecordFunction` object.
ghstack-source-id: 96527291
Test Plan: Added a unit test.
Differential Revision: D19053322
fbshipit-source-id: 9a27a60c809fc4fdb16fa5d85085f3b6b21abfbb
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/31207
Cleanup after #30914.
In #30914, `autogradContext->addKnownWorkerId(dst);` was moved out of `addSendRpcBackward()`,
so `addSendRpcBackward()` no longer needs `dstId` as its argument.
ghstack-source-id: 95509218
Test Plan:
# Unit tests
```
buck test mode/dev-nosan //caffe2/test:dist_autograd_fork -- test_context_cleanup_tensor_no_grad
```
Differential Revision: D5742365
fbshipit-source-id: accd041a594ec18d369231f5590289828d87baa7
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29770
We were passing around const and non-const references to
DistAutogradContext obtained from DistAutogradContainer. This wasn't safe, since the
context could be deleted from the container while a thread was still using
the reference. This would usually happen when a backward pass failed on the node
driving the backward pass (resulting in 'delete context' messages being sent to
all nodes) while other nodes were still executing code related to that autograd
context.
This was also the reason why `test_backward_autograd_engine_error` was flaky.
Using a std::shared_ptr everywhere ensures we're safe and never crash.
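A minimal sketch of the lifetime issue and the fix, using stand-in types (the real `DistAutogradContainer` has sharding, a worker-id scheme, and much more state):
```cpp
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

// Stand-in for DistAutogradContext; the real class holds send/recv functions,
// accumulated gradients, outstanding RPC futures, etc.
struct DistAutogradContextStub {};

class DistAutogradContainerStub {
 public:
  void addContext(int64_t contextId) {
    std::lock_guard<std::mutex> guard(mutex_);
    contexts_.emplace(contextId, std::make_shared<DistAutogradContextStub>());
  }

  // Returning a shared_ptr (instead of a reference into the map) means the
  // context stays alive for as long as any thread still holds it...
  std::shared_ptr<DistAutogradContextStub> retrieveContext(int64_t contextId) {
    std::lock_guard<std::mutex> guard(mutex_);
    return contexts_.at(contextId);
  }

  // ...even if another node's "delete context" message erases it here first.
  void releaseContext(int64_t contextId) {
    std::lock_guard<std::mutex> guard(mutex_);
    contexts_.erase(contextId);
  }

 private:
  std::mutex mutex_;
  std::unordered_map<int64_t, std::shared_ptr<DistAutogradContextStub>> contexts_;
};

int main() {
  DistAutogradContainerStub container;
  container.addContext(1);
  auto ctx = container.retrieveContext(1);
  container.releaseContext(1);  // ctx remains valid; no dangling reference
  return 0;
}
```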
Closes #28928. Closes #26922.
ghstack-source-id: 94201446
Differential Revision: D18494814
fbshipit-source-id: 0c925fdbd5755f6d876dad56885e2cbaf41fc5f0
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29696
The paths distributed/autograd/context/dist_autograd_context.h and
distributed/autograd/context/dist_autograd_container.h were repetitive,
so they are renamed to distributed/autograd/context/context.h and
distributed/autograd/context/container.h.
ghstack-source-id: 93850266
Test Plan: waitforbuildbot
Differential Revision: D18467624
fbshipit-source-id: bbf3905396f553006851af296c880c1bd106ec47
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/28630
This includes:
1. Respect the autograd context in rpc.remote for builtin ops.
2. Force setting the autograd context in RRef.to_here() even if the
message for to_here() does not contain any tensors.
Test Plan: Imported from OSS
Differential Revision: D18138562
Pulled By: mrshenli
fbshipit-source-id: a39ec83e556d19130f22eb317927241a017000ba
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/28312
1. Currently, if the autograd context is valid, we still send the RPC with autograd metadata
even when the tensors do not require grads and no grad functions are attached. This is not ideal.
This diff makes changes to ensure an RPC is sent with autograd metadata only if the autograd context is valid and the tensors require grads (see the sketch after this list).
2. Meanwhile, create a utility to attach autograd info and functions as needed.
3. Add autograd send/recv functions for Python RPC calls.
4. Make changes to support nested Python RPC calls.
5. Disallow nested dist autograd contexts (landed in #27022).
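A minimal sketch of the new condition from item 1, assuming only libtorch headers; the helper name is illustrative, not the real utility:
```cpp
#include <torch/torch.h>
#include <vector>

// Only attach autograd metadata to an outgoing RPC when there is a valid
// distributed autograd context AND at least one tensor requires grad.
bool shouldAttachAutogradMetadata(
    bool contextValid,
    const std::vector<torch::Tensor>& tensors) {
  if (!contextValid) {
    return false;
  }
  for (const auto& tensor : tensors) {
    if (tensor.requires_grad()) {
      return true;
    }
  }
  return false;
}

int main() {
  std::vector<torch::Tensor> tensors = {torch::ones({2, 2}, torch::requires_grad())};
  bool attach = shouldAttachAutogradMetadata(/*contextValid=*/true, tensors);  // true
  bool skip = shouldAttachAutogradMetadata(/*contextValid=*/false, tensors);   // false
  (void)attach;
  (void)skip;
  return 0;
}
```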
ghstack-source-id: 92240367
Test Plan: unit tests
Differential Revision: D18017554
fbshipit-source-id: dbe79a5171063901a78a9b3322b9b31c159d098d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27576
1. Currently, if the autograd context is valid, we still send the RPC with autograd metadata
even when the tensors do not require grads and no grad functions are attached. This is not ideal.
This diff makes changes to ensure an RPC is sent with autograd metadata only if the autograd context is valid and the tensors require grads.
2. Meanwhile, create a utility to attach autograd info and functions as needed.
3. Add autograd send/recv functions for Python RPC calls.
4. Make changes to support nested Python RPC calls.
5. Disallow nested dist autograd contexts (landed in #27022).
ghstack-source-id: 92154535
Test Plan: unit tests
Differential Revision: D17819153
fbshipit-source-id: 37d8a85855bf591f2f2da48d475a06e870a30ea1
Summary:
Per https://github.com/pytorch/pytorch/issues/25525, we want to clean up the distributed autograd context on all nodes, in addition to the local one. To do this, we want to send async RPCs to the other nodes telling them to clean up the context.
The first step is for a node's context to know about the other workers. This PR does two things:
1) Adds the necessary data structures and getter functions to `DistAutogradContext` (see the sketch after this list)
2) Refactors calls to `addSendRpcBackward` to take the `worker_id` as an additional argument
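A hedged sketch of the bookkeeping from item 1, with a simplified stand-in class (the real `DistAutogradContext` holds far more state):
```cpp
#include <cstdint>
#include <mutex>
#include <unordered_set>

using worker_id_t = int16_t;  // stand-in alias for the RPC worker id type

class DistAutogradContextStub {
 public:
  // Record every worker this context sent an RPC to, so that context-cleanup
  // messages can later be fanned out to all of them.
  void addKnownWorkerId(worker_id_t workerId) {
    std::lock_guard<std::mutex> guard(mutex_);
    knownWorkerIds_.insert(workerId);
  }

  std::unordered_set<worker_id_t> getKnownWorkerIds() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return knownWorkerIds_;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_set<worker_id_t> knownWorkerIds_;
};

int main() {
  DistAutogradContextStub context;
  context.addKnownWorkerId(1);  // e.g. recorded when a send function is added for worker 1
  return 0;
}
```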
Pull Request resolved: https://github.com/pytorch/pytorch/pull/26324
Differential Revision: D17769411
Pulled By: rohan-varma
fbshipit-source-id: b7327d1209a574e2e88cb197edff3103024d51ad
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27022
This change implements the "FAST" mode distributed autograd backward
pass as described in https://github.com/pytorch/pytorch/issues/23110.
At a high level, the backward pass works as follows (a structural sketch follows the list):
1. We start by computing dependencies on the node that calls
`torch.distributed.backward`.
2. This node computes the dependencies starting from the root nodes provided in
the backward call and all the 'send' functions present in the current autograd
context. The "FAST" mode assumes all 'send' functions are part of the autograd
computation.
3. Once the dependency computation is done, the distributed autograd engine
calls the local autograd engine to execute the autograd graph. Note that the
autograd graph on a single node is not necessarily connected because of
inter-node communication. As a result, we have special handling to ensure the
local autograd engine executes the entire graph starting from the
provided roots and all 'send' functions on the node.
4. When the local autograd engine hits a 'recv' function, it performs an async
RPC to send the gradients over to the appropriate node and stores a future in
the autograd context to keep track of this RPC.
5. On the destination node, the appropriate 'send' function is looked up and
enqueued on the local autograd engine. If this is the first time the node is
hearing about this autograd context id on the backward pass, then the node
computes dependencies for the local autograd engine.
6. As part of compute dependencies, the distributed autograd engine discovers
all leaf nodes and ensures those are passed as 'outputs' to the local autograd
engine. This avoids running the 'AccumulateGrad' function.
7. The gradients computed for the leaf nodes are then actually accumulated in
`DistAutogradContext` for the appropriate autograd context id.
8. The distributed autograd engine waits for the local autograd engine
to complete and also waits for all the 'Futures' (stored in 4.) for respective
RPCs to finish.
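A very high-level structural sketch of steps 1-8, with pure stand-in types (none of this is the real `DistEngine` API); it only shows the shape of the control flow:
```cpp
#include <future>
#include <vector>

struct GraphRoots {};

struct DistAutogradContextStub {
  // Futures for the gradient-propagation RPCs issued by 'recv' functions (step 4).
  std::vector<std::future<void>> outstandingRpcs;
};

void computeDependencies(DistAutogradContextStub&, const GraphRoots&) {
  // Steps 1-2: walk from the roots and from every 'send' function in the context.
}

void runLocalAutogradEngine(DistAutogradContextStub& ctx) {
  // Steps 3-7: the local engine executes the (possibly disconnected) graph;
  // each 'recv' function fires an async RPC and records its future in ctx.
  ctx.outstandingRpcs.push_back(std::async(std::launch::async, [] {}));
}

void distributedBackward(DistAutogradContextStub& ctx, const GraphRoots& roots) {
  computeDependencies(ctx, roots);
  runLocalAutogradEngine(ctx);
  // Step 8: wait for every gradient-propagation RPC before declaring the
  // backward pass done.
  for (auto& fut : ctx.outstandingRpcs) {
    fut.wait();
  }
}

int main() {
  DistAutogradContextStub ctx;
  distributedBackward(ctx, GraphRoots{});
  return 0;
}
```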
We have made the following changes to the local autograd engine for this
purpose:
1. Expose GraphTask and NodeTask so that the distributed autograd engine can
use them.
2. Expose an `execute_with_graph_task` API, which allows the distributed engine
to build a GraphTask and pass it to the local autograd engine.
3. Expose an `enqueue_on_cpu` API, which allows the distributed engine to build
a `NodeTask` for a 'send' function and enqueue it on the local autograd engine.
In addition to this a few general improvements:
1. Added a `PropagateGradients` RPC call for the 'recv' function to pass
gradients to the appropriate node during the backward pass.
2. Use IValues as much as possible in serialization for RpcWithAutograd.
3. If `Future.wait()` returns a message of type EXCEPTION, we throw an appropriate
exception instead of just returning the message. This is in line with what most
`Future.wait()` APIs do.
4. Added a `get_gradients(context_id)` API which allows users to retrieve a map
from Tensor to respective gradient for the provided context_id on the local
node.
ghstack-source-id: 91794926
Test Plan: unit tests.
Differential Revision: D17652615
fbshipit-source-id: 96f65c52adb2706ee29f4b49e1655afaa0a3bec3
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/25527
Master GH issue: https://github.com/pytorch/pytorch/issues/23110.
This change builds upon https://github.com/pytorch/pytorch/pull/24876 and
provides all the autograd hooks needed for a forward pass with distributed RPC
for builtin operators. This change does not address distributed RPC for Python
UDFs; that will be addressed in follow-up PRs.
Summary of changes:
1. Attach send autograd functions when a request is sent from the client and
response is sent from the server.
2. Attach receive autograd functions when a request is received on the server
and a response is received on the client.
3. Generate a globally unique autograd_message_id for each send/recv autograd
function pair to uniquely identify them (see the sketch after this list).
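A hedged sketch of one way to mint such globally unique ids by packing the worker id into the high bits of a local counter; the class name and bit widths here are illustrative, though the real container uses a similar high-bits scheme:
```cpp
#include <atomic>
#include <cstdint>

// Ids minted on different workers can never collide because the worker id
// occupies the top bits of every id.
class AutogradMessageIdGenerator {
 public:
  explicit AutogradMessageIdGenerator(int16_t workerId)
      : base_(static_cast<int64_t>(workerId) << 48), next_(0) {}

  int64_t next() {
    return base_ | next_++;
  }

 private:
  const int64_t base_;
  std::atomic<int64_t> next_;
};

int main() {
  AutogradMessageIdGenerator gen(/*workerId=*/3);
  int64_t sendRecvPairId = gen.next();  // the same id tags both the send and the recv function
  (void)sendRecvPairId;
  return 0;
}
```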
ghstack-source-id: 91240466
Test Plan: unit tests.
Differential Revision: D17148077
fbshipit-source-id: 192d8a3f552ed7cc939f55dcca332965c9bd3233
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/24876
This contains very basic functionality for adding a 'send' autograd
function to our autograd graph. The purpose of this change is to validate that the
basic structure proposed here makes sense. Once it does, we can build
upon it to address more complicated scenarios. At a high level we've added
the following functionality (see the sketch after the list):
1) Define a very simple 'SendRpcBackwards' autograd function.
2) Attach this function to appropriate tensors when we call an RPC.
3) Store the send function in our distributed autograd context.
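A simplified sketch of items 1-3 using headers shipped with libtorch; `SendRpcBackwardStub` and `attachSendFunction` are illustrative stand-ins, and the real implementation stashes the gradients and the function in the distributed autograd context:
```cpp
#include <torch/torch.h>
#include <memory>
#include <utility>
#include <vector>

// Simplified 'send' function: the real one stores the incoming gradients for
// the distributed backward pass instead of returning them.
struct SendRpcBackwardStub : torch::autograd::Node {
  torch::autograd::variable_list apply(torch::autograd::variable_list&& grads) override {
    return std::move(grads);
  }
};

// Attach a 'send' function to the tensors that are about to go over RPC, so
// the local autograd graph keeps a hook where gradients can re-enter later.
std::shared_ptr<SendRpcBackwardStub> attachSendFunction(std::vector<torch::Tensor>& tensors) {
  auto grad_fn = std::make_shared<SendRpcBackwardStub>();
  // Connect the new node to the tensors' existing grad edges.
  grad_fn->set_next_edges(torch::autograd::collect_next_edges(tensors));
  for (const auto& tensor : tensors) {
    grad_fn->add_input_metadata(tensor);
  }
  // The real code would now store grad_fn in the DistAutogradContext.
  return grad_fn;
}

int main() {
  std::vector<torch::Tensor> tensors = {torch::ones({2, 2}, torch::requires_grad())};
  auto send_fn = attachSendFunction(tensors);
  (void)send_fn;
  return 0;
}
```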
ghstack-source-id: 89359708
Test Plan: unit tests.
Differential Revision: D16903255
fbshipit-source-id: 6c04794a8e58b199795404225fd9da0c1440460e