mirror of
https://github.com/zebrajr/pytorch.git
synced 2025-12-06 12:20:52 +01:00
### **SUMMARY:** It is unnecessary to perform `n + 1` calls to `cast` (one cast for each parameter name-value pair and one cast for the filter generator itself) in a dictionary comprehension in an effort to avoid mypy errors. Previously, the `cast` to `Tuple[str, str]` was necessary to prevent mypy from complaining that we are trying to create a dictionary out of lists as opposed to tuples (see the mypy issue [here](https://github.com/python/mypy/issues/7509)). However, a `cast` is both adding unnecessary overhead due to the function call and should generally only be used when a linter is unable to properly infer the type of a variable, not to "lie" to it about the type. We can avoid this by instead using a generator within the dictionary comprehension and then indexing into it twice to produce tuples of size 2; mypy recognizes this as a valid dictionary initialization. The above change is much more performant than the previous version of the code. Timing the two versions of the dictionary construction yielded the following results: ``` >python -m timeit -s "from typing import cast, Dict, Tuple, Iterable" -n 100000 -p "dict(cast(Tuple[str, str], pair.split('=')) for pair in cast(Iterable[str], filter(None, 'rank=3&world_size=5'.split('&'))))" 100000 loops, best of 5: 2.66 usec per loop >python -m timeit -n 100000 -p "dict((pair[0], pair[1]) for pair in (pair.split('=') for pair in filter(None, 'rank=3&world_size=5'.split('&'))))" 100000 loops, best of 5: 1.09 usec per loop ``` The `cast` to `Iterable[str]` is similarly a "lie" that is not necessary. It is best to be as transparent as possible to the linter rather than using `cast` to eliminate errors. This actually does not even produce any mypy errors when removed in isolation from the other changes. Further, it is good practice to type hint the return value of a function rather than specifying the type of the return value inside the function. 
Thus, the unnecessary intermediate variable `query_dict` inside `_query_to_dict` was removed, and the type hint of the return value was moved to the function declaration. The type of the argument `query` is specified as `str`. ### **EDITS (additional commits):** [The sole type hint for `query_dict` (in `_env_rendezvous_handler`) was removed to match all other functions in the file.](76d78bfc9c) [Incorrect typing is fixed for _env_rendezvous_handler typing so that `rank`, `world_size`, `master_port`, and `master_addr` are specified to be `int`, `int`, `int`, and `str`, respectively.](3cc5844264) Pull Request resolved: https://github.com/pytorch/pytorch/pull/75959 Approved by: https://github.com/kumpera, https://github.com/mrshenli
268 lines
9.3 KiB
Python
268 lines
9.3 KiB
Python
try:
|
|
from urllib.parse import urlparse, urlunparse
|
|
except ImportError:
|
|
raise ImportError(
|
|
"urllib cannot be found, urlparse from python2 is no longer supported."
|
|
)
|
|
|
|
import numbers
|
|
import os
|
|
import sys
|
|
from datetime import timedelta
|
|
from typing import Dict
|
|
|
|
import torch._six as six
|
|
from torch.distributed import FileStore, PrefixStore, Store, TCPStore
|
|
|
|
from .constants import default_pg_timeout
|
|
|
|
|
|
# Registry of rendezvous handlers, keyed by URL scheme (for example "tcp").
# Entries are added by register_rendezvous_handler() and looked up by scheme
# when a rendezvous URL is dispatched.
_rendezvous_handlers = {}
|
|
|
|
|
|
def register_rendezvous_handler(scheme, handler):
    """Register a custom rendezvous handler under a URL scheme.

    Rendezvous is the step in which participating processes find each
    other and exchange the information they need in order to communicate;
    it has to happen before any collective algorithm can run.

    The result of a rendezvous is a triplet: a shared key/value store,
    the rank of the process, and the total number of participating
    processes.

    When none of the bundled rendezvous methods fits your execution
    environment, you can register your own handler. Choose a unique name
    and use it as the URL scheme when calling the `rendezvous()` function.

    Args:
        scheme (str): URL scheme that identifies your rendezvous handler.
        handler (function): Handler invoked when `rendezvous()` is called
            with a URL that uses the corresponding scheme. It must be a
            generator function that yields the triplet.

    Raises:
        RuntimeError: If a handler is already registered for ``scheme``.
    """
    registry = _rendezvous_handlers
    if scheme in registry:
        raise RuntimeError(
            "Rendezvous handler for {}:// already registered".format(scheme)
        )
    # Item assignment mutates the module-level dict in place, so no
    # ``global`` declaration is needed.
    registry[scheme] = handler
