diff --git a/docs/source/distributed.rst b/docs/source/distributed.rst index f046f250f2a..89c30fcf322 100644 --- a/docs/source/distributed.rst +++ b/docs/source/distributed.rst @@ -284,6 +284,13 @@ The machine with rank 0 will be used to set up all connections. This is the default method, meaning that ``init_method`` does not have to be specified (or can be ``env://``). +Improving initialization time +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* ``TORCH_GLOO_LAZY_INIT`` - establishes connections on demand rather than + using a full mesh which can greatly improve initialization time for non all2all + operations. + Post-Initialization ------------------- diff --git a/test/distributed/test_c10d_gloo.py b/test/distributed/test_c10d_gloo.py index 9228efdedf3..57ad689179d 100644 --- a/test/distributed/test_c10d_gloo.py +++ b/test/distributed/test_c10d_gloo.py @@ -46,6 +46,7 @@ from torch.testing._internal.common_distributed import ( requires_gloo, simple_sparse_reduce_tests, skip_if_lt_x_gpu, + skip_if_win32, verify_ddp_error_logged, ) from torch.testing._internal.common_utils import ( @@ -219,6 +220,8 @@ class TimeoutTest(test_c10d_common.AbstractTimeoutTest, TestCase): class ProcessGroupGlooTest(MultiProcessTestCase): + lazy_init = False + def _create_process_group_gloo(self, store, rank, world_size, opts): pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, opts) dist.barrier(group=pg) @@ -231,7 +234,7 @@ class ProcessGroupGlooTest(MultiProcessTestCase): def opts(self, threads=2): opts = c10d.ProcessGroupGloo._Options() opts._timeout = 50.0 - opts._devices = [create_device(interface=LOOPBACK)] + opts._devices = [create_device(interface=LOOPBACK, lazy_init=self.lazy_init)] opts._threads = threads return opts @@ -241,8 +244,8 @@ class ProcessGroupGlooTest(MultiProcessTestCase): opts = c10d.ProcessGroupGloo._Options() opts._timeout = 5.0 opts._devices = [ - create_device(interface=LOOPBACK), - create_device(interface=LOOPBACK), + create_device(interface=LOOPBACK, lazy_init=self.lazy_init), + create_device(interface=LOOPBACK, lazy_init=self.lazy_init), ] pg = self._create_process_group_gloo(store, self.rank, self.world_size, opts) @@ -2334,6 +2337,19 @@ class ReducerTest(TestCase): optimizer.step() +@skip_if_win32() +class ProcessGroupGlooLazyInitTest(ProcessGroupGlooTest): + lazy_init = True + + def setUp(self): + os.environ["TORCH_GLOO_LAZY_INIT"] = "1" + super().setUp() + + def tearDown(self) -> None: + del os.environ["TORCH_GLOO_LAZY_INIT"] + return super().tearDown() + + class CommTest(test_c10d_common.AbstractCommTest, MultiProcessTestCase): @property def device(self): diff --git a/third_party/gloo b/third_party/gloo index e348db90d86..c6107042761 160000 --- a/third_party/gloo +++ b/third_party/gloo @@ -1 +1 @@ -Subproject commit e348db90d8677277e926c14c94ee2acfa77173d4 +Subproject commit c61070427610ccd923efe3e7f8b3eca12bbcc31a diff --git a/torch/_C/_distributed_c10d.pyi b/torch/_C/_distributed_c10d.pyi index 6aaaf4b9c5f..0487eb7c924 100644 --- a/torch/_C/_distributed_c10d.pyi +++ b/torch/_C/_distributed_c10d.pyi @@ -570,9 +570,9 @@ class ProcessGroupGloo(Backend): timeout: timedelta, ) -> None: ... @staticmethod - def create_device(hostname="", interface="") -> Device: ... + def create_device(hostname="", interface="", lazy_init=None) -> Device: ... @staticmethod - def create_default_device() -> Device: ... + def create_default_device(lazy_init=None) -> Device: ... def _set_default_timeout(self, timeout) -> None: ... class _ProcessGroupWrapper(Backend): diff --git a/torch/csrc/distributed/c10d/GlooDeviceFactory.cpp b/torch/csrc/distributed/c10d/GlooDeviceFactory.cpp index af09ba39470..32c4c4f88ac 100644 --- a/torch/csrc/distributed/c10d/GlooDeviceFactory.cpp +++ b/torch/csrc/distributed/c10d/GlooDeviceFactory.cpp @@ -39,12 +39,14 @@ C10_DEFINE_SHARED_REGISTRY_WITHOUT_WARNING( GlooDeviceRegistry, ::gloo::transport::Device, const std::string& /* interface */, - const std::string& /* hostname */) + const std::string& /* hostname */, + bool /* lazyInit */) #if GLOO_HAVE_TRANSPORT_TCP static std::shared_ptr<::gloo::transport::Device> makeTCPDevice( const std::string& interfaceName, - const std::string& hostname) { + const std::string& hostname, + bool lazyInit) { TORCH_CHECK( !interfaceName.empty() || !hostname.empty(), "GlooDeviceFactory::makeTCPDevice(): interface or hostname " @@ -56,7 +58,11 @@ static std::shared_ptr<::gloo::transport::Device> makeTCPDevice( } else { attr.hostname = hostname; } - return ::gloo::transport::tcp::CreateDevice(attr); + if (lazyInit) { + return ::gloo::transport::tcp::CreateLazyDevice(attr); + } else { + return ::gloo::transport::tcp::CreateDevice(attr); + } } // Registry priority is per key identifier. We register TCP to `LINUX` for @@ -69,12 +75,15 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, TCP, makeTCPDevice) #if GLOO_HAVE_TRANSPORT_TCP_TLS static std::shared_ptr<::gloo::transport::Device> makeTCPTLSDevice( const std::string& interface, - const std::string& hostname) { + const std::string& hostname, + bool lazyInit) { TORCH_CHECK( !interface.empty() || !hostname.empty(), "GlooDeviceFactory::makeTCPTLSDevice(): interface or hostname " "can't be empty"); + TORCH_CHECK(!lazyInit, "TCP_TLS transport does not support lazy init"); + ::gloo::transport::tcp::attr attr; if (!interface.empty()) { attr.iface = interface; @@ -105,12 +114,15 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, TCP_TLS, makeTCPTLSDevice) #if GLOO_HAVE_TRANSPORT_UV static std::shared_ptr<::gloo::transport::Device> makeUVDevice( const std::string& interfaceName, - const std::string& hostname) { + const std::string& hostname, + bool lazyInit) { TORCH_CHECK( !interfaceName.empty() || !hostname.empty(), "GlooDeviceFactory::makeUVDevice(): interface or hostname " "can't be empty"); + TORCH_CHECK(!lazyInit, "UV transport does not support lazy init"); + ::gloo::transport::uv::attr attr; if (!interfaceName.empty()) { attr.iface = interfaceName; @@ -131,23 +143,27 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, UV, makeUVDevice) namespace { std::shared_ptr<::gloo::transport::Device> makeGlooDevice( const std::string& interfaceName, - const std::string& hostName) { + const std::string& hostName, + bool lazyInit) { static auto transportName = c10::utils::get_env("GLOO_DEVICE_TRANSPORT"); if (transportName.has_value()) { return GlooDeviceRegistry()->Create( - transportName.value().c_str(), interfaceName, hostName); + transportName.value().c_str(), interfaceName, hostName, lazyInit); } #ifdef __linux__ - return GlooDeviceRegistry()->Create("LINUX", interfaceName, hostName); + return GlooDeviceRegistry()->Create( + "LINUX", interfaceName, hostName, lazyInit); #endif #ifdef __APPLE__ - return GlooDeviceRegistry()->Create("APPLE", interfaceName, hostName); + return GlooDeviceRegistry()->Create( + "APPLE", interfaceName, hostName, lazyInit); #endif #ifdef _WIN32 - return GlooDeviceRegistry()->Create("WIN32", interfaceName, hostName); + return GlooDeviceRegistry()->Create( + "WIN32", interfaceName, hostName, lazyInit); #endif return nullptr; @@ -155,8 +171,8 @@ std::shared_ptr<::gloo::transport::Device> makeGlooDevice( } // anonymous namespace std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory:: - makeDeviceForInterface(const std::string& interfaceName) { - auto device = makeGlooDevice(interfaceName, ""); + makeDeviceForInterface(const std::string& interfaceName, bool lazyInit) { + auto device = makeGlooDevice(interfaceName, "", lazyInit); if (!device) { TORCH_CHECK(false, "makeDeviceForInterface(): unsupported gloo device"); } @@ -164,8 +180,8 @@ std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory:: } std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory:: - makeDeviceForHostname(const std::string& hostname) { - auto device = makeGlooDevice("", hostname); + makeDeviceForHostname(const std::string& hostname, bool lazyInit) { + auto device = makeGlooDevice("", hostname, lazyInit); if (!device) { TORCH_CHECK(false, "makeDeviceForHostname(): unsupported gloo device"); } diff --git a/torch/csrc/distributed/c10d/GlooDeviceFactory.hpp b/torch/csrc/distributed/c10d/GlooDeviceFactory.hpp index 1221e9d033f..a7220f0d81c 100644 --- a/torch/csrc/distributed/c10d/GlooDeviceFactory.hpp +++ b/torch/csrc/distributed/c10d/GlooDeviceFactory.hpp @@ -14,18 +14,21 @@ class TORCH_API GlooDeviceFactory { public: // Create new device instance for specific interface. static std::shared_ptr<::gloo::transport::Device> makeDeviceForInterface( - const std::string& interface); + const std::string& interface, + bool lazyInit); // Create new device instance for specific hostname or address. static std::shared_ptr<::gloo::transport::Device> makeDeviceForHostname( - const std::string& hostname); + const std::string& hostname, + bool lazyInit); }; TORCH_DECLARE_SHARED_REGISTRY( GlooDeviceRegistry, ::gloo::transport::Device, const std::string&, /* interface */ - const std::string& /* hostname */); + const std::string&, /* hostname */ + bool /* lazyInit */); } // namespace c10d diff --git a/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp b/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp index 3c5644eeab6..077bf311284 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp @@ -415,6 +415,10 @@ const auto kLoopbackAddress = "127.0.0.1"; } // namespace +bool getDefaultGlooLazyInit() { + return ::c10d::getCvarBool(TORCH_GLOO_LAZY_INIT, false); +} + // static void ProcessGroupGloo::AsyncWork::execute( const c10::intrusive_ptr& work) { @@ -687,23 +691,24 @@ bool doesHostnameResolveToUsableAddress(const std::string& hostname) { } // namespace std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: - createDeviceForInterface(const std::string& interface_name) { - return ::c10d::GlooDeviceFactory::makeDeviceForInterface(interface_name); + createDeviceForInterface(const std::string& interface_name, bool lazyInit) { + return ::c10d::GlooDeviceFactory::makeDeviceForInterface( + interface_name, lazyInit); } std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: - createDeviceForHostname(const std::string& hostname) { + createDeviceForHostname(const std::string& hostname, bool lazyInit) { TORCH_CHECK( doesHostnameResolveToUsableAddress(hostname), "Cannot resolve ", hostname, " to a (local) address"); - return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname); + return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname, lazyInit); } #if defined(__linux__) || defined(_WIN32) std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: - createDefaultDevice() { + createDefaultDevice(bool lazyInit) { // Use the hostname to resolve the network address to // use. Note: if the hostname does not resolve to an address (e.g. // because of misconfigured /etc/hosts file), this will not work. @@ -716,7 +721,8 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: // Use this machine's hostname if it resolves to an address. if (doesHostnameResolveToUsableAddress(hostname.data())) { - return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.data()); + return ::c10d::GlooDeviceFactory::makeDeviceForHostname( + hostname.data(), lazyInit); } // Otherwise, use the loopback address. @@ -724,13 +730,13 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: "Unable to resolve hostname to a (local) address. ", "Using the loopback address as fallback. ", "Manually set the network interface to bind to with GLOO_SOCKET_IFNAME."); - return createDeviceForHostname(kLoopbackAddress); + return createDeviceForHostname(kLoopbackAddress, lazyInit); } #endif #ifdef __APPLE__ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: - createDefaultDevice() { + createDefaultDevice(bool lazyInit) { // Use the hostname to resolve the network address to // use. Note: if the hostname does not resolve to an address (e.g. // because of misconfigured /etc/hosts file), this will not work. @@ -743,7 +749,8 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: // Use this machine's hostname if it resolves to an address. if (doesHostnameResolveToUsableAddress(hostname.get())) { - return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.get()); + return ::c10d::GlooDeviceFactory::makeDeviceForHostname( + hostname.get(), lazyInit); } // Otherwise, use the loopback address. @@ -751,7 +758,7 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: "Unable to resolve hostname to a (local) address. ", "Using the loopback address as fallback. ", "Manually set the network interface to bind to with GLOO_SOCKET_IFNAME."); - return createDeviceForHostname(kLoopbackAddress); + return createDeviceForHostname(kLoopbackAddress, lazyInit); } #endif diff --git a/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp b/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp index 059ba8a4ee3..917544d9e11 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp +++ b/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp @@ -28,6 +28,13 @@ namespace c10d { constexpr const char* GLOO_BACKEND_NAME = "gloo"; +// Control whether or not connections are established in a full mesh or lazily +// as needed. +static std::vector TORCH_GLOO_LAZY_INIT = {"TORCH_GLOO_LAZY_INIT"}; + +// Returns default value for lazyInit. +bool TORCH_API getDefaultGlooLazyInit(); + // ProcessGroupGloo implements Gloo bindings for c10d. // // All functions on this class are expected to be called in the same @@ -244,24 +251,20 @@ class TORCH_API ProcessGroupGloo : public Backend { // Create new device instance for specific interface. static std::shared_ptr<::gloo::transport::Device> createDeviceForInterface( - const std::string& interface); + const std::string& interface, + bool lazyInit = false); // Create new device instance for specific hostname or address. static std::shared_ptr<::gloo::transport::Device> createDeviceForHostname( - const std::string& hostname); + const std::string& hostname, + bool lazyInit = false); // Create new device instance. // It tries to resolve this machine's hostname and bind to that address. // If that fails (i.e. the hostname doesn't resolve to an address), it // falls back to binding to the loopback address. - static std::shared_ptr<::gloo::transport::Device> createDefaultDevice(); - - // Create ProcessGroupGloo instance. - static c10::intrusive_ptr createProcessGroupGloo( - const c10::intrusive_ptr& store, - int rank, - int size, - std::chrono::milliseconds timeout); + static std::shared_ptr<::gloo::transport::Device> createDefaultDevice( + bool lazyInit = false); explicit ProcessGroupGloo( const c10::intrusive_ptr& store, diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp index 0217d2471dc..f1bd5fb14cf 100644 --- a/torch/csrc/distributed/c10d/init.cpp +++ b/torch/csrc/distributed/c10d/init.cpp @@ -2849,24 +2849,36 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`). processGroupGloo .def_static( "create_device", - [](const std::string& hostname, const std::string& interface) + [](const std::string& hostname, + const std::string& interface, + std::optional lazyInit_) -> std::shared_ptr<::gloo::transport::Device> { + bool lazyInit = + lazyInit_.value_or(::c10d::getDefaultGlooLazyInit()); + if (!hostname.empty()) { return ::c10d::ProcessGroupGloo::createDeviceForHostname( - hostname); + hostname, lazyInit); } if (!interface.empty()) { return ::c10d::ProcessGroupGloo::createDeviceForInterface( - interface); + interface, lazyInit); } throw std::invalid_argument( "Specify either `hostname` or `interface` argument."); }, py::arg("hostname") = "", - py::arg("interface") = "") + py::arg("interface") = "", + py::arg("lazy_init") = std::nullopt) .def_static( "create_default_device", - &::c10d::ProcessGroupGloo::createDefaultDevice); + [](std::optional lazyInit_) { + bool lazyInit = + lazyInit_.value_or(::c10d::getDefaultGlooLazyInit()); + + return ::c10d::ProcessGroupGloo::createDefaultDevice(lazyInit); + }, + py::arg("lazy_init") = std::nullopt); processGroupGloo .def( @@ -2898,20 +2910,22 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`). py::gil_scoped_release nogil{}; auto options = ::c10d::ProcessGroupGloo::Options::create(); + bool lazyInit = ::c10d::getDefaultGlooLazyInit(); // Use interfaces listed in "GLOO_SOCKET_IFNAME", if set. char* ifnameEnv = getenv(GLOO_SOCKET_IFNAME_ENV.c_str()); if (ifnameEnv && strlen(ifnameEnv) > 1) { for (const auto& iface : ::c10d::split(',', ifnameEnv)) { options->devices.push_back( - ::c10d::ProcessGroupGloo::createDeviceForInterface(iface)); + ::c10d::ProcessGroupGloo::createDeviceForInterface( + iface, lazyInit)); } } else { // If no hostname is specified, this function looks up // the machine's hostname and returns a device instance // associated with the address that the hostname resolves to. options->devices.push_back( - ::c10d::ProcessGroupGloo::createDefaultDevice()); + ::c10d::ProcessGroupGloo::createDefaultDevice(lazyInit)); } options->timeout = timeout; diff --git a/torch/testing/_internal/common_distributed.py b/torch/testing/_internal/common_distributed.py index 2a8fc04265c..6a3e654d9e7 100644 --- a/torch/testing/_internal/common_distributed.py +++ b/torch/testing/_internal/common_distributed.py @@ -442,11 +442,11 @@ if TEST_WITH_ROCM: TIMEOUT_OVERRIDE["test_join_kwargs"] = 200 -def create_device(interface=None): +def create_device(interface=None, lazy_init: bool = False): if sys.platform == "win32" or interface is None: - return c10d.ProcessGroupGloo.create_device(hostname="127.0.0.1") + return c10d.ProcessGroupGloo.create_device(hostname="127.0.0.1", lazy_init=lazy_init) else: - return c10d.ProcessGroupGloo.create_device(interface=interface) + return c10d.ProcessGroupGloo.create_device(interface=interface, lazy_init=lazy_init) def get_timeout(test_id) -> int: