Commit Graph

29 Commits

Author SHA1 Message Date
Luca Wehrstedt
bc09478a60 [TensorPipe] Use the new multi-payload message API (#37919)
Summary:
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/37919

ghstack-source-id: 103572164

Test Plan:
On both workers
```
import os
import torch
import torch.distributed.rpc as rpc
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "8765"
```
On worker 0
```
rpc.init_rpc(name="foo", rank=0, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 1
```
rpc.init_rpc(name="bar", rank=1, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 0
```
In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2,2), 1), torch.full((2,2), 2)))
Out[15]:
tensor([[3., 3.],
        [3., 3.]])

In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2))
Out[16]: 3
```

Differential Revision: D21425536

fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
2020-05-07 02:52:30 -07:00
Edward Yang
fe88806784 Back out "Revert D21171334: [pytorch][PR] Change StorageImpl to track byte count rather than element count" (#37893)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37893

Original commit changeset: 50746043acf3

Test Plan: sandcastle and ossci

Reviewed By: malfet, seemethere, ngimel

Differential Revision: D21416509

fbshipit-source-id: 735ec4e61f9d36d4537f52dd2dc6267751aeb94b
2020-05-05 22:43:15 -07:00
Edward Yang
a2fc7f787a Revert D21171334: [pytorch][PR] Change StorageImpl to track byte count rather than element count
Test Plan: revert-hammer

Differential Revision:
D21171334

Original commit changeset: 37329a379de9

fbshipit-source-id: 50746043acf3c76754688de0fe6f1cc12437ea2f
2020-05-05 16:36:15 -07:00
Kurt Mohler
3706803b60 Change StorageImpl to track byte count rather than element count (#37776)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37776

* Remove type-specific size tracking in favor of byte size tracking in Storage and StorageImpl
* Changed numel() and set_numel() to nbytes() and set_nbytes()
* Added enum argument to Storage/StorageImpl constructor to indicate new meaning of the size parameter
* Update all callers of the changed API

Part of issue https://github.com/pytorch/pytorch/issues/33950
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37028

Differential Revision: D21171334

Pulled By: ezyang

fbshipit-source-id: 37329a379de9a3a83cc5e9007e455a3e1c2d10b8
2020-05-05 14:20:51 -07:00
Hongyi Jia
3411ec6e32 [TensorPipe/RPC] Serialize and deserialize message (#36197)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36197

Create APIs to convert between rpc::message and tensorpipe::message
1. tensorpipeSerialize() - converts rpc::message to tensorpipe::message without memory copy (tensors).
2. tensorpipeAllocateMessage - allocates rpc::message based on received tensorpipe descriptor to prepare memory-copy-free receiving.

Test Plan: buck test caffe2/test/cpp/rpc:test_tensorpipe_serialization

Reviewed By: lw

Differential Revision: D20084125

fbshipit-source-id: ffbc310f93443e50261aed752be0fe176610dd2a
2020-05-05 05:45:57 -07:00
Jeremy Lilley
443fe7ca0e [rpc] Avoid wireDeserializer overreading buffers by 1 byte (#36976)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36976

The bounds check and the read were swapped in two places - I noticed
ASAN complaining in an unrelated change on an erroneous buffer.

Adding a couple simple test cases.
ghstack-source-id: 102606986

Test Plan: buck test mode/dev caffe2/test/cpp/rpc:

Differential Revision: D21148936

fbshipit-source-id: 7ec5007535f7310437ac1b9a72852a223b9dd29a
2020-04-21 17:01:45 -07:00
Michael Suo
866d9d4e6a [jit] Fix name collision on load (#35720)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35720

When modules are saved, all relevant types are serialized according to
their qualified name with a compilation unit. Since qualified names are
guaranteed to be unique within a compilation unit, this normally works
fine.

On load, all types are registered in a compilation unit owned by the
script::Module. Type names are not unique across compilation units, so
if you load two modules with colliding type names, make them submodules
of yet another module, and save that module, there is the potential of a
name collision. See the added tests for examples if that description is
confusing.

The solution is to unique type names when serializing code by mangling
them if we detect a name collision.

Test Plan: Imported from OSS

Differential Revision: D20749423

Pulled By: suo

fbshipit-source-id: a8827ff1d4a89f3e7964dbbb49b4381863da3e6a
2020-04-01 00:02:38 -07:00
Edward Yang
96860af870 Revert D20164420: [1.5 Release][Dist Autograd][Better Engineering] Notify Workers on Failure during Distributed Autograd
Test Plan: revert-hammer

Differential Revision:
D20164420

Original commit changeset: 3d4ed7423096

fbshipit-source-id: 67f0f9c11cee84df6dbe37db7821dd601227df66
2020-03-19 08:02:07 -07:00
Omkar Salpekar
5f67c923f1 [1.5 Release][Dist Autograd][Better Engineering] Notify Workers on Failure during Distributed Autograd (#34638)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34638

Fixes: https://github.com/pytorch/pytorch/issues/27643

This PR manages notifying workers in the event of a failure during distributed autograd. Gracefully handles propagating errors across all nodes in the backward pass and sets state in the local autograd engines accordingly.

(Note: this ignores all push blocking failures!)

Test Plan: Added 2 new tests checking errors when they are thrown in an intermediate node during distributed autograd. Ensured that all existing distributed autograd tests pass.

Differential Revision: D20164420

fbshipit-source-id: 3d4ed74230969ac70bb763f1b5b1c16d979f66a2
2020-03-18 18:56:14 -07:00
Pritam Damania
f058c03b15 Disallow sending CUDA tensors over RPC for current RPC agents. (#33604)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/33604

For our current RPC agents, this PR disallows sending CUDA tensors
over RPC and asks users to copy them explicitly to CPU. Currently, this seems
to be the easiest contract to guarantee for our current RPC agents, otherwise
if we do support this transparently it gets a little tricky in terms of whether
a CUDA tensor on the client should be sent to CPU/GPU of the remote end and
also which GPU device on the remote end.

In the future, the TensorPipe RPC agent can have its own specific handling of
CUDA tensors.

Closes https://github.com/pytorch/pytorch/issues/28881
ghstack-source-id: 100166120

Test Plan: waitforbuildbot

Differential Revision: D20020183

fbshipit-source-id: ca4d43d2a24e8fcd3a60b21e654aa0e953e756cb
2020-03-15 15:01:46 -07:00
Jeremy Lilley
fff6fe83a7 [pytorch-rpc] WireSerializer should check has_storage() (#34626)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34626

We need to check has_storage() before looking at it in
cloneSparseTensors(), to avoid gratuitously throwing.

Ideally, we'd add a test for this (I wrote one up but had to disable it),
but won't work until JIT Pickler supports sparse tensors.
ghstack-source-id: 100018077

Test Plan: buck test mode/dev-nosan caffe2/torch/fb/distributed/thriftRpcAgent/...

Differential Revision: D20399971

fbshipit-source-id: 5debfa8140eb1f949d37336330223962cc320abc
2020-03-12 11:35:21 -07:00
Michael Suo
dbe850af5b [jit] do the code reorg (#33851)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/33851

Rationale and context described in #33828.

Script to reproduce the move:
https://gist.github.com/suo/16cbefaaeb67ca5a7c6caffd49b7f6e9
ghstack-source-id: 99079645

Test Plan: Make sure CI passes

Reviewed By: jamesr66a

Differential Revision: D20133869

fbshipit-source-id: 390e9241a9c85366d9005c492ac31f10aa96488e
2020-02-27 13:02:51 -08:00
Yanli Zhao
4d9b649261 jit pickling rref (#32959)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/32959

in rpc torch script call path, we need to pickle/unpickle rref, this diff is added to make jit pickler/unpickler be able to pickle/unpickle rref. It is similar to what is implemented for PyRef::pickle() and PyRef::unpickle().
The pickling/unpickling design assumes it is always coupled with RPC calls. It is not needed to checkpoint a model with rref, before checkpointing the model, user should call ref.to_here() to get value inside rref.

The pickling process is:
1. push torch.distributed.rpc.rref global string
1. call rref.fork() and create rrefForkData, which is a few IDs and type str of the value held inside the rref, the IDs includes rref id, fork id, caller work id, callee work id, owner work id
2. push the rrefForkData

The unpickling process is:
1. read torch.distributed.rpc.rref global string, and retrieve the cached global lamda function
2. the globa lamda function will get rrefForkData
3. if callee is also owner work id, then get owner rref based on Ids inside rrefFork data and return the ownerRRef
4. if callee is not owner work id, then create user rref using the rrefForkData and return the userRRef
5. meanwhile owner rref will be notified and do reference counting correctly

During unpickling, a type_resolver is needed to parse type str. This type_resolver has python dependency, so we get it from rpc_agent, and pass it to unpickler during construction. So we added a type_resolver argumenmt to jit unpickler constructor in this diff.
ghstack-source-id: 98814793

Test Plan: unit test

Differential Revision: D19713293

fbshipit-source-id: 4fd776cdd4ce8f457c4034d79acdfb4cd095c52e
2020-02-24 11:16:35 -08:00
Yanli Zhao
58234c0254 support torch script call over rpc (#32197)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/32197

This is to reland https://github.com/pytorch/pytorch/pull/30063, the main change is to match a general exception and grep "pickle" error word in "test_script_functions_not_supported" unit test, as Python 3.5 and Python 3.6 throw different types of errors with different error message for the rpc call in the unit test.
[test all]This diff makes following changes:
1. Providing a new set of python rpc privated APIs, they can accept an annotated TorchScript call and this call can be serialized, deserialized and executed in C++ without GIL. These privated APIs will be binded to JIT in the future, and they are different from public APIs as future JIT binded private APIs will be able to accept qualified_name, not callables. These private APIs are subject to be deprecated once JIT supports torch script function to be a JIT type.

Also, these APIs require torch script function to be defined and annotated by users in python land, it can not be script class/module constructor or class/module methods.

2. This diff also allows public rpc APIs to accept an annotated TorchScript call and execute code path that above private APIs ran on. Therefore if users invoke an annotated TorchScript call over RPC, this call can be serialized, deserialized and executed in C++ without GIL as well.

3. The above private APIs call a newly defined C++ function to make rpc torch script call to be serialized, deserialized and executed in C++ land. This C++ function returns an ivalue::Future. so that in follow up diff this C++ function can be called when these privated APIs are binded to JIT.

4. script_call.cpp/.h and request_callback_impl.cpp files are refactored accordingly so that torch script call and builtin call can share same message type and codes.

5. refactored deserializeResponse() and added a new utility to deserizalize response to IValue

ghstack-source-id: 96879167
ghstack-source-id: 96879167

Test Plan: unit test

Differential Revision: D19402374

fbshipit-source-id: 04efcc7c167d08a6503f29efe55e76f2be4b2c5e
2020-01-18 09:24:17 -08:00
Michael Suo
51a34545e9 Revert D18482934: support torch script call over rpc
Test Plan: revert-hammer

Differential Revision:
D18482934

Original commit changeset: bd82a0d820c4

fbshipit-source-id: ca5e50fb0a883ee311aeb310198d84ad28062158
2020-01-14 13:30:56 -08:00
Yanli Zhao
dbd737158b support torch script call over rpc (#30063)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/30063

This diff makes following changes:
1. Providing a new set of python rpc privated APIs, they can accept an annotated TorchScript call and this call can be serialized, deserialized and executed in C++ without GIL. These privated APIs will be binded to JIT in the future, and they are different from public APIs as future JIT binded private APIs will be able to accept qualified_name, not callables. These private APIs are subject to be deprecated once JIT supports torch script function to be a JIT type.

Also, these APIs require torch script function to be defined and annotated by users in python land, it can not be script class/module constructor or class/module methods.

2. This diff also allows public rpc APIs to accept an annotated TorchScript call and execute code path that above private APIs ran on. Therefore if users invoke an annotated TorchScript call over RPC, this call can be serialized, deserialized and executed in C++ without GIL as well.

3. The above private APIs call a newly defined C++ function to make rpc torch script call to be serialized, deserialized and executed in C++ land. This C++ function returns an ivalue::Future. so that in follow up diff this C++ function can be called when these privated APIs are binded to JIT.

4. script_call.cpp/.h and request_callback_impl.cpp files are refactored accordingly so that torch script call and builtin call can share same message type and codes.

5. refactored deserializeResponse() and added a new utility to deserizalize response to IValue

ghstack-source-id: 96638829

Test Plan: unit test

Differential Revision: D18482934

fbshipit-source-id: bd82a0d820c47a8e45b2e7c616eca06573f7d7ea
2020-01-14 09:27:04 -08:00
Jeremy Lilley
dff7b945bf Avoid sending large unneeded data over wire in process_group_agent. (#31357)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/31357

If a user selects a subset of a Tensor and sends it in an RPC, we were sending
the whole original Tensor Storage over the network.

While this sounds reasonable, in practice, we observed view-like Tensors being sent
over rpc, where only 1% of the data in the provided Tensor's Storage was
actually used/needed.

The simple solution here is to just force a clone in the serializer code if we see that
less than (arbitrary) half the bits are used, and the tensor is more than a nominal few KB.
Add related tests to ensure this doesn't break.

An alternate approach would be to modify the Pickler. That said, since Pickler is shared by more
components, the logic might be harder to tailor appropriately at that layer (particularly
given that the Pickler has explicit logic to share a single Storage* among several Tensors
that commonly point to the same Storage*).

It's possible that we might want to further refine the basic thresholds in this change.
In practice, we've seen a mostly bimodal distribution thus far for the percent of Tensor
Storage referred by a Tensor in observed rpcs (i.e. either 90%+ or sub-10% of the Storage
referenced), hence the existing 50% threshold here is probably not an unreasonable
starting point.
ghstack-source-id: 95925474

Test Plan: buck test mode/dev caffe2/test/cpp/rpc/...

Differential Revision: D19137056

fbshipit-source-id: e2b3a4dd0cc6e1de820fd0740aa1d59883dbf8d4
2019-12-18 19:24:24 -08:00
Yanli Zhao
20a2e526ef build a generic future<T> (#29579)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29579

Per #28923, this diff is to move Future<Message> to torch::utils and extend it to be Future<T>, most of implementations are copied from FutureMessage and ivalue::Future. merge ivalue::Future with Future<T> will be done separately.

The main difference between Future<T>  and FutureMessage is the error handling, instead of checking message type inside Future to handle error, this future<T> owns has_error_ and error_ states.

also this future passes value_, has_error_ and error_ states to callbacks for easily read future states.

In next diff, a torch script rpc async API will be created, before the API returns, it will create an ivalue::Future and passes it to Future<T>'s call back where state of ivalue::Future will be set.  In this way, the torch script rpc async API  can still return a ivalue::Future and call wait() to get its state appropriately afterwards.
ghstack-source-id: 95479525

Test Plan: unit tests

Differential Revision: D18263023

fbshipit-source-id: 48a65712656a72c2feb0bb3ec8b308c0528986a6
2019-12-12 16:57:14 -08:00
Jeremy Lilley
4dab29a2bd Fix serialization memory lifetime issue. (#30603)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/30603

Pickler object needs to be kept in scope until data is written out to the
final serialized string. tensorData in particular is a reference to memory
owned by the descoped Pickle object.

Noticed this by inspection. In practice, this potential read-after-free here
is limited to non-cpu tensors, and any such use was very soon after free.
ghstack-source-id: 94756036

Test Plan: existing test suite at buck test mode/dev-nosan caffe2/test:rpc_fork

Differential Revision: D18760463

fbshipit-source-id: 9de890d66626aa48f13ca376dd9bd50b92e0cb00
2019-12-02 20:10:28 -08:00
Jeremy Lilley
f4e7e9039d Improve process_group_agent() serialization speed (#29785)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29785

TLDR: This change improves process_group's serialization speed:
  Serialize_Tensor64:     12.38us ->   1.99us  (~-84%)
  Deserialize_Tensor64:   33.89us ->   5.62us  (~-84%)
  Serialize_Tensor1M:    525.74us -> 285.43us  (~-45%)
  Deserialize_Tensor1M:  892.61us -> 273.68us  (~-70%)

After speaking with the jit team, we had consensus that torch::save()/load()
are somewhat high-overhead for RPC serialization, mostly intended for
persistent disk data.

(Particularly, for large tensors, 35% of the time is spent in CRC checking, even
with the fb-side changes to subsitute 40x faster SSE-accelerated crc checking;
Also, for small tensors, the zip container overhead is considerable, as is the
overhead of lexing/parsing an embedded text python program for each RPC).

The jit team encouraged us to use jit::pickler, with the WriteableTensorData
way of outputting result tensors (not the default side-tensor table, or
with pickling the actual tensors). This ends up just pickling some tensor
metadata, and giving us some tensor blobs that we can mindlessly
blit over the wire (they copy to cpu memory if needed).

There is yet no standardized container format for the pickled data
(there is jit::pickle_save() checked in, but but it's experimental,
no load function is yet provided), but they encouraged us to just use
something sensible for this, and possibly revisit later. For now, I made
the directory headers slightly http-inspired.

Note that serialization is just one component of the pipeline, but that
said, we also see reasonable reductions in end-to-end echo times (noisier):
   ProcessGroupAgent_Echo(Tensor_Small)   855.25us -> 492.65us  (~-42%)
   ProcessGroupAgent_Echo(Tensor_1M)       10.82ms -> 6.94ms    (~-35%)
   ProcessGroupAgent_Echo(Small_NoTensor) 688.82us -> 301.72us  (~-56%)
   ProcessGroupAgent_Echo(1MB_NoTensor)     4.65ms -> 3.71ms    (~-20%)

I moved the "wire serialization" logic to a separate file to assist with
unittesting.
ghstack-source-id: 94694682

Test Plan:
buck test mode/dev-nosan caffe2/test/cpp/api:serialize
  buck test mode/dev-nosan caffe2/test/...

Differential Revision: D18493938

fbshipit-source-id: 07ddfe87dbe56472bc944f7d070627052c94a8f4
2019-11-28 09:57:52 -08:00
Pieter Noordhuis
49fba35208 Run clang-format for torch/distributed/rpc
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/27531

Test Plan: Imported from OSS

Differential Revision: D17808206

Pulled By: pietern

fbshipit-source-id: 7d23327bfba42dab4b60779c9f03b7952ff0db7a
2019-11-05 06:25:30 -08:00
Pieter Noordhuis
6c3915643b Rename PythonUDF{Call,Resp} (#27530)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27530

Per discussion in #27286, the `UDF` part is superfluous.

This makes the naming consistent with the `MessageType` enum.

Test Plan: Imported from OSS

Differential Revision: D17808211

Pulled By: pietern

fbshipit-source-id: 0ff925de26d027951ce285750ad276ed17fee4c6
2019-11-05 06:25:26 -08:00
Shen Li
043530a9b9 Support remote for Python UDF in distributed autograd
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/28656

Test Plan: Imported from OSS

Differential Revision: D18138561

Pulled By: mrshenli

fbshipit-source-id: 798e7c00465b5a299f7b4642683bc407895bc7da
2019-10-29 19:39:04 -07:00
Shen Li
400293fcc6 Support remote for builtin operators in distributed autograd (#28630)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/28630

This includes:
1. Respect autograd context in rpc.remote for builtin ops
2. Force setting autograd context in RRef.to_here() even if the
message for to_here() does not contain any tensor.

Test Plan: Imported from OSS

Differential Revision: D18138562

Pulled By: mrshenli

fbshipit-source-id: a39ec83e556d19130f22eb317927241a017000ba
2019-10-29 19:39:00 -07:00
Shen Li
58873776ff Make RRef::toHere() return a jit::Future (#27943)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27943

This is step 1 to make PyRRef::toHere() non-blocking on caller.

Test Plan: Imported from OSS

Differential Revision: D17936747

Pulled By: mrshenli

fbshipit-source-id: 7cf60e5804e72bdc28f0135fed4d7fdce05ea38a
2019-10-23 17:07:11 -07:00
Rohan Varma
d9b4788e5d cleanup dist autograd context on other nodes when it is released on one node (#27951)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27951

we want to clean up the distributed autograd context across the other nodes when a single node is done (here done means exited the context manager `with dist_autograd.context() as context_id: ...`).

This PR does a few things to implement the above:
1) Add classes to encapsulate messages for requesting this context release and the response
2) Handling of this request in `request_callback_impl.cpp`. When we receive this request, we get the context from a given context_id and release it.
3) RPC call in `DistAutogradContainer::releaseContext` to send this command. This currently does not wait for an ack or implement any sort of retrying. We send the RPC to all the workerIds we have come into contact with (implemented in https://github.com/pytorch/pytorch/pull/26324)
4) Relevant unit tests

In follow up PRs, we will add error checking + retries for this call.

ghstack-source-id: 92269279

Test Plan: Added/modified unit tests in `test/dist_autograd_test.py`

Differential Revision: D17920137

fbshipit-source-id: 7403512ab5fcbc28d21c548b2e45319dd472e26a
2019-10-21 07:34:08 -07:00
Pritam Damania
3bccd3fc0d Distributed Autograd - FAST mode backward pass implementation. (#27022)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27022

This change implements the "FAST" mode distributed autograd backward
pass as described in https://github.com/pytorch/pytorch/issues/23110.

At a high level the backward pass works as follows:
1. We start by computing dependencies on the node that calls
`torch.distributed.backward`.
2. This node computes the dependencies starting from the root nodes provided in
the backward call and all the 'send' functions present in the current autograd
context. The "FAST" mode assumes all 'send' functions are part of the autograd
computation.
3. Once the dependency computation is done, the distributed autograd engine
calls the local autograd engine to execute the autograd graph. Note that the
autograd graph on a single node is not necessarily connected because of
inter-node communication. As a result, we have special handling to ensure the
local autograd engine ensures we execute the entire graph starting from the
provided roots and all 'send' functions on the node.
4. When the local autograd engine hits a 'recv' function, it performs an async
RPC to send the gradients over to the appropriate node and stores a future in
the autograd context to keep track of this RPC.
5. On the destination node, the appropriate 'send' function is looked up and
enqueued on the local autograd engine. If this is the first time the node is
hearing about this autograd context id on the backward pass, then the node
computes dependencies for the local autograd engine.
6. As part of compute dependencies, the distributed autograd engine discovers
all leaf nodes and ensures those are passed as 'outputs' to the local autograd
engine. This avoids running the 'AccumulateGrad' function.
7. The gradients computed for the leaf nodes are then actually accumulated in
`DistAutogradContext` for the appropriate autograd context id.
8. The distributed autograd engine waits for the local autograd engine
to complete and also waits for all the 'Futures' (stored in 4.) for respective
RPCs to finish.

We have made the following changes to the local autograd engine for this
purpose:

1. Expose GraphTask and NodeTask so that the distributed autograd engine can
use them.
2. Expose a `execute_with_graph_task` API which gives the distributed engine
to build a GraphTask and pass it to the local autograd engine.
3. Expose a `enqueue_on_cpu` API, which allows the distributed engine to build
a `NodeTask` for a 'send' function and enqueue it on the local autograd engine.

In addition to this a few general improvements:
1. Added a `PropagateGradients` RPC call for the 'recv' function to pass
gradients to the appropriate node during the backward pass.
2. Use IValues as much as possible in serialization for RpcWithAutograd.
3. If Future.wait(), contains a message type EXCEPTION, we throw an appropriate
exception instead of just returning the message. This is inline with what most
Future.wait() APIs do.
4. Added a `get_gradients(context_id)` API which allows users to retrieve a map
from Tensor to respective gradient for the provided context_id on the local
node.
ghstack-source-id: 91794926

Test Plan: unit tests.

Differential Revision: D17652615

fbshipit-source-id: 96f65c52adb2706ee29f4b49e1655afaa0a3bec3
2019-10-12 09:47:49 -07:00
Shen Li
2486b0ba82 Add Python RRef as args and return value (#25499)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/25499

See #23110 for model parallel design details, and #26759 for the RRef
protocol. This commit add support for using RRef as Python UDF arguments
and return value. RRefs can now be shared from owner to user, from user to
owner, or from user to user.

Limitations:
1. No implicit type conversion yet. (#27099)
2. No failure handling and retry. (#26116)
3. UDF is not yet blocked until all RRefs are confirmed. (#27098)
4. Internal RRef control messages are not idempotent yet. (#26116)
5. Cannot delete RRefs correctly when there are circular dependencies. (#27096)

Main changes:

1. Added `SCRIPT_REMOTE_CALL` and `PYTHON_REMOTE_CALL` to `Message.h` to represent `dist.remote` invocations.
2. Added `SCRIPT_RREF_FETCH_CALL`, `PYTHON_RREF_FETCH_CALL`, `RREF_USER_ACCEPT`, `RREF_USER_DELETE`, `RREF_CHILD_ACCEPT`, and `RREF_FORK_REQUEST` to `Message.h` as internal RRef control messages.
3. New message request handling code is added to `functions.cpp`, and message format is added in `script_remote_call.h`, `python_remote_call.h`, and `rref_proto.h`.
4. Added a `PyRRef` type in `py_rref.h` and `py_rref.cpp` which holds a shared pointer to C++ `RRef` type. `PyRRef` wraps the C++ API and also implements RRef pickling and unpickling. RRef fork related control messages will be sent during RRef pickling/unpickling procedure.
5.  Update `RRef.h` and `RRef.cpp` accordingly to support `py::object` RRefs.
6. RRef context (reference count, etc.) are tracked in `rref_context.h` and `rref_context.cpp`.

Test Plan:
Imported from OSS

buck test mode/dev-nosan //caffe2/test:rpc_fork

Differential Revision: D17184146

Pulled By: mrshenli

fbshipit-source-id: a3a268efc087ac1ef489136ab957080382629265
2019-10-03 17:47:12 -07:00
Pritam Damania
fe4170bda8 Add send and recv backward functions for builtin operators RPC. (#25527)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/25527

Master GH issue: https://github.com/pytorch/pytorch/issues/23110.

This change builds upon https://github.com/pytorch/pytorch/pull/24876 and
provides all the autograd hooks needed for a forward pass with distributed rpc
for builtin operators. This change does not address distributed rpc for python
UDFs and that will be addressed in follow up PRs.

Summary of changes:
1. Attach send autograd functions when a request is sent from the client and
response is sent from the server.
2. Attach receive autograd functions when a request is received on the server
and a response is received on the client.
3. Generate a globally unique autograd_message_id for each send/recv autograd
function pair to uniquely identify them.
ghstack-source-id: 91240466

Test Plan: unit tests.

Differential Revision: D17148077

fbshipit-source-id: 192d8a3f552ed7cc939f55dcca332965c9bd3233
2019-10-03 01:18:46 -07:00