Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57294
With the advent of CPUs in the device maps, and to be more generic (e.g., to support AMD GPUs), and to avoid conversions when passing to Future and RRef and such, it's easier to use Devices instead of DeviceIndices. This started by just migrating the TensorPipe agent but the RPC layer is quite intertwined so I had to migrate a lot of stuff.
ghstack-source-id: 127916562
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28092733
fbshipit-source-id: 024dcb3648c5898ab13e770413c43958f04f1a8a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57052
This PR caps a stack whose goal was to merge CUDAFuture into ivalue::Future. CUDAFuture used to be a subclass of ivalue::Future, which was already pretty good, but it meant that in several places we needed `#ifdef`s or registries in order to create the right type of class, which was annoying. We've made CUDAFuture device-agnostic, by using generic helpers, so that it doesn't depend on CUDA. Now all its code can be inserted into ivalue::Future.
This PR does this very naively, by copy-pasting CUDAFuture's code into the (previously empty) virtual methods of ivalue::Future. This helps ensure the correctness of this PR, as it's straightforward to see it behaves exactly like before. However we probably want to polish it a bit later to iron out so wrinkles.
ghstack-source-id: 127713138
(Note: this ignores all push blocking failures!)
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28036829
fbshipit-source-id: 3e5b16402f5dc245c1fcb9d7bf06db64dcb0d2a3
Summary:
In my last PR I've missed CUDA and distributed folders, fixing this now
This change is autogenerated by `python tool/clang_tidy.py -s`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57235
Reviewed By: janeyx99
Differential Revision: D28084444
Pulled By: malfet
fbshipit-source-id: bf222f69ee90c7872c3cb0931e8cdb84f0cb3cda
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57085
PR #54932 fixed the CUDA RPC for RRef when RRef is created through
RPC. But besides that use case, RRef can also be created locally
by directly passing in a value, which would bypass the CUDA stream
synchronization in #54932.
This commit covers the above gap by adding a `devices` argument
to RRef constructor. The RRef will then use this argument to
choose between `CUDAFutre` and `ivalue::Future` to hold the value.
When `devices` is specified and non-empty, `CUDAFuture` will be
used, and the `devices` will be passed to that `CUDAFuture`.
Test Plan: Imported from OSS
Reviewed By: lw
Differential Revision: D28050001
Pulled By: mrshenli
fbshipit-source-id: 2316b419fa69aa4dcd444050f0b74e61c3d0af1e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54428
Using c10::ArrayRef as the parameter type makes the API more flexible and allows the caller to leverage small-buffer optimizations (e.g. c10::SmallVector, std::array) for performance critical cases.
Test Plan: No behavioral changes. Run the existing unit and integration tests.
Reviewed By: suo
Differential Revision: D27232222
fbshipit-source-id: 7b13bc6bd02257097ca119077028fbccc68cc925
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44859
TensorPipe's `set_device_map` option was applied during the forward
pass. However, if we ran the backward pass for the graph we would not
automatically pick up the reverse device mapping.
As a result, users had to specify both forward and backward device mapping
which is very tedious to do.
In this PR, I've added this functionality such that TensorPipe automatically
picks up the reverse device mapping during the backward pass. This is done by
storing the appropriate device mapping in the "recv" autograd function for
distributed autograd.
#Closes: https://github.com/pytorch/pytorch/issues/44170
ghstack-source-id: 119950842
Test Plan:
1) waitforbuildbot
2) Unit test added.
Reviewed By: mrshenli
Differential Revision: D23751975
fbshipit-source-id: 2717d0ef5bde3db029a6172d98aad95734d52140
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50367
This had already been done by mrshenli on Friday (#50236, D25847892 (f9f758e349)) but over the weekend Facebook's internal clang-format version got updated and this changed the format, hence we need to re-apply it. Note that this update also affected the JIT files, which are the other module enrolled in clang-format (see 8530c65e25, D25849205 (8530c65e25)).
ghstack-source-id: 119656866
Test Plan: Shouldn't include functional changes. In any case, there's CI.
Reviewed By: mrshenli
Differential Revision: D25867720
fbshipit-source-id: 3723abc6c35831d7a8ac31f74baf24c963c98b9d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45783
After the previous device maps commits, `pipeWrite` might throw. In
this case, if we increment active calls before `pipeWrite` on the
caller, that active call won't be decremented properly when `pipeWrite`
throws. As a result, `shutdown` can silently timeout. I noticed this
as some tests take more than 60s to finish.
This commit extract the tensor device checking logic out of pipeWrite,
and make sure the error is thrown before the active call count is
incremented.
Differential Revision: D24094803
Test Plan: Imported from OSS
Reviewed By: mruberry
Pulled By: mrshenli
fbshipit-source-id: d30316bb23d2afd3ba4f5540c3bd94a2ac10969b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44664
Closes https://github.com/pytorch/pytorch/issues/39971. This PR adds support for functions decorated with `rpc.functions.async_execution` to be profiled over RPC as builtins, jit functions, and blocking python UDFs currently can be. The reasoning for this is to provide complete feature support in terms of RPC profiling and the various types of functions users can run.
To enable this, the PR below this enables calling `disableProfiler()` safely from another thread. We use that functionality to defer disabling the profiler on the server until the future corresponding to the RPC request completes (rather than only the blocking `processRPC` call as was done previously). Since when the future completes we've kicked off the async function and the future corresponding to it has completed, we are able to capture any RPCs the function would have called and the actual work done on the other node.
For example, if the following async function is ran on a server over RPC:
```
def slow_add(x, y):
time.sleep(1)
return torch.add(x, y)
rpc.functions.async_execution
def slow_async_add(to, x, y):
return rpc.rpc_async(to, slow_add, args=(x, y))
```
we expect to see the original RPC profiled, the nested RPC profiled, and the actual torch.add() work. All of these events should be recorded with the correct node id. Here is an example profiling output:
```
------------------------------------------------------------------------------------------------------------------------- --------------- --------------- --------------- --------
------- --------------- --------------- ---------------
Name Self CPU total % Self CPU total CPU total % CPU total CPU time avg Number of Calls Node ID
------------------------------------------------------------------------------------------------------------------------- --------------- --------------- --------------- --------
------- --------------- --------------- --------------- rpc_async#slow_async_add(worker1 -> worker2) 0.00% 0.000us 0 1.012s
1.012s 1 1
aten::empty 7.02% 11.519us 7.02% 11.519us 11.519us 1 1
rpc_async#slow_async_add(worker1 -> worker2)#remote_op: rpc_async#slow_add(worker2 -> worker3) 0.00% 0.000us 0 1.006s
1.006s 1 2 rpc_async#slow_async_add(worker1 -> worker2)#remote_op: aten::empty 7.21% 11.843us 7.21% 11.843us
11.843us 1 2
rpc_async#slow_async_add(worker1 -> worker2)#remote_op: rpc_async#slow_add(worker2 -> worker3)#remote_op: aten::add 71.94% 118.107us 85.77% 140.802us 140.802us 1 3
rpc_async#slow_async_add(worker1 -> worker2)#remote_op: rpc_async#slow_add(worker2 -> worker3)#remote_op: aten::empty 13.82% 22.695us 13.82% 22.695us
22.695us 1 3 ------------------------------------------------------------------------------------------------------------------------- --------------- --------------- --------------- --------
------- --------------- --------------- ---------------
Self CPU time total: 164.164us
```
This PR also moves a bunch of the profiling logic to `rpc/utils.cpp` to declutter `request_callback` code.
ghstack-source-id: 112868470
Test Plan:
```
rvarm1@devbig978:fbcode (52dd34f6)$ buck test mode/no-gpu mode/dev-nosan //caffe2/test/distributed/rpc:process_group_agent -- test_rpc_profiling_async_function --print-passing-details --stress-runs 1
```
Reviewed By: mrshenli
Differential Revision: D23638387
fbshipit-source-id: eedb6d48173a4ecd41d70a9c64048920bd4807c4
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42637
This commit enables sending non-CPU tensors through RPC using
TensorPipe backend. Users can configure device mappings by calling
set_map_location on `TensorPipeRpcBackendOptions`. Internally,
the `init_rpc` API verifies the correctness of device mappings. It
will shutdown RPC if the check failed, or proceed and pass global
mappings to `TensorPipeAgent` if the check was successful. For serde,
we added a device indices field to TensorPipe read and write buffers,
which should be either empty (all tensors must be on CPU) or match
the tensors in order and number in the RPC message. This commit
does not yet avoid zero-copy, the tensor is always moved to CPU
on the sender and then moved to the specified device on the receiver.
Test Plan: Imported from OSS
Reviewed By: izdeby
Differential Revision: D23011572
Pulled By: mrshenli
fbshipit-source-id: 62b617eed91237d4e9926bc8551db78b822a1187
Summary:
Forward-declare `tensorpipe::Message` class in utils.h
Guard TensorPipe specific methods in utils.cpp with `#ifdef USE_TENSORPIPE`
Pass `USE_TENSORPIPE` as private flag to `torch_cpu` library
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40846
Differential Revision: D22338864
Pulled By: malfet
fbshipit-source-id: 2ea2aea84527ae7480e353afb55951a068b3b980
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40066
Builds on top of the previous PR to ensure that all remotely profiled events are prefixed with the key for the RPC that generated them.
The key is generated by the result of `_build_rpc_profiling_key` in `rpc/internal.py` and prefixed onto the event name. In order to do this, we set the current-key when creating the RPC in Python, retrieve the currently-set key in C++ and save a GloballyUniqueId -> key mapping to an in-memory map. When we receive an RPC with profiling information, we expect to receive this ID back, and look up the corresponding profiling key in the map.
The key is then added to all the remote events.
Tested by adding tests to ensure the key is added to all the remote events. Also added a UT which tests in under the multi-threading scenario, to ensure that the mapping's correctness is maintained when several RPCs are in the process of being created at once.
ghstack-source-id: 106316106
Test Plan: Unit test
Differential Revision: D22040035
fbshipit-source-id: 9215feb06084b294edbfa6e03385e13c1d730c43
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38748
This diff contains the message scaffolding and profiler changes in order to be able to remotely run the profiler across different nodes and aggregate the results on a single node.
As discussed, we have implemented this by creating new message types, that similar to autograd messages, wrap the profiling information with the original message, and send this new message over the wire. On the receiving end, this wrapped message is detected, we fetch the original message from it, and process the original message with the profiler enabled. When sending a response with profiling information, we serialize the profiled `Events` and send them back over RPC. When such a message is received, the events profiled on the remote node are stored (added back to the local profiler).
Changes in this PR:
- New message types (run_with_profiling_req, run_with_profiling_resp) to send profiling info over the wire. Message parsing logic is added to handle these wrapped types.
- Handling of sending profiler data over the wire, in particular, the attributes of the `ProfilerConfig` and the serialized profiled `Event`s
- The logic for wrapping RPC messages is deduped with that in `rpc_with_autograd`, and the common payload wrapping/unwrapping logic is moved to helper functions in `rpc/utils.cpp`
- Changes in `autograd/utils.cpp` to detect if we have enabled the profiler and are sending an RPC, if so, uses the above new message types
- Changes in request_callback to parse and turn on the profiler in a thread-local fashion
- Serialization and deserialization of profiling `Events`, and support to add the remote events to the thread-local profiler
- Introduction of the concept of `node_id`, which as discussed with ilia-cher , will be used along with the `Event`s handle attribute to distinguish between events. When there are events from different nodes, this node information is rendered in the profile output (e.g. when printing tables), otherwise, it is not, since it is irrelevant.
- Some changes to profiler.cpp to add useful helper methods/guards
- toHere() is now profiled for RRefs
- Unittests
ghstack-source-id: 106134626
Test Plan: Added unittests, existing profiler unittests.
Differential Revision: D19510010
fbshipit-source-id: 044347af992f19a9e3b357c9567f6fc73e988157
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38898
Pickling will pickle the tensor meta info, and its up to the jit
exporter or other upstream who use the pickler to decide how to write
the actual tensor data.
This PR make we call getWritableTensorData in upper level so that rpc
and TensorPipe can leverge it with only pickling tensor meta data without
converting the tensor from GPU to CPU.
Test Plan: Imported from OSS
Differential Revision: D21879866
Pulled By: wanchaol
fbshipit-source-id: 75f7ff4073e4ad15b6588973dcbdc48f97a8329f
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38590
This PR implements timeout semantics for RRef for parity with rpc_sync and rpc_async. How it works:
- Timeout parameter is added to rpc.remote. If the rpc.remote call times out, note that the error won't be raised to the user in that call, as it is not blocking (similar to rpc_async). Instead, the timeout error will be raised the next time the RRef is used (either by pickling or to_here call).
- Error handling semantics are added to RRef to deal with the timeout errors. Previously, if there was an error creating the OwnerRRef, the callback on the local user would throw an error in a callback, resulting in an `std::terminate`. Instead of this, the error is now caught and surfaced to the user the next time the RRef is used. As part of this, we have added an `RPCErrorType` enum and defined RRef error handlers to handle the `RPCErrorrTypes` (currently just timeout and unknown)
- A timeout parameter is added to `to_here()` which gives the user control over the max amount of time it can block for.
- `ctx.prepareChildForFork()` which is called when the RRef is pickled (i.e. used as an arg over RPC) checks if the `rpc.remote()` call had timed out, and if so, raises that error to the user.
- Tests are added, primarily via delay injection.
ghstack-source-id: 105232837
Test Plan: CI
Differential Revision: D21588165
fbshipit-source-id: c9f9e8aa3521012ea1de3e0f152a41afdf8b23f3
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39010
The initial version of the serialization for the TensorPipe RPC agent (i.e., the conversion from rpc::Message to tensorpipe::Message) worker around a limitation of TensorPipe of only allowing one payload per message by pickling each tensor separately and storing the pickles as metadata (which is a less efficient way of sending data over, as it goes through more copies). Having now lifter that limitation we can now improve the way we serialize. We now put the type and the id as their own payloads, we do a single pickling pass for all the tensors of the message (which allows us to deduplicate them) and store the pickle as a payload. My impression is that pickling is a somewhat costly operation, so reducing the number of times we do it should be beneficial for performance. For this same reason, another change I've done here is separate the allocation of the buffers from the deserialization. This will allow us (in the future) to perform the allocation on the I/O event loop but perform the unpickling in the worker thread, thus keeping the event loop more responsive.
ghstack-source-id: 104810740
Test Plan: RPC tests
Differential Revision: D21716067
fbshipit-source-id: c1475cc78afdcf0820a485ffd98c91abb35796c7
Summary:
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37919
ghstack-source-id: 103572164
Test Plan:
On both workers
```
import os
import torch
import torch.distributed.rpc as rpc
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "8765"
```
On worker 0
```
rpc.init_rpc(name="foo", rank=0, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 1
```
rpc.init_rpc(name="bar", rank=1, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 0
```
In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2,2), 1), torch.full((2,2), 2)))
Out[15]:
tensor([[3., 3.],
[3., 3.]])
In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2))
Out[16]: 3
```
Differential Revision: D21425536
fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37776
* Remove type-specific size tracking in favor of byte size tracking in Storage and StorageImpl
* Changed numel() and set_numel() to nbytes() and set_nbytes()
* Added enum argument to Storage/StorageImpl constructor to indicate new meaning of the size parameter
* Update all callers of the changed API
Part of issue https://github.com/pytorch/pytorch/issues/33950
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37028
Differential Revision: D21171334
Pulled By: ezyang
fbshipit-source-id: 37329a379de9a3a83cc5e9007e455a3e1c2d10b8
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36197
Create APIs to convert between rpc::message and tensorpipe::message
1. tensorpipeSerialize() - converts rpc::message to tensorpipe::message without memory copy (tensors).
2. tensorpipeAllocateMessage - allocates rpc::message based on received tensorpipe descriptor to prepare memory-copy-free receiving.
Test Plan: buck test caffe2/test/cpp/rpc:test_tensorpipe_serialization
Reviewed By: lw
Differential Revision: D20084125
fbshipit-source-id: ffbc310f93443e50261aed752be0fe176610dd2a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36976
The bounds check and the read were swapped in two places - I noticed
ASAN complaining in an unrelated change on an erroneous buffer.
Adding a couple simple test cases.
ghstack-source-id: 102606986
Test Plan: buck test mode/dev caffe2/test/cpp/rpc:
Differential Revision: D21148936
fbshipit-source-id: 7ec5007535f7310437ac1b9a72852a223b9dd29a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35720
When modules are saved, all relevant types are serialized according to
their qualified name with a compilation unit. Since qualified names are
guaranteed to be unique within a compilation unit, this normally works
fine.
On load, all types are registered in a compilation unit owned by the
script::Module. Type names are not unique across compilation units, so
if you load two modules with colliding type names, make them submodules
of yet another module, and save that module, there is the potential of a
name collision. See the added tests for examples if that description is
confusing.
The solution is to unique type names when serializing code by mangling
them if we detect a name collision.
Test Plan: Imported from OSS
Differential Revision: D20749423
Pulled By: suo
fbshipit-source-id: a8827ff1d4a89f3e7964dbbb49b4381863da3e6a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34638
Fixes: https://github.com/pytorch/pytorch/issues/27643
This PR manages notifying workers in the event of a failure during distributed autograd. Gracefully handles propagating errors across all nodes in the backward pass and sets state in the local autograd engines accordingly.
(Note: this ignores all push blocking failures!)
Test Plan: Added 2 new tests checking errors when they are thrown in an intermediate node during distributed autograd. Ensured that all existing distributed autograd tests pass.
Differential Revision: D20164420
fbshipit-source-id: 3d4ed74230969ac70bb763f1b5b1c16d979f66a2
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/33604
For our current RPC agents, this PR disallows sending CUDA tensors
over RPC and asks users to copy them explicitly to CPU. Currently, this seems
to be the easiest contract to guarantee for our current RPC agents, otherwise
if we do support this transparently it gets a little tricky in terms of whether
a CUDA tensor on the client should be sent to CPU/GPU of the remote end and
also which GPU device on the remote end.
In the future, the TensorPipe RPC agent can have its own specific handling of
CUDA tensors.
Closes https://github.com/pytorch/pytorch/issues/28881
ghstack-source-id: 100166120
Test Plan: waitforbuildbot
Differential Revision: D20020183
fbshipit-source-id: ca4d43d2a24e8fcd3a60b21e654aa0e953e756cb
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34626
We need to check has_storage() before looking at it in
cloneSparseTensors(), to avoid gratuitously throwing.
Ideally, we'd add a test for this (I wrote one up but had to disable it),
but won't work until JIT Pickler supports sparse tensors.
ghstack-source-id: 100018077
Test Plan: buck test mode/dev-nosan caffe2/torch/fb/distributed/thriftRpcAgent/...
Differential Revision: D20399971
fbshipit-source-id: 5debfa8140eb1f949d37336330223962cc320abc
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/32959
in rpc torch script call path, we need to pickle/unpickle rref, this diff is added to make jit pickler/unpickler be able to pickle/unpickle rref. It is similar to what is implemented for PyRef::pickle() and PyRef::unpickle().
The pickling/unpickling design assumes it is always coupled with RPC calls. It is not needed to checkpoint a model with rref, before checkpointing the model, user should call ref.to_here() to get value inside rref.
The pickling process is:
1. push torch.distributed.rpc.rref global string
1. call rref.fork() and create rrefForkData, which is a few IDs and type str of the value held inside the rref, the IDs includes rref id, fork id, caller work id, callee work id, owner work id
2. push the rrefForkData
The unpickling process is:
1. read torch.distributed.rpc.rref global string, and retrieve the cached global lamda function
2. the globa lamda function will get rrefForkData
3. if callee is also owner work id, then get owner rref based on Ids inside rrefFork data and return the ownerRRef
4. if callee is not owner work id, then create user rref using the rrefForkData and return the userRRef
5. meanwhile owner rref will be notified and do reference counting correctly
During unpickling, a type_resolver is needed to parse type str. This type_resolver has python dependency, so we get it from rpc_agent, and pass it to unpickler during construction. So we added a type_resolver argumenmt to jit unpickler constructor in this diff.
ghstack-source-id: 98814793
Test Plan: unit test
Differential Revision: D19713293
fbshipit-source-id: 4fd776cdd4ce8f457c4034d79acdfb4cd095c52e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/32197
This is to reland https://github.com/pytorch/pytorch/pull/30063, the main change is to match a general exception and grep "pickle" error word in "test_script_functions_not_supported" unit test, as Python 3.5 and Python 3.6 throw different types of errors with different error message for the rpc call in the unit test.
[test all]This diff makes following changes:
1. Providing a new set of python rpc privated APIs, they can accept an annotated TorchScript call and this call can be serialized, deserialized and executed in C++ without GIL. These privated APIs will be binded to JIT in the future, and they are different from public APIs as future JIT binded private APIs will be able to accept qualified_name, not callables. These private APIs are subject to be deprecated once JIT supports torch script function to be a JIT type.
Also, these APIs require torch script function to be defined and annotated by users in python land, it can not be script class/module constructor or class/module methods.
2. This diff also allows public rpc APIs to accept an annotated TorchScript call and execute code path that above private APIs ran on. Therefore if users invoke an annotated TorchScript call over RPC, this call can be serialized, deserialized and executed in C++ without GIL as well.
3. The above private APIs call a newly defined C++ function to make rpc torch script call to be serialized, deserialized and executed in C++ land. This C++ function returns an ivalue::Future. so that in follow up diff this C++ function can be called when these privated APIs are binded to JIT.
4. script_call.cpp/.h and request_callback_impl.cpp files are refactored accordingly so that torch script call and builtin call can share same message type and codes.
5. refactored deserializeResponse() and added a new utility to deserizalize response to IValue
ghstack-source-id: 96879167
ghstack-source-id: 96879167
Test Plan: unit test
Differential Revision: D19402374
fbshipit-source-id: 04efcc7c167d08a6503f29efe55e76f2be4b2c5e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/30063
This diff makes following changes:
1. Providing a new set of python rpc privated APIs, they can accept an annotated TorchScript call and this call can be serialized, deserialized and executed in C++ without GIL. These privated APIs will be binded to JIT in the future, and they are different from public APIs as future JIT binded private APIs will be able to accept qualified_name, not callables. These private APIs are subject to be deprecated once JIT supports torch script function to be a JIT type.
Also, these APIs require torch script function to be defined and annotated by users in python land, it can not be script class/module constructor or class/module methods.
2. This diff also allows public rpc APIs to accept an annotated TorchScript call and execute code path that above private APIs ran on. Therefore if users invoke an annotated TorchScript call over RPC, this call can be serialized, deserialized and executed in C++ without GIL as well.
3. The above private APIs call a newly defined C++ function to make rpc torch script call to be serialized, deserialized and executed in C++ land. This C++ function returns an ivalue::Future. so that in follow up diff this C++ function can be called when these privated APIs are binded to JIT.
4. script_call.cpp/.h and request_callback_impl.cpp files are refactored accordingly so that torch script call and builtin call can share same message type and codes.
5. refactored deserializeResponse() and added a new utility to deserizalize response to IValue
ghstack-source-id: 96638829
Test Plan: unit test
Differential Revision: D18482934
fbshipit-source-id: bd82a0d820c47a8e45b2e7c616eca06573f7d7ea
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/31357
If a user selects a subset of a Tensor and sends it in an RPC, we were sending
the whole original Tensor Storage over the network.
While this sounds reasonable, in practice, we observed view-like Tensors being sent
over rpc, where only 1% of the data in the provided Tensor's Storage was
actually used/needed.
The simple solution here is to just force a clone in the serializer code if we see that
less than (arbitrary) half the bits are used, and the tensor is more than a nominal few KB.
Add related tests to ensure this doesn't break.
An alternate approach would be to modify the Pickler. That said, since Pickler is shared by more
components, the logic might be harder to tailor appropriately at that layer (particularly
given that the Pickler has explicit logic to share a single Storage* among several Tensors
that commonly point to the same Storage*).
It's possible that we might want to further refine the basic thresholds in this change.
In practice, we've seen a mostly bimodal distribution thus far for the percent of Tensor
Storage referred by a Tensor in observed rpcs (i.e. either 90%+ or sub-10% of the Storage
referenced), hence the existing 50% threshold here is probably not an unreasonable
starting point.
ghstack-source-id: 95925474
Test Plan: buck test mode/dev caffe2/test/cpp/rpc/...
Differential Revision: D19137056
fbshipit-source-id: e2b3a4dd0cc6e1de820fd0740aa1d59883dbf8d4
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29579
Per #28923, this diff is to move Future<Message> to torch::utils and extend it to be Future<T>, most of implementations are copied from FutureMessage and ivalue::Future. merge ivalue::Future with Future<T> will be done separately.
The main difference between Future<T> and FutureMessage is the error handling, instead of checking message type inside Future to handle error, this future<T> owns has_error_ and error_ states.
also this future passes value_, has_error_ and error_ states to callbacks for easily read future states.
In next diff, a torch script rpc async API will be created, before the API returns, it will create an ivalue::Future and passes it to Future<T>'s call back where state of ivalue::Future will be set. In this way, the torch script rpc async API can still return a ivalue::Future and call wait() to get its state appropriately afterwards.
ghstack-source-id: 95479525
Test Plan: unit tests
Differential Revision: D18263023
fbshipit-source-id: 48a65712656a72c2feb0bb3ec8b308c0528986a6
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/30603
Pickler object needs to be kept in scope until data is written out to the
final serialized string. tensorData in particular is a reference to memory
owned by the descoped Pickle object.
Noticed this by inspection. In practice, this potential read-after-free here
is limited to non-cpu tensors, and any such use was very soon after free.
ghstack-source-id: 94756036
Test Plan: existing test suite at buck test mode/dev-nosan caffe2/test:rpc_fork
Differential Revision: D18760463
fbshipit-source-id: 9de890d66626aa48f13ca376dd9bd50b92e0cb00
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29785
TLDR: This change improves process_group's serialization speed:
Serialize_Tensor64: 12.38us -> 1.99us (~-84%)
Deserialize_Tensor64: 33.89us -> 5.62us (~-84%)
Serialize_Tensor1M: 525.74us -> 285.43us (~-45%)
Deserialize_Tensor1M: 892.61us -> 273.68us (~-70%)
After speaking with the jit team, we had consensus that torch::save()/load()
are somewhat high-overhead for RPC serialization, mostly intended for
persistent disk data.
(Particularly, for large tensors, 35% of the time is spent in CRC checking, even
with the fb-side changes to subsitute 40x faster SSE-accelerated crc checking;
Also, for small tensors, the zip container overhead is considerable, as is the
overhead of lexing/parsing an embedded text python program for each RPC).
The jit team encouraged us to use jit::pickler, with the WriteableTensorData
way of outputting result tensors (not the default side-tensor table, or
with pickling the actual tensors). This ends up just pickling some tensor
metadata, and giving us some tensor blobs that we can mindlessly
blit over the wire (they copy to cpu memory if needed).
There is yet no standardized container format for the pickled data
(there is jit::pickle_save() checked in, but but it's experimental,
no load function is yet provided), but they encouraged us to just use
something sensible for this, and possibly revisit later. For now, I made
the directory headers slightly http-inspired.
Note that serialization is just one component of the pipeline, but that
said, we also see reasonable reductions in end-to-end echo times (noisier):
ProcessGroupAgent_Echo(Tensor_Small) 855.25us -> 492.65us (~-42%)
ProcessGroupAgent_Echo(Tensor_1M) 10.82ms -> 6.94ms (~-35%)
ProcessGroupAgent_Echo(Small_NoTensor) 688.82us -> 301.72us (~-56%)
ProcessGroupAgent_Echo(1MB_NoTensor) 4.65ms -> 3.71ms (~-20%)
I moved the "wire serialization" logic to a separate file to assist with
unittesting.
ghstack-source-id: 94694682
Test Plan:
buck test mode/dev-nosan caffe2/test/cpp/api:serialize
buck test mode/dev-nosan caffe2/test/...
Differential Revision: D18493938
fbshipit-source-id: 07ddfe87dbe56472bc944f7d070627052c94a8f4
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/27530
Per discussion in #27286, the `UDF` part is superfluous.
This makes the naming consistent with the `MessageType` enum.
Test Plan: Imported from OSS
Differential Revision: D17808211
Pulled By: pietern
fbshipit-source-id: 0ff925de26d027951ce285750ad276ed17fee4c6