Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/66744
Modified loops in files under fbsource/fbcode/caffe2/ from the format
`for (TYPE var = x0; var < x_max; var++)`
to the format
`for (const auto var : irange(x0, x_max))`
This was achieved by running r-barnes's loop upgrader script (D28874212), with some modifications to exclude all files under /torch/jit, and a number of reversions and unused-variable warning suppressions added by hand.
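For illustration, a minimal sketch of the transformation using `c10::irange` (the function and variable names below are illustrative, not taken from the diff):
```cpp
#include <c10/util/irange.h>
#include <cstdint>

int64_t sumBefore(const int64_t* data, int64_t n) {
  int64_t sum = 0;
  // Old style: C-style index loop.
  for (int64_t i = 0; i < n; i++) {
    sum += data[i];
  }
  return sum;
}

int64_t sumAfter(const int64_t* data, int64_t n) {
  int64_t sum = 0;
  // New style: range-based loop over c10::irange. The index is const and
  // its type is deduced, which avoids signed/unsigned mistakes.
  for (const auto i : c10::irange(n)) {
    sum += data[i];
  }
  return sum;
}
```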
Test Plan: Sandcastle
Reviewed By: ngimel
Differential Revision: D31705358
fbshipit-source-id: d6ea350cbaa8f452fc78f238160e5374be637a48
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/66234
Modified loops in files under fbsource/fbcode/caffe2/ from the format
`for (TYPE var = x0; var < x_max; var++)`
to the format
`for (const auto var : irange(x0, x_max))`
This was achieved by running r-barnes's loop upgrader script (D28874212), with some modifications to exclude all files under /torch/jit, and a number of reversions and unused-variable warning suppressions added by hand.
bypass_size_limit
allow-large-files
Test Plan: Sandcastle
Reviewed By: ngimel
Differential Revision: D30652629
fbshipit-source-id: 0ae6c4bbbb554bad42e372792a6430e1acf15e3e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59212
Reland of https://github.com/pytorch/pytorch/pull/58428
Until now, the TP agent expected the output of a remote function to be on the same streams as the inputs. In other words, it used the lazy stream context of the inputs to synchronize the output tensors. This held in the most common case of a synchronous remote function, but not for async functions, for fetching RRefs, and so on. The more generic way is to use the CUDA events held by the Future to perform this synchronization (these events may or may not be on the input streams!).
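A rough sketch of the event-based synchronization this describes, assuming ATen's `at::cuda::CUDAEvent` (the helper names are illustrative, not the agent's actual code):
```cpp
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAStream.h>

// Producer side: record an event on whichever stream actually produced
// the output tensor (not necessarily one of the input streams).
at::cuda::CUDAEvent recordOutput(const c10::cuda::CUDAStream& producerStream) {
  at::cuda::CUDAEvent event;
  event.record(producerStream);
  return event;
}

// Consumer side: make the consumer stream wait on the recorded event
// before touching the output tensor.
void syncOutput(at::cuda::CUDAEvent& event,
                const c10::cuda::CUDAStream& consumerStream) {
  event.block(consumerStream);
}
```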
ghstack-source-id: 130202842
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28623885
fbshipit-source-id: 29333bcb75d077ab801eac92017d0e381e8f5569
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59205
Reland of https://github.com/pytorch/pytorch/pull/58422
Similar to Future (which I tackled recently), Message is an ivalue type (a "custom class" one), and the natural way to represent it is inside an intrusive_ptr. However in the RPC code we had a mix of usages, often passing Message by value. This has undesirable consequences, as it could easily trigger a copy by accident, which I believe is why in many places we accepted _rvalue references_ to Message, in order to force the caller to move. In my experience this is non-idiomatic in C++ (normally a function signature specifies how the function consumes its arguments, and it's up to the caller to then decide whether to copy or move).
By moving to intrusive_ptr everywhere, I think we eliminate or at least simplify many of the problems above.
In this PR I do half of the migration, by updating everything except the `toMessageImpl` methods, which will come in the next PR.
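A minimal sketch of the signature change (the `Message` struct and function names below are stand-ins, not the real rpc::Message or RPC code):
```cpp
#include <c10/util/intrusive_ptr.h>
#include <utility>

// Stand-in for rpc::Message; the real class is an ivalue "custom class"
// and is intrusive_ptr-compatible.
struct Message : c10::intrusive_ptr_target {};

// Before: an rvalue reference forced every caller to std::move, which is
// non-idiomatic C++; a by-value parameter risked accidental deep copies.
void sendMessageOld(Message&& /*message*/) {}

// After: the intrusive_ptr is cheap to copy or move, so the signature no
// longer dictates how the caller hands the message over.
void sendMessageNew(c10::intrusive_ptr<Message> /*message*/) {}

int main() {
  auto message = c10::make_intrusive<Message>();
  sendMessageNew(std::move(message));
}
```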
ghstack-source-id: 130202849
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28623891
fbshipit-source-id: c9aeea3440679a11741ca78c06b03c57cb815a5e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/58428
Until now, the TP agent expected the output of a remote function to be on the same streams as the inputs. In other words, it used the lazy stream context of the inputs to synchronize the output tensors. This held in the most common case of a synchronous remote function, but not for async functions, for fetching RRefs, and so on. The more generic way is to use the CUDA events held by the Future to perform this synchronization (these events may or may not be on the input streams!).
ghstack-source-id: 129567045
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28474982
fbshipit-source-id: c0034eb3f2a2ea525efb63a31b839bc086060e7e
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/58422
Similar to Future (which I tackled recently), Message is an ivalue type (a "custom class" one), and the natural way to represent it is inside an intrusive_ptr. However in the RPC code we had a mix of usages, often passing Message by value. This has undesirable consequences, as it could easily trigger a copy by accident, which I believe is why in many places we accepted _rvalue references_ to Message, in order to force the caller to move. In my experience this is non-idiomatic in C++ (normally a function signature specifies how the function consumes its arguments, and it's up to the caller to then decide whether to copy or move).
By moving to intrusive_ptr everywhere, I think we eliminate or at least simplify many of the problems above.
In this PR I do half of the migration, by updating everything except the `toMessageImpl` methods, which will come in the next PR.
ghstack-source-id: 129567053
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28474878
fbshipit-source-id: 5b76d45e05f6fa58c831e369c5c964d126187a6c
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57294
With the advent of CPUs in the device maps, to be more generic (e.g., to support AMD GPUs), and to avoid conversions when passing to Future, RRef, and the like, it's easier to use Devices instead of DeviceIndices. This started as a migration of just the TensorPipe agent, but the RPC layer is quite intertwined, so I had to migrate a lot of stuff.
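For context, a small sketch of why `c10::Device` is the more general currency here (the map aliases are illustrative, not the actual RPC types):
```cpp
#include <c10/core/Device.h>
#include <map>
#include <string>

// A DeviceIndex is just an integer; by itself it cannot express "cpu",
// nor distinguish device types such as CUDA vs. AMD/HIP.
using OldDeviceMap = std::map<std::string, c10::DeviceIndex>;

// A Device carries both a type and an index, so "cpu" and "cuda:1" are
// both first-class values.
using NewDeviceMap = std::map<std::string, c10::Device>;

void example() {
  NewDeviceMap map;
  map.emplace("worker1", c10::Device(c10::DeviceType::CPU));
  map.emplace("worker2", c10::Device(c10::DeviceType::CUDA, 1));
}
```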
ghstack-source-id: 127916562
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28092733
fbshipit-source-id: 024dcb3648c5898ab13e770413c43958f04f1a8a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56346
Now that TensorPipe's API has `targetDevice`, use that instead of
manually writing the CUDA device index in `metadata`.
Test Plan: CI
Reviewed By: lw
Differential Revision: D27703235
fbshipit-source-id: c5b620e3b3ce619367412efdbe9fa3778f6b8869
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/55136
This will ease the transition to the new API where `Buffer` does not
store a length anymore.
Test Plan: CI
Reviewed By: lw
Differential Revision: D27466385
fbshipit-source-id: 9a167f8c501455a3ab49ce75257c69d8b4869925
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54251
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/324
In order to merge the channel hierarchies, we need a generic `Buffer` type, that can wrap either a `CpuBuffer` or a `CudaBuffer`.
The main constraint is that, since this type is used by the channels, it cannot explicitly refer to `CudaBuffer`. We propose here a type-erasure based solution, with small-buffer optimization to avoid heap-allocating the wrapped concrete buffer.
This is a new version of D27001339 (c618dc13d2) which broke the PyTorch OSS build.
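A minimal sketch of the type-erasure-with-SBO idea, assuming a simple `CpuBuffer` struct (illustrative names; not the actual TensorPipe code):
```cpp
#include <cstddef>
#include <new>
#include <utility>

struct CpuBuffer {
  void* ptr{nullptr};
  size_t length{0};
};

class Buffer {
 public:
  template <typename TBuffer>
  /* implicit */ Buffer(TBuffer buffer) {
    static_assert(
        sizeof(Model<TBuffer>) <= kStorageSize,
        "concrete buffer too large for inline storage");
    // Placement-new the concrete wrapper into inline storage: no heap
    // allocation for small buffer types.
    new (&storage_) Model<TBuffer>(std::move(buffer));
  }

  Buffer(const Buffer& other) {
    other.concept_().cloneInto(&storage_);
  }

  ~Buffer() {
    concept_().~Concept();
  }

  // Recover the wrapped concrete buffer; the caller must know its type.
  template <typename TBuffer>
  TBuffer& unwrap() {
    return static_cast<Model<TBuffer>&>(concept_()).buffer;
  }

 private:
  struct Concept {
    virtual void cloneInto(void* dst) const = 0;
    virtual ~Concept() = default;
  };

  template <typename TBuffer>
  struct Model final : Concept {
    explicit Model(TBuffer b) : buffer(std::move(b)) {}
    void cloneInto(void* dst) const override {
      new (dst) Model(*this);
    }
    TBuffer buffer;
  };

  static constexpr size_t kStorageSize = 4 * sizeof(void*);
  alignas(alignof(std::max_align_t)) unsigned char storage_[kStorageSize];

  Concept& concept_() {
    return *reinterpret_cast<Concept*>(storage_);
  }
  const Concept& concept_() const {
    return *reinterpret_cast<const Concept*>(storage_);
  }
};

int main() {
  CpuBuffer cpu{nullptr, 0};
  Buffer buffer(cpu); // wrapped inline, no heap allocation
  CpuBuffer& back = buffer.unwrap<CpuBuffer>();
  (void)back;
}
```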
Test Plan: CI
Reviewed By: lw, mrshenli
Differential Revision: D27156053
fbshipit-source-id: 4244302af33a3be91dcd06093c0d6045d081d3cc
Summary:
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/322
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54145
In order to merge the channel hierarchies, we need a generic `Buffer` type, that can wrap either a `CpuBuffer` or a `CudaBuffer`.
The main constraint is that, since this type is used by the channels, it cannot explicitly refer to `CudaBuffer`. We propose here a type-erasure based solution, with small-buffer optimization to avoid heap-allocating the wrapped concrete buffer.
ghstack-source-id: 124131499
Test Plan: CI
Reviewed By: lw
Differential Revision: D27001339
fbshipit-source-id: 26d7dc19d69d7e3336df6fd4ff6ec118dc17c5b6
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45014
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/219
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/212
+ Introduce buffer.h defining the buffer struct(s). The `CpuBuffer`
struct is always defined, while the `CudaBuffer` struct is defined
only when `TENSORPIPE_SUPPORTS_CUDA` is true.
+ Update all channels to take a `CpuBuffer` or `CudaBuffer` for
`send`/`recv` rather than a raw pointer and a length.
+ Make the base `Channel`/`Context` classes templated on `TBuffer`,
effectively creating two channel hierarchies (one for CPU channels,
one for CUDA channels); see the sketch after this list.
+ Update the Pipe and the generic channel tests to use the new API. So
far, generic channel tests are CPU-only, and tests for the CUDA IPC
channel are (temporarily) disabled. A subsequent PR will take care of
refactoring tests so that generic tests work for CUDA channels.
Another PR will add support for CUDA tensors in the Pipe.
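A minimal sketch of that templated hierarchy (simplified; the real channel API also threads errors and descriptors through the callbacks):
```cpp
#include <cstddef>
#include <functional>

struct CpuBuffer {
  void* ptr;
  size_t length;
};

#if TENSORPIPE_SUPPORTS_CUDA
#include <cuda_runtime.h>
struct CudaBuffer {
  void* ptr;
  size_t length;
  cudaStream_t stream;
};
#endif

// The base class is templated on the buffer type, yielding one hierarchy
// per buffer kind instead of a single hierarchy that must name CudaBuffer.
template <typename TBuffer>
class Channel {
 public:
  virtual void send(TBuffer buffer, std::function<void()> onDone) = 0;
  virtual void recv(TBuffer buffer, std::function<void()> onDone) = 0;
  virtual ~Channel() = default;
};

using CpuChannel = Channel<CpuBuffer>;
#if TENSORPIPE_SUPPORTS_CUDA
using CudaChannel = Channel<CudaBuffer>;
#endif
```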
Differential Revision: D23598033
Test Plan: Imported from OSS
Reviewed By: lw
Pulled By: beauby
fbshipit-source-id: 1d6c3f91e288420858835cd5e7962e8da051b44b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42637
This commit enables sending non-CPU tensors through RPC using the
TensorPipe backend. Users can configure device mappings by calling
set_map_location on `TensorPipeRpcBackendOptions`. Internally,
the `init_rpc` API verifies the correctness of the device mappings:
it shuts down RPC if the check fails, or proceeds and passes the
global mappings to `TensorPipeAgent` if the check succeeds. For serde,
we added a device-indices field to the TensorPipe read and write
buffers, which must be either empty (all tensors on CPU) or match
the tensors in order and number in the RPC message. This commit
does not yet achieve zero-copy: the tensor is always moved to CPU
on the sender and then moved to the specified device on the receiver.
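A hypothetical sketch of the serde invariant described above (the helper is illustrative, not the actual code):
```cpp
#include <stdexcept>
#include <vector>

#include <ATen/ATen.h>

// The device-indices field is either empty (every tensor stays on CPU)
// or lines up one-to-one with the tensors in the RPC message.
void checkDeviceIndices(
    const std::vector<at::Tensor>& tensors,
    const std::vector<c10::DeviceIndex>& deviceIndices) {
  if (deviceIndices.empty()) {
    return; // all tensors are expected to be on CPU
  }
  if (deviceIndices.size() != tensors.size()) {
    throw std::runtime_error(
        "device indices must match the message's tensors in number and order");
  }
}
```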
Test Plan: Imported from OSS
Reviewed By: izdeby
Differential Revision: D23011572
Pulled By: mrshenli
fbshipit-source-id: 62b617eed91237d4e9926bc8551db78b822a1187
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39010
The initial version of the serialization for the TensorPipe RPC agent (i.e., the conversion from rpc::Message to tensorpipe::Message) worked around a limitation of TensorPipe that allowed only one payload per message, by pickling each tensor separately and storing the pickles as metadata (a less efficient way of sending data, as it goes through more copies). Having now lifted that limitation, we can improve the way we serialize: we put the type and the id in their own payloads, we do a single pickling pass over all the tensors of the message (which allows us to deduplicate them), and we store that pickle as a payload. My impression is that pickling is a somewhat costly operation, so reducing the number of times we do it should be beneficial for performance. For the same reason, another change I've made here is to separate the allocation of the buffers from the deserialization. This will allow us (in the future) to perform the allocation on the I/O event loop but perform the unpickling in the worker thread, thus keeping the event loop more responsive.
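A rough sketch of the payload layout this implies (the struct and indices are illustrative, not the actual wire format):
```cpp
#include <cstddef>
#include <vector>

// Simplified stand-in for a TensorPipe-style message holding a vector of
// payloads rather than a single one.
struct Payload {
  const void* data = nullptr;
  size_t length = 0;
};

struct WireMessage {
  std::vector<Payload> payloads;
};

// Hypothetical payload order matching the description above: the message
// type, the message id, and a single pickle covering all tensors at once
// (deduplicated), instead of one pickle per tensor stored as metadata.
enum PayloadIndex : size_t { kType = 0, kId = 1, kPickle = 2 };

WireMessage layoutMessage(Payload type, Payload id, Payload pickle) {
  WireMessage m;
  m.payloads.resize(3);
  m.payloads[kType] = type;
  m.payloads[kId] = id;
  m.payloads[kPickle] = pickle;
  return m;
}
```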
ghstack-source-id: 104810740
Test Plan: RPC tests
Differential Revision: D21716067
fbshipit-source-id: c1475cc78afdcf0820a485ffd98c91abb35796c7
Summary:
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately, as they are, instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effect on the over-the-wire format and thus on performance.
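Roughly, the difference looks like this (illustrative types, not the actual TensorPipe API):
```cpp
#include <cstddef>
#include <vector>

struct Chunk {
  const char* data;
  size_t length;
};

// Before: a single payload per message forced callers to concatenate
// their chunks into one freshly allocated contiguous block.
std::vector<char> packSinglePayload(const std::vector<Chunk>& chunks) {
  std::vector<char> contiguous;
  for (const auto& c : chunks) {
    contiguous.insert(contiguous.end(), c.data, c.data + c.length);
  }
  return contiguous;
}

// After: each chunk can be handed over as-is, as its own payload.
std::vector<Chunk> packMultiPayload(const std::vector<Chunk>& chunks) {
  return chunks; // no copy of the underlying bytes
}
```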
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37919
ghstack-source-id: 103572164
Test Plan:
On both workers
```
import os
import torch
import torch.distributed.rpc as rpc
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "8765"
```
On worker 0
```
rpc.init_rpc(name="foo", rank=0, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 1
```
rpc.init_rpc(name="bar", rank=1, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 0
```
In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2,2), 1), torch.full((2,2), 2)))
Out[15]:
tensor([[3., 3.],
        [3., 3.]])
In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2))
Out[16]: 3
```
Differential Revision: D21425536
fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36197
Create APIs to convert between rpc::Message and tensorpipe::Message:
1. tensorpipeSerialize() - converts an rpc::Message to a tensorpipe::Message without copying tensor memory.
2. tensorpipeAllocateMessage() - allocates an rpc::Message based on the received TensorPipe descriptor, to prepare for copy-free receiving.
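A conceptual sketch of how the two conversions pair up (the signatures and types below are hypothetical simplifications, not the actual headers):
```cpp
// Hypothetical, simplified stand-ins to illustrate the round trip; the
// real functions live in torch/csrc/distributed/rpc and also return
// holders that keep the tensor memory alive during the transfer.
namespace sketch {

struct RpcMessage {};   // stands in for rpc::Message
struct TpMessage {};    // stands in for tensorpipe::Message
struct TpDescriptor {}; // stands in for the received descriptor

// Sender: point the TensorPipe message at the rpc::Message's tensor
// storage instead of copying it.
TpMessage tensorpipeSerialize(RpcMessage&& /*rpcMessage*/) {
  return {};
}

// Receiver: pre-allocate an rpc::Message whose buffers TensorPipe can
// write into directly, so no extra copy is needed after receipt.
RpcMessage tensorpipeAllocateMessage(const TpDescriptor& /*descriptor*/) {
  return {};
}

} // namespace sketch
```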
Test Plan: buck test caffe2/test/cpp/rpc:test_tensorpipe_serialization
Reviewed By: lw
Differential Revision: D20084125
fbshipit-source-id: ffbc310f93443e50261aed752be0fe176610dd2a