|
|
|
|
# Query will have format "rank=0&world_size=1" and is
# converted into {"rank": "0", "world_size": "1"} (values stay strings)
def _query_to_dict(query: str) -> Dict[str, str]:
    """Parse a URL query string into a dictionary of string parameters.

    Empty segments (for example those produced by a leading, trailing or
    doubled ``&``) are skipped. Each remaining segment is split on its
    FIRST ``=`` only, so a value that itself contains ``=`` is preserved
    in full rather than being truncated.

    Args:
        query: Query component of a URL, e.g. ``"rank=0&world_size=1"``.

    Returns:
        Mapping from parameter name to its (unconverted) string value. If
        a name occurs more than once, the last occurrence wins.

    Raises:
        IndexError: If a non-empty segment contains no ``=``.
    """
    # Indexing (rather than unpacking) builds explicit 2-tuples, which keeps
    # the construction acceptable to mypy without resorting to ``cast``.
    return {
        pair[0]: pair[1]
        for pair in (
            segment.split("=", 1) for segment in filter(None, query.split("&"))
        )
    }
|
|
|
|
def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):
    """Dispatch a rendezvous URL to the handler registered for its scheme.

    When ``rank`` and/or ``world_size`` differ from ``-1`` they are added
    to the URL's query string before the handler is invoked.

    Args:
        url: Rendezvous URL; its scheme selects the handler.
        rank: Value to add as the ``rank`` query parameter; ``-1`` (the
            default) adds nothing.
        world_size: Value to add as the ``world_size`` query parameter;
            ``-1`` (the default) adds nothing.
        **kwargs: Forwarded unchanged to the selected handler.

    Returns:
        Whatever the selected handler returns when called with the
        (possibly rewritten) URL and ``kwargs``.

    Raises:
        RuntimeError: If ``url`` is not a string, ``rank`` or ``world_size``
            is not an integer, or no handler is registered for the scheme.
        AssertionError: If ``rank`` or ``world_size`` is supplied while the
            URL query already contains ``rank`` or ``world_size``.
    """
    # Validate the argument types before touching the URL.
    if not isinstance(url, six.string_classes):
        raise RuntimeError("`url` must be a string. {}: {}".format(type(url), url))
    if not isinstance(rank, numbers.Integral):
        raise RuntimeError("`rank` must be an integer. {}".format(rank))
    if not isinstance(world_size, numbers.Integral):
        raise RuntimeError("`world_size` must be an integer. {}".format(world_size))

    # Append node-specific arguments.
    parsed = urlparse(url)
    has_node_args = rank != -1 or world_size != -1
    if has_node_args:
        params = _query_to_dict(parsed.query)
        # The URL must not already carry the values we are about to add.
        assert not (
            "rank" in params or "world_size" in params
        ), "The url: {url} has node-specific arguments(rank, world_size) already.".format(
            url=url
        )
        if rank != -1:
            params["rank"] = rank
        if world_size != -1:
            params["world_size"] = world_size

        encoded_query = "&".join(
            ["{}={}".format(name, value) for name, value in params.items()]
        )
        parsed = parsed._replace(query=encoded_query)
        url = urlunparse(parsed)

    scheme = parsed.scheme
    if scheme not in _rendezvous_handlers:
        raise RuntimeError("No rendezvous handler for {}://".format(scheme))
    handler = _rendezvous_handlers[scheme]
    return handler(url, **kwargs)
