Reapply "ProcessGroupGloo: support lazy_init (#150801)" (#151031)

This reverts commit 73f3d6d9aa.

Reapplies #150801

Test plan:

See #150801

submodule

Pull Request resolved: https://github.com/pytorch/pytorch/pull/151031
Approved by: https://github.com/fduwjj
This commit is contained in:
Tristan Rice 2025-04-11 01:58:35 +00:00 committed by PyTorch MergeBot
parent b7c0fda163
commit df4e5294a6
10 changed files with 119 additions and 53 deletions

View File

@ -284,6 +284,13 @@ The machine with rank 0 will be used to set up all connections.
This is the default method, meaning that ``init_method`` does not have to be specified (or
can be ``env://``).
Improving initialization time
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``TORCH_GLOO_LAZY_INIT`` - establishes connections on demand rather than
using a full mesh which can greatly improve initialization time for non all2all
operations.
Post-Initialization
-------------------

View File

@ -46,6 +46,7 @@ from torch.testing._internal.common_distributed import (
requires_gloo,
simple_sparse_reduce_tests,
skip_if_lt_x_gpu,
skip_if_win32,
verify_ddp_error_logged,
)
from torch.testing._internal.common_utils import (
@ -219,6 +220,8 @@ class TimeoutTest(test_c10d_common.AbstractTimeoutTest, TestCase):
class ProcessGroupGlooTest(MultiProcessTestCase):
lazy_init = False
def _create_process_group_gloo(self, store, rank, world_size, opts):
pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, opts)
dist.barrier(group=pg)
@ -231,7 +234,7 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
def opts(self, threads=2):
opts = c10d.ProcessGroupGloo._Options()
opts._timeout = 50.0
opts._devices = [create_device(interface=LOOPBACK)]
opts._devices = [create_device(interface=LOOPBACK, lazy_init=self.lazy_init)]
opts._threads = threads
return opts
@ -241,8 +244,8 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
opts = c10d.ProcessGroupGloo._Options()
opts._timeout = 5.0
opts._devices = [
create_device(interface=LOOPBACK),
create_device(interface=LOOPBACK),
create_device(interface=LOOPBACK, lazy_init=self.lazy_init),
create_device(interface=LOOPBACK, lazy_init=self.lazy_init),
]
pg = self._create_process_group_gloo(store, self.rank, self.world_size, opts)
@ -2334,6 +2337,19 @@ class ReducerTest(TestCase):
optimizer.step()
@skip_if_win32()
class ProcessGroupGlooLazyInitTest(ProcessGroupGlooTest):
lazy_init = True
def setUp(self):
os.environ["TORCH_GLOO_LAZY_INIT"] = "1"
super().setUp()
def tearDown(self) -> None:
del os.environ["TORCH_GLOO_LAZY_INIT"]
return super().tearDown()
class CommTest(test_c10d_common.AbstractCommTest, MultiProcessTestCase):
@property
def device(self):

2
third_party/gloo vendored

@ -1 +1 @@
Subproject commit e348db90d8677277e926c14c94ee2acfa77173d4
Subproject commit c61070427610ccd923efe3e7f8b3eca12bbcc31a

View File

@ -570,9 +570,9 @@ class ProcessGroupGloo(Backend):
timeout: timedelta,
) -> None: ...
@staticmethod
def create_device(hostname="", interface="") -> Device: ...
def create_device(hostname="", interface="", lazy_init=None) -> Device: ...
@staticmethod
def create_default_device() -> Device: ...
def create_default_device(lazy_init=None) -> Device: ...
def _set_default_timeout(self, timeout) -> None: ...
class _ProcessGroupWrapper(Backend):

View File

@ -39,12 +39,14 @@ C10_DEFINE_SHARED_REGISTRY_WITHOUT_WARNING(
GlooDeviceRegistry,
::gloo::transport::Device,
const std::string& /* interface */,
const std::string& /* hostname */)
const std::string& /* hostname */,
bool /* lazyInit */)
#if GLOO_HAVE_TRANSPORT_TCP
static std::shared_ptr<::gloo::transport::Device> makeTCPDevice(
const std::string& interfaceName,
const std::string& hostname) {
const std::string& hostname,
bool lazyInit) {
TORCH_CHECK(
!interfaceName.empty() || !hostname.empty(),
"GlooDeviceFactory::makeTCPDevice(): interface or hostname "
@ -56,8 +58,12 @@ static std::shared_ptr<::gloo::transport::Device> makeTCPDevice(
} else {
attr.hostname = hostname;
}
if (lazyInit) {
return ::gloo::transport::tcp::CreateLazyDevice(attr);
} else {
return ::gloo::transport::tcp::CreateDevice(attr);
}
}
// Registry priority is per key identifier. We register TCP to `LINUX` for
// the flexibility of other application to override by priority. Register
@ -69,12 +75,15 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, TCP, makeTCPDevice)
#if GLOO_HAVE_TRANSPORT_TCP_TLS
static std::shared_ptr<::gloo::transport::Device> makeTCPTLSDevice(
const std::string& interface,
const std::string& hostname) {
const std::string& hostname,
bool lazyInit) {
TORCH_CHECK(
!interface.empty() || !hostname.empty(),
"GlooDeviceFactory::makeTCPTLSDevice(): interface or hostname "
"can't be empty");
TORCH_CHECK(!lazyInit, "TCP_TLS transport does not support lazy init");
::gloo::transport::tcp::attr attr;
if (!interface.empty()) {
attr.iface = interface;
@ -105,12 +114,15 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, TCP_TLS, makeTCPTLSDevice)
#if GLOO_HAVE_TRANSPORT_UV
static std::shared_ptr<::gloo::transport::Device> makeUVDevice(
const std::string& interfaceName,
const std::string& hostname) {
const std::string& hostname,
bool lazyInit) {
TORCH_CHECK(
!interfaceName.empty() || !hostname.empty(),
"GlooDeviceFactory::makeUVDevice(): interface or hostname "
"can't be empty");
TORCH_CHECK(!lazyInit, "UV transport does not support lazy init");
::gloo::transport::uv::attr attr;
if (!interfaceName.empty()) {
attr.iface = interfaceName;
@ -131,23 +143,27 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, UV, makeUVDevice)
namespace {
std::shared_ptr<::gloo::transport::Device> makeGlooDevice(
const std::string& interfaceName,
const std::string& hostName) {
const std::string& hostName,
bool lazyInit) {
static auto transportName = c10::utils::get_env("GLOO_DEVICE_TRANSPORT");
if (transportName.has_value()) {
return GlooDeviceRegistry()->Create(
transportName.value().c_str(), interfaceName, hostName);
transportName.value().c_str(), interfaceName, hostName, lazyInit);
}
#ifdef __linux__
return GlooDeviceRegistry()->Create("LINUX", interfaceName, hostName);
return GlooDeviceRegistry()->Create(
"LINUX", interfaceName, hostName, lazyInit);
#endif
#ifdef __APPLE__
return GlooDeviceRegistry()->Create("APPLE", interfaceName, hostName);
return GlooDeviceRegistry()->Create(
"APPLE", interfaceName, hostName, lazyInit);
#endif
#ifdef _WIN32
return GlooDeviceRegistry()->Create("WIN32", interfaceName, hostName);
return GlooDeviceRegistry()->Create(
"WIN32", interfaceName, hostName, lazyInit);
#endif
return nullptr;
@ -155,8 +171,8 @@ std::shared_ptr<::gloo::transport::Device> makeGlooDevice(
} // anonymous namespace
std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory::
makeDeviceForInterface(const std::string& interfaceName) {
auto device = makeGlooDevice(interfaceName, "");
makeDeviceForInterface(const std::string& interfaceName, bool lazyInit) {
auto device = makeGlooDevice(interfaceName, "", lazyInit);
if (!device) {
TORCH_CHECK(false, "makeDeviceForInterface(): unsupported gloo device");
}
@ -164,8 +180,8 @@ std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory::
}
std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory::
makeDeviceForHostname(const std::string& hostname) {
auto device = makeGlooDevice("", hostname);
makeDeviceForHostname(const std::string& hostname, bool lazyInit) {
auto device = makeGlooDevice("", hostname, lazyInit);
if (!device) {
TORCH_CHECK(false, "makeDeviceForHostname(): unsupported gloo device");
}

View File

@ -14,18 +14,21 @@ class TORCH_API GlooDeviceFactory {
public:
// Create new device instance for specific interface.
static std::shared_ptr<::gloo::transport::Device> makeDeviceForInterface(
const std::string& interface);
const std::string& interface,
bool lazyInit);
// Create new device instance for specific hostname or address.
static std::shared_ptr<::gloo::transport::Device> makeDeviceForHostname(
const std::string& hostname);
const std::string& hostname,
bool lazyInit);
};
TORCH_DECLARE_SHARED_REGISTRY(
GlooDeviceRegistry,
::gloo::transport::Device,
const std::string&, /* interface */
const std::string& /* hostname */);
const std::string&, /* hostname */
bool /* lazyInit */);
} // namespace c10d

View File

@ -415,6 +415,10 @@ const auto kLoopbackAddress = "127.0.0.1";
} // namespace
bool getDefaultGlooLazyInit() {
return ::c10d::getCvarBool(TORCH_GLOO_LAZY_INIT, false);
}
// static
void ProcessGroupGloo::AsyncWork::execute(
const c10::intrusive_ptr<AsyncWork>& work) {
@ -687,23 +691,24 @@ bool doesHostnameResolveToUsableAddress(const std::string& hostname) {
} // namespace
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
createDeviceForInterface(const std::string& interface_name) {
return ::c10d::GlooDeviceFactory::makeDeviceForInterface(interface_name);
createDeviceForInterface(const std::string& interface_name, bool lazyInit) {
return ::c10d::GlooDeviceFactory::makeDeviceForInterface(
interface_name, lazyInit);
}
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
createDeviceForHostname(const std::string& hostname) {
createDeviceForHostname(const std::string& hostname, bool lazyInit) {
TORCH_CHECK(
doesHostnameResolveToUsableAddress(hostname),
"Cannot resolve ",
hostname,
" to a (local) address");
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname);
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname, lazyInit);
}
#if defined(__linux__) || defined(_WIN32)
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
createDefaultDevice() {
createDefaultDevice(bool lazyInit) {
// Use the hostname to resolve the network address to
// use. Note: if the hostname does not resolve to an address (e.g.
// because of misconfigured /etc/hosts file), this will not work.
@ -716,7 +721,8 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
// Use this machine's hostname if it resolves to an address.
if (doesHostnameResolveToUsableAddress(hostname.data())) {
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.data());
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(
hostname.data(), lazyInit);
}
// Otherwise, use the loopback address.
@ -724,13 +730,13 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
"Unable to resolve hostname to a (local) address. ",
"Using the loopback address as fallback. ",
"Manually set the network interface to bind to with GLOO_SOCKET_IFNAME.");
return createDeviceForHostname(kLoopbackAddress);
return createDeviceForHostname(kLoopbackAddress, lazyInit);
}
#endif
#ifdef __APPLE__
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
createDefaultDevice() {
createDefaultDevice(bool lazyInit) {
// Use the hostname to resolve the network address to
// use. Note: if the hostname does not resolve to an address (e.g.
// because of misconfigured /etc/hosts file), this will not work.
@ -743,7 +749,8 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
// Use this machine's hostname if it resolves to an address.
if (doesHostnameResolveToUsableAddress(hostname.get())) {
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.get());
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(
hostname.get(), lazyInit);
}
// Otherwise, use the loopback address.
@ -751,7 +758,7 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
"Unable to resolve hostname to a (local) address. ",
"Using the loopback address as fallback. ",
"Manually set the network interface to bind to with GLOO_SOCKET_IFNAME.");
return createDeviceForHostname(kLoopbackAddress);
return createDeviceForHostname(kLoopbackAddress, lazyInit);
}
#endif

