.. _remote-reference-protocol:

Remote Reference Protocol
=========================

.. warning::
    The :ref:`rref` API is experimental and subject to change.

This note describes the design details of the Remote Reference protocol and
walks through message flows in different scenarios. Make sure you're familiar
with the :ref:`distributed-rpc-framework` before proceeding.

Background
^^^^^^^^^^

RRef stands for Remote REFerence. It is a reference to an object that is
located on the local or a remote worker, and transparently handles reference
counting under the hood. Conceptually, it can be considered a distributed
shared pointer. Applications can create an RRef by calling
:meth:`~torch.distributed.rpc.remote`. Each RRef is owned by the callee worker
of the :meth:`~torch.distributed.rpc.remote` call (i.e., the owner) and can be
used by multiple users. The owner stores the real data and keeps track of the
global reference count. Every RRef can be uniquely identified by a global
``RRefId``, which is assigned at the time of creation on the caller of the
:meth:`~torch.distributed.rpc.remote` call.

On the owner worker, there is only one ``OwnerRRef`` instance, which contains
the real data, while on user workers there can be as many ``UserRRefs`` as
necessary, and a ``UserRRef`` does not hold the data. All usage on the owner
retrieves the unique ``OwnerRRef`` instance using the globally unique
``RRefId``. A ``UserRRef`` is created when it is used as an argument or return
value in an :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` invocation, and the owner is notified
accordingly to update the reference count. An ``OwnerRRef`` and its data are
deleted when there are no ``UserRRef`` instances globally and there are no
references to the ``OwnerRRef`` on the owner either.

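As a rough illustration of the deletion rule above, here is a toy Python model
of the owner-side bookkeeping. ``ToyOwnerRRef`` and its fields are hypothetical
and exist only for illustration; the real reference counting is implemented
inside the RPC framework.

```python
# Toy model of the OwnerRRef deletion rule: the data may be reclaimed only
# when no UserRRef exists anywhere AND the owner holds no local reference.

class ToyOwnerRRef:
    def __init__(self, rref_id, data):
        self.rref_id = rref_id   # globally unique RRefId
        self.data = data         # the real data lives only on the owner
        self.user_count = 0      # number of live UserRRefs, tracked globally
        self.local_refs = 0      # references held by code on the owner itself

    def deletable(self):
        # Both conditions must hold before the OwnerRRef and its data go away.
        return self.user_count == 0 and self.local_refs == 0
```

Note that either condition alone is insufficient: a remote fork or a local
reference on the owner each keeps the data alive.
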
Assumptions
^^^^^^^^^^^

The RRef protocol is designed with the following assumptions.

- **Transient Network Failures**: The RRef design aims to handle transient
  network failures by retrying messages. Node crashes or permanent network
  partitions are beyond the scope. When those incidents occur, the application
  may take down all workers, revert to the previous checkpoint, and resume
  training.
- **Non-idempotent UDFs**: We assume the user-defined functions (UDFs) provided
  to :meth:`~torch.distributed.rpc.rpc_sync`,
  :meth:`~torch.distributed.rpc.rpc_async`, or
  :meth:`~torch.distributed.rpc.remote` are not idempotent and therefore
  cannot be retried. However, internal RRef control messages will be made
  idempotent and retryable.
- **Out of Order Message Delivery**: We do not assume any message delivery
  order between any pair of nodes, because both the sender and the receiver
  use multiple threads. There is no guarantee on which message will be
  processed first.

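The last two assumptions interact: since control messages may be retried and
may arrive out of order, the owner-side handlers must be safe to re-run. Below
is a minimal sketch of idempotent fork/delete handling, assuming forks are
keyed by a ``(RRefId, ForkId)`` pair; the class and handler names are
illustrative, not the real API.

```python
# Sketch of idempotent fork/delete handling on the owner: applying the same
# message twice leaves the state exactly as after applying it once.

class ToyOwnerState:
    def __init__(self):
        self.live_forks = set()     # forks currently known and alive
        self.deleted_forks = set()  # forks whose delete was already applied

    def on_fork(self, rref_id, fork_id):
        key = (rref_id, fork_id)
        # A retried (or late) fork message for an already-deleted fork is a no-op.
        if key not in self.deleted_forks:
            self.live_forks.add(key)

    def on_delete(self, rref_id, fork_id):
        key = (rref_id, fork_id)
        self.live_forks.discard(key)  # discard(), not remove(): retries are harmless
        self.deleted_forks.add(key)
```

With this shape, a duplicated delete or a fork message that straggles in after
its own delete cannot resurrect a fork or corrupt the count.
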
RRef Lifetime
^^^^^^^^^^^^^

The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time.
The right time to delete an ``OwnerRRef`` is when there are no living
``UserRRef`` instances and user code is not holding references to the
``OwnerRRef`` either. The tricky part is to determine if there are any living
``UserRRef`` instances.

Design Reasoning
----------------

A user can get a ``UserRRef`` in three situations:

1) Receiving a ``UserRRef`` from the owner.
2) Receiving a ``UserRRef`` from another user.
3) Creating a new ``UserRRef`` owned by another worker.

Case 1 is the simplest: the owner passes its RRef to a user by calling
:meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` with its RRef as an argument. In this
case a new ``UserRRef`` will be created on the user. As the owner is the
caller, it can easily update its local reference count on the ``OwnerRRef``.

The only requirement is that any ``UserRRef`` must notify the owner upon
destruction. Hence, we need the first guarantee:

**G1. The owner will be notified when any UserRRef is deleted.**

As messages might arrive delayed or out of order, we need one more guarantee
to make sure the delete message is not processed too soon. If A sends a
message to B that involves an RRef, we call the RRef on A the parent RRef and
the RRef on B the child RRef.

**G2. The parent RRef will NOT be deleted until the child RRef is confirmed by
the owner.**

In cases 2 and 3, it is possible that the owner has only partial or no
knowledge at all about the RRef fork graph. For example, an RRef could be
constructed on a user, and before the owner receives any RPC call, the
creator user might have already shared the RRef with other users, and those
users could further share the RRef. One invariant is that the fork graph of
any RRef is always a tree, because forking an RRef always creates a new
``UserRRef`` instance on the callee (except if the callee is the owner), and
hence every RRef has a single parent.

The owner's view on any ``UserRRef`` in the tree has three stages:

.. code::

    1) unknown -> 2) known -> 3) deleted.

The owner's view of the entire tree keeps changing. The owner deletes its
``OwnerRRef`` instance when it thinks there are no living ``UserRRef``
instances, i.e., when the ``OwnerRRef`` is deleted, all ``UserRRef`` instances
are either indeed deleted or unknown. The dangerous case is when some forks
are unknown and others are deleted.

**G2** trivially guarantees that no parent ``UserRRef`` can be deleted before
the owner knows all of its children ``UserRRef`` instances. However, it is
possible that the child ``UserRRef`` may be deleted before the owner knows its
parent ``UserRRef``.

Consider the following example, where the ``OwnerRRef`` forks to A, then A
forks to Y, and Y forks to Z:

.. code::

    OwnerRRef -> A -> Y -> Z

If all of Z's messages, including the delete message, are processed by the
owner before all messages from Y, the owner will learn of Z's deletion before
knowing Y. Nevertheless, this does not cause any problem, because at least one
of Y's ancestors will be alive (in this case, A), and it will prevent the
owner from deleting the ``OwnerRRef``. More specifically, if the owner does
not know Y, A cannot be deleted due to **G2**, and the owner knows A as the
owner is A's parent.

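The argument above can be checked with a tiny sketch of the owner's deletion
rule, assuming the owner deletes the ``OwnerRRef`` only when every fork it
knows about has also been deleted. The function and fork names below are
illustrative, not the real API.

```python
def owner_may_delete(known_forks, deleted_forks):
    # The owner considers deletion only when no fork it knows about is still
    # alive, i.e. the set of known-but-not-deleted forks is empty.
    return not (known_forks - deleted_forks)

# OwnerRRef -> A -> Y -> Z, with Z's messages racing ahead of Y's:
# the owner created A's fork itself, learned of Z via Z's fork request,
# and then saw Z's delete before ever hearing about Y.
known = {"A", "Z"}
deleted = {"Z"}
print(owner_may_delete(known, deleted))  # A is still alive, so: False
```

Only once A's delete also arrives (which, by **G2**, cannot happen before the
owner knows Y) does the rule permit deletion.
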
Things get a little trickier if the RRef is created on a user:

.. code::

    OwnerRRef
        ^
        |
        A -> Y -> Z

If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``,
the owner at least knows A when Z is deleted, because otherwise
:meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not
call :meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the
owner receives all messages from Z before any message from A and Y. In this
case, as the real data of the ``OwnerRRef`` has not been created yet, there is
nothing to be deleted either. It is the same as if Z did not exist at all.
Hence, it's still OK.

Implementation
--------------

**G1** is implemented by sending out a delete message in the ``UserRRef``
destructor. To provide **G2**, the parent ``UserRRef`` is put into a context
whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef``
is only removed from the context when it receives an acknowledgement message
(ACK) from the child, and the child will only send out the ACK when it is
confirmed by the owner.

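The context described above can be sketched as a small map on the user worker.
This is a hypothetical Python model for illustration only; the real
bookkeeping is implemented in C++ inside the RPC framework.

```python
# Hypothetical sketch of the G2 bookkeeping on a user worker: every fork of a
# UserRRef is recorded under its new ForkId, and the parent stays in the
# context until the child's ACK arrives (which the child only sends once the
# owner has confirmed it). Names are illustrative, not the real API.

class ToyForkContext:
    def __init__(self):
        self.pending_children = {}  # child ForkId -> parent UserRRef info

    def fork(self, parent, child_fork_id):
        # Called before the fork is shipped over RPC; holding the parent here
        # prevents its delete message from being sent (G2).
        self.pending_children[child_fork_id] = parent

    def on_child_ack(self, child_fork_id):
        # The child's ACK releases the parent from the context.
        self.pending_children.pop(child_fork_id, None)

    def parent_deletable(self, parent):
        # G1's delete message may go out only when no fork of this parent is
        # still awaiting confirmation.
        return parent not in self.pending_children.values()
```

The map keyed by ``ForkId`` is what lets retried or out-of-order ACKs be
matched to the right pending fork.
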
Protocol Scenarios
^^^^^^^^^^^^^^^^^^

Let's now discuss how the above designs translate to the protocol in four
scenarios.

User Share RRef with Owner as Return Value
------------------------------------------

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rref.to_here()

In this case, the ``UserRRef`` is created on the user worker A, then it is
passed to the owner worker B together with the remote message, and then B
creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote`
returns immediately, meaning that the ``UserRRef`` can be forked/used before
the owner knows about it.

On the owner, when it receives the :meth:`~torch.distributed.rpc.remote` call,
it creates the ``OwnerRRef`` and returns an ACK to acknowledge ``{100, 1}``
(``RRefId``, ``ForkId``). Only after receiving this ACK can A delete its
``UserRRef``. This involves both **G1** and **G2**. **G1** is obvious. For
**G2**, the ``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef``
is not deleted until it receives the ACK from the owner.

.. image:: https://user-images.githubusercontent.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png
    :alt: user_to_owner_ret.png
    :width: 500 px

The diagram above shows the message flow, where solid arrows contain user
functions and dashed arrows are builtin messages. Note that the first two
messages from A to B (:meth:`~torch.distributed.rpc.remote` and
:meth:`~torch.distributed.rpc.RRef.to_here`) may arrive at B in any order, but
the final delete message will only be sent out when:

- B acknowledges ``UserRRef {100, 1}`` (**G2**), and
- Python GC agrees to delete the local ``UserRRef`` instance. This occurs when
  the RRef is no longer in scope and is eligible for garbage collection.

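The two conditions above can be sketched as a tiny state holder on worker A.
Note that they may become true in either order, since the owner's ACK and
local garbage collection are independent events; the names below are
illustrative.

```python
# Hypothetical sketch of the two conditions gating the delete message for
# UserRRef {100, 1} on worker A; neither condition alone is sufficient.

class ToyDeleteGate:
    def __init__(self):
        self.confirmed_by_owner = False  # set when B acks {100, 1} (G2)
        self.gc_collected = False        # set when Python GC reclaims the RRef

    def should_send_delete(self):
        # The delete message goes out only once both are true, regardless of
        # the order in which they became true.
        return self.confirmed_by_owner and self.gc_collected
```
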
User Share RRef with Owner as Argument
--------------------------------------

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker B
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('B', func, args=(rref, ))

In this case, after creating the ``UserRRef`` on A, A uses it as an argument
in a followup RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it
receives the acknowledgement from B (**G2**, not the return value of the RPC
call). This is necessary because A should not send out the delete message
until all previous messages have been received; otherwise, the ``OwnerRRef``
could be deleted before usage, as we do not guarantee message delivery order.
This is done by creating a child ``ForkId`` of the RRef and holding it in a
map until the owner confirms the child ``ForkId``. The figure below shows the
message flow.

.. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png
    :alt: user_to_owner_arg.png
    :width: 500 px

Note that the ``UserRRef`` could be deleted on B before ``func`` finishes or
even starts. However, this is OK, as at the time B sends out the ACK for the
child ``ForkId``, it has already acquired the ``OwnerRRef`` instance, which
prevents it from being deleted too soon.

Owner Share RRef with User
--------------------------

Owner to user is the simplest case, where the owner can update the reference
count locally and does not need any additional control messages to notify
others. Regarding **G2**, it is the same as if the parent received the ACK
from the owner immediately, as the parent is the owner.

.. code::

    import torch
    import torch.distributed.rpc as rpc
    from torch.distributed.rpc import RRef

    # on worker B and worker C
    def func(rref):
        pass

    # on worker B, creating a local RRef
    rref = RRef("data")
    # say the rref has RRefId 100
    rpc.rpc_async('C', func, args=(rref, ))

.. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png
    :alt: owner_to_user.png
    :width: 500 px

The figure above shows the message flow. Note that when the ``OwnerRRef``
exits scope after the ``rpc_async`` call, it will not be deleted, because
internally there is a map that keeps it alive as long as there are any known
forks, which in this case is ``UserRRef {100, 1}`` (**G2**).

User Share RRef with User
-------------------------

This is the most complicated case, where the caller user (parent
``UserRRef``), the callee user (child ``UserRRef``), and the owner all need to
get involved.

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker C
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('C', func, args=(rref, ))

.. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png
    :alt: user_to_user.png
    :width: 500 px

When C receives the child ``UserRRef`` from A, it sends out a fork request to
the owner B. Later, when B confirms the ``UserRRef`` on C, C will perform two
actions in parallel: 1) send out the child ACK to A, and 2) run the
user-provided function. During this time, the parent (A) will hold its
``UserRRef {100, 1}`` alive to achieve **G2**.
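
The sequencing above can be summarized in a short self-contained sketch that
replays the message flow of this scenario; all names and message strings are
illustrative, not part of the real protocol.

```python
# Hypothetical replay of the user-to-user scenario: A shares UserRRef {100, 1}
# with C under a new child ForkId 2, and A may delete its parent UserRRef only
# after C's ACK, which C sends only after the owner B confirms the fork.

def user_to_user_flow():
    events = []
    parent_pending = {2}  # A records child ForkId 2 before sending (G2)
    events.append("A->C: rpc carrying child fork {100, 2}")
    events.append("C->B: fork request {100, 2}")
    events.append("B->C: fork confirmed {100, 2}")
    # Only now does C ack the parent (and, in parallel, run func).
    events.append("C->A: child ACK {100, 2}")
    parent_pending.discard(2)
    return events, not parent_pending  # True: A may now delete {100, 1}
```
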