|
|
|
|
def _create_store_from_options(backend_options, rank):
    """Create the store described by ``backend_options.init_method``.

    The init-method URL is extended with ``rank`` / ``world_size`` query
    parameters and handed to the rendezvous handler registered for its
    scheme; the store from the first triplet the handler yields is
    returned.

    Args:
        backend_options: Object exposing an ``init_method`` URL string.
        rank: Rank of the calling process. For ``env://`` URLs the ``RANK``
            environment variable, when set, takes precedence. ``-1`` means
            "unknown" and leaves ``rank`` out of the query.

    Returns:
        The store element of the triplet produced by the handler.

    Raises:
        RuntimeError: If no handler is registered for the URL scheme.
        ValueError: If ``RANK`` or ``WORLD_SIZE`` is set to a non-integer.
    """
    result = urlparse(backend_options.init_method)

    # If using env initialization, get rank and world_size from env
    world_size = -1
    if result.scheme == "env":
        # Environment values are strings. Convert RANK to int (as is done for
        # WORLD_SIZE) so the ``-1`` sentinel check below compares integers;
        # a raw "-1" string would never compare equal to -1.
        env_rank = os.environ.get("RANK")
        if env_rank is not None:
            rank = int(env_rank)
        # Here, the world_size has already been initialized to -1 in init_rpc
        # If the world_size env variable is also not present then it is a dynamic group
        world_size = int(os.environ.get("WORLD_SIZE", world_size))

    query_dict = _query_to_dict(result.query)
    # if rank is -1 then intentionally exclude rank for the query, error will be thrown later
    if rank != -1:
        query_dict["rank"] = str(rank)
    # NOTE(review): indentation was not recoverable from the reviewed copy of
    # this file; world_size is assumed to be set unconditionally (only rank
    # is excluded per the comment above) - confirm against upstream.
    query_dict["world_size"] = str(world_size)

    result = result._replace(
        query="&".join(["{}={}".format(k, v) for k, v in query_dict.items()])
    )

    url = urlunparse(result)
    if result.scheme not in _rendezvous_handlers:
        raise RuntimeError("No handler for {}://".format(result.scheme))
    # Handlers are generator functions; the first yielded triplet is
    # (store, rank, world_size) and only the store is needed here.
    store, _, _ = next(_rendezvous_handlers[result.scheme](url))
    return store
|
|
|
|
def _rendezvous_error(msg):
    """Build (but do not raise) the ``ValueError`` for a rendezvous failure.

    Args:
        msg: Text appended to the common error prefix.

    Returns:
        A ``ValueError`` instance; raising it is left to the caller.
    """
    prefix = "Error initializing torch.distributed using "
    return ValueError(prefix + msg)
|
|
|
|
|
|
def _file_rendezvous_handler(url: str, **kwargs):
    """Rendezvous handler for ``file://`` URLs.

    Generator that parses the URL, builds a ``FileStore`` at the URL's path
    and yields a single ``(store, rank, world_size)`` triplet. ``rank`` and
    ``world_size`` are read from the URL query. Extra keyword arguments are
    accepted but not used.

    Raises:
        ValueError: If the path, ``rank`` or ``world_size`` is missing.
        RuntimeError: If the generator is resumed after its first yield.
    """

    def _error(msg):
        return _rendezvous_error("file:// rendezvous: " + msg)

    parsed = urlparse(url)
    store_path = parsed.path
    if sys.platform == "win32":
        import urllib.request

        # On Windows the path is rebuilt from netloc + path and converted
        # from URL form to a local path.
        store_path = urllib.request.url2pathname(parsed.netloc + parsed.path)
        if store_path:
            # Normalizing an empty string produces ".", which is not expected.
            store_path = os.path.normpath(store_path)

    if not store_path:
        raise _error("path missing")

    params = _query_to_dict(parsed.query)
    if "rank" not in params:
        raise _error("rank parameter missing")
    if "world_size" not in params:
        raise _error("world size parameter missing")

    rank = int(params["rank"])
    world_size = int(params["world_size"])
    yield (FileStore(store_path, world_size), rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform rerendezvous using file:// method")
|
|
|
|
|
|
def _torchelastic_use_agent_store() -> bool:
    """Report whether ``TORCHELASTIC_USE_AGENT_STORE`` is set to ``"True"``.

    The comparison is against ``str(True)``, so it is exact and
    case-sensitive; an unset variable yields ``False``.
    """
    flag = os.environ.get("TORCHELASTIC_USE_AGENT_STORE")
    return flag == str(True)
|
|
|
|
|
|
def _create_c10d_store(hostname, port, rank, world_size, timeout) -> Store:
    """
    Create the c10d Store for ``rank``, taking into account whether the
    agent store should be re-used. The TCPStore server is assumed to be
    reachable at ``hostname:port``.

    When ``_torchelastic_use_agent_store()`` is ``True``, the agent leader
    (node rank 0) is assumed to host the TCPStore server at the given
    ``hostname:port``. Every rank therefore creates a TCPStore client
    (``start_daemon=False``) and returns it wrapped in a PrefixStore keyed
    by the ``TORCHELASTIC_RESTART_COUNT`` environment variable.

    When it is ``False``, rank 0 hosts the TCPStore (with multi-tenancy)
    and ``hostname`` / ``port`` are assumed to be rank 0's. All non-zero
    ranks create and return a TCPStore client.

    Raises:
        ValueError: If ``port`` is outside the range 0..65535.
        KeyError: If the agent store is in use and
            ``TORCHELASTIC_RESTART_COUNT`` is not set.
    """
    # check if port is uint16_t
    port_in_range = 0 <= port < 2**16
    if not port_in_range:
        raise ValueError(f"port must have value from 0 to 65535 but was {port}.")

    if not _torchelastic_use_agent_store():
        # Only rank 0 starts the server daemon; every other rank connects.
        return TCPStore(
            hostname, port, world_size, rank == 0, timeout, multi_tenant=True
        )

    # Agent-hosted store: read the restart count first, then connect as a
    # client and namespace keys by the current attempt.
    attempt = os.environ["TORCHELASTIC_RESTART_COUNT"]
    client_store = TCPStore(hostname, port, world_size, False, timeout)
    return PrefixStore(f"/worker/attempt_{attempt}", client_store)
|
|
|
|
|
|
def _tcp_rendezvous_handler(
    url: str, timeout: timedelta = default_pg_timeout, **kwargs
):
    """Rendezvous handler for ``tcp://`` URLs.

    Generator that reads host and port from the URL and ``rank`` /
    ``world_size`` from its query, creates the store through
    ``_create_c10d_store`` and yields a single
    ``(store, rank, world_size)`` triplet. Extra keyword arguments are
    accepted but not used.

    Raises:
        ValueError: If the port, ``rank`` or ``world_size`` is missing.
        RuntimeError: If the generator is resumed after its first yield.
    """

    def _error(msg):
        return _rendezvous_error("tcp:// rendezvous: " + msg)

    parsed = urlparse(url)
    port = parsed.port
    # A falsy port (absent, or 0) is reported as missing.
    if not port:
        raise _error("port number missing")

    params = _query_to_dict(parsed.query)
    if "rank" not in params:
        raise _error("rank parameter missing")
    if "world_size" not in params:
        raise _error("world size parameter missing")

    rank = int(params["rank"])
    world_size = int(params["world_size"])
    hostname = parsed.hostname
    assert hostname is not None

    store = _create_c10d_store(hostname, port, rank, world_size, timeout)

    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform re-rendezvous using tcp:// method")
|
|
|
|
|
|
def _env_rendezvous_handler(
    url: str, timeout: timedelta = default_pg_timeout, **kwargs
):
    """Rendezvous handler for ``env://`` URLs.

    Generator that takes ``rank`` and ``world_size`` from the URL query
    when present, otherwise from the ``RANK`` / ``WORLD_SIZE`` environment
    variables. The store endpoint always comes from ``MASTER_ADDR`` and
    ``MASTER_PORT``. It yields a single ``(store, rank, world_size)``
    triplet. Extra keyword arguments are accepted but not used.

    Raises:
        ValueError: If a required environment variable is unset or empty.
        RuntimeError: If the generator is resumed after its first yield.
    """

    def _error(msg):
        return _rendezvous_error("env:// rendezvous: " + msg)

    def _env_error(var):
        return _error("environment variable %s expected, but not set" % var)

    def _get_env_or_raise(env_var: str) -> str:
        # An empty value is treated the same as an unset variable.
        value = os.environ.get(env_var, None)
        if value:
            return value
        raise _env_error(env_var)

    parsed = urlparse(url)
    params = _query_to_dict(parsed.query)

    # Query parameters take precedence over the environment.
    rank: int = int(
        params["rank"] if "rank" in params else _get_env_or_raise("RANK")
    )
    world_size: int = int(
        params["world_size"]
        if "world_size" in params
        else _get_env_or_raise("WORLD_SIZE")
    )

    master_addr: str = _get_env_or_raise("MASTER_ADDR")
    master_port: int = int(_get_env_or_raise("MASTER_PORT"))

    store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout)

    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform re-rendezvous using env:// method")
|
|
|
|
|
|
# Register the bundled handlers for the tcp://, env:// and file:// URL
# schemes when this module is imported.
register_rendezvous_handler("tcp", _tcp_rendezvous_handler)
register_rendezvous_handler("env", _env_rendezvous_handler)
register_rendezvous_handler("file", _file_rendezvous_handler)
|