View File

@ -28,6 +28,13 @@ namespace c10d {
constexpr const char* GLOO_BACKEND_NAME = "gloo";
// Control whether or not connections are established in a full mesh or lazily
// as needed.
static std::vector<std::string> TORCH_GLOO_LAZY_INIT = {"TORCH_GLOO_LAZY_INIT"};
// Returns default value for lazyInit.
bool TORCH_API getDefaultGlooLazyInit();
// ProcessGroupGloo implements Gloo bindings for c10d.
//
// All functions on this class are expected to be called in the same
@ -244,24 +251,20 @@ class TORCH_API ProcessGroupGloo : public Backend {
// Create new device instance for specific interface.
static std::shared_ptr<::gloo::transport::Device> createDeviceForInterface(
const std::string& interface);
const std::string& interface,
bool lazyInit = false);
// Create new device instance for specific hostname or address.
static std::shared_ptr<::gloo::transport::Device> createDeviceForHostname(
const std::string& hostname);
const std::string& hostname,
bool lazyInit = false);
// Create new device instance.
// It tries to resolve this machine's hostname and bind to that address.
// If that fails (i.e. the hostname doesn't resolve to an address), it
// falls back to binding to the loopback address.
static std::shared_ptr<::gloo::transport::Device> createDefaultDevice();
// Create ProcessGroupGloo instance.
static c10::intrusive_ptr<ProcessGroupGloo> createProcessGroupGloo(
const c10::intrusive_ptr<Store>& store,
int rank,
int size,
std::chrono::milliseconds timeout);
static std::shared_ptr<::gloo::transport::Device> createDefaultDevice(
bool lazyInit = false);
explicit ProcessGroupGloo(
const c10::intrusive_ptr<Store>& store,

View File

@ -2849,24 +2849,36 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`).
processGroupGloo
.def_static(
"create_device",
[](const std::string& hostname, const std::string& interface)
[](const std::string& hostname,
const std::string& interface,
std::optional<bool> lazyInit_)
-> std::shared_ptr<::gloo::transport::Device> {
bool lazyInit =
lazyInit_.value_or(::c10d::getDefaultGlooLazyInit());
if (!hostname.empty()) {
return ::c10d::ProcessGroupGloo::createDeviceForHostname(
hostname);
hostname, lazyInit);
}
if (!interface.empty()) {
return ::c10d::ProcessGroupGloo::createDeviceForInterface(
interface);
interface, lazyInit);
}
throw std::invalid_argument(
"Specify either `hostname` or `interface` argument.");
},
py::arg("hostname") = "",
py::arg("interface") = "")
py::arg("interface") = "",
py::arg("lazy_init") = std::nullopt)
.def_static(
"create_default_device",
&::c10d::ProcessGroupGloo::createDefaultDevice);
[](std::optional<bool> lazyInit_) {
bool lazyInit =
lazyInit_.value_or(::c10d::getDefaultGlooLazyInit());
return ::c10d::ProcessGroupGloo::createDefaultDevice(lazyInit);
},
py::arg("lazy_init") = std::nullopt);
processGroupGloo
.def(
@ -2898,20 +2910,22 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`).
py::gil_scoped_release nogil{};
auto options = ::c10d::ProcessGroupGloo::Options::create();
bool lazyInit = ::c10d::getDefaultGlooLazyInit();
// Use interfaces listed in "GLOO_SOCKET_IFNAME", if set.
char* ifnameEnv = getenv(GLOO_SOCKET_IFNAME_ENV.c_str());
if (ifnameEnv && strlen(ifnameEnv) > 1) {
for (const auto& iface : ::c10d::split(',', ifnameEnv)) {
options->devices.push_back(
::c10d::ProcessGroupGloo::createDeviceForInterface(iface));
::c10d::ProcessGroupGloo::createDeviceForInterface(
iface, lazyInit));
}
} else {
// If no hostname is specified, this function looks up
// the machine's hostname and returns a device instance
// associated with the address that the hostname resolves to.
options->devices.push_back(
::c10d::ProcessGroupGloo::createDefaultDevice());
::c10d::ProcessGroupGloo::createDefaultDevice(lazyInit));
}
options->timeout = timeout;

View File

@ -442,11 +442,11 @@ if TEST_WITH_ROCM:
TIMEOUT_OVERRIDE["test_join_kwargs"] = 200
def create_device(interface=None):
def create_device(interface=None, lazy_init: bool = False):
if sys.platform == "win32" or interface is None:
return c10d.ProcessGroupGloo.create_device(hostname="127.0.0.1")
return c10d.ProcessGroupGloo.create_device(hostname="127.0.0.1", lazy_init=lazy_init)
else:
return c10d.ProcessGroupGloo.create_device(interface=interface)
return c10d.ProcessGroupGloo.create_device(interface=interface, lazy_init=lazy_init)
def get_timeout(test_id) -> int: