[C10d] Add skeleton of LibUV backend. (#105672)

This commit hooks up TCPStore creation and the build flags for the libuv backend; the backend itself is still a stub.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/105672
Approved by: https://github.com/fduwjj
This commit is contained in:
Rodrigo Kumpera 2023-07-26 10:23:14 -07:00 committed by PyTorch MergeBot
parent dffa4e14b9
commit 2636751fb9
7 changed files with 58 additions and 3 deletions

View File

@ -529,6 +529,7 @@ libtorch_distributed_base_sources = [
"torch/csrc/distributed/c10d/Store.cpp",
"torch/csrc/distributed/c10d/TCPStore.cpp",
"torch/csrc/distributed/c10d/TCPStoreBackend.cpp",
"torch/csrc/distributed/c10d/TCPStoreLibUvBackend.cpp",
"torch/csrc/distributed/c10d/Utils.cpp",
"torch/csrc/distributed/c10d/comm.cpp",
"torch/csrc/distributed/c10d/debug.cpp",

View File

@ -1379,6 +1379,8 @@ if(USE_DISTRIBUTED AND USE_TENSORPIPE)
set(TP_ENABLE_CUDA_IPC ON CACHE BOOL "" FORCE)
endif()
set(TP_BUILD_LIBUV ON CACHE BOOL "" FORCE)
add_compile_options(-DTORCH_USE_LIBUV)
include_directories(BEFORE SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../third_party/tensorpipe/third_party/libuv/include)
set(TP_STATIC_OR_SHARED STATIC CACHE STRING "" FORCE)
# Tensorpipe uses cuda_add_library

View File

@ -63,7 +63,8 @@ std::mutex TCPServer::cache_mutex_{};
std::shared_ptr<TCPServer> TCPServer::start(const TCPStoreOptions& opts) {
auto startCore = [&opts]() {
auto daemon = create_tcpstore_backend(opts);
auto daemon = opts.useLibUV ? create_libuv_tcpstore_backend(opts)
: create_tcpstore_backend(opts);
return std::make_shared<TCPServer>(daemon->port(), std::move(daemon));
};
@ -236,6 +237,12 @@ TCPStore::TCPStore(std::string host, const TCPStoreOptions& opts)
: Store{opts.timeout},
addr_{std::move(host)},
numWorkers_{opts.numWorkers} {
// Fail fast when the caller asked for the libuv backend but this binary was
// compiled without it, before any socket setup is attempted.
if (opts.useLibUV) {
  TORCH_CHECK(
      ::c10d::detail::is_libuv_tcpstore_backend_available(),
      "use_libuv was requested but PyTorch was built without libuv support");
}
Socket::initialize();
if (opts.isServer) {

View File

@ -37,6 +37,9 @@ struct TCPStoreOptions {
// over the bound socket associated to this fd. This option is useful to avoid
// port assignment races in certain scenarios.
c10::optional<int> masterListenFd = c10::nullopt;
// A boolean value indicating whether to use the experimental libUV backend.
bool useLibUV = false;
};
class TORCH_API TCPStore : public Store {

View File

@ -72,6 +72,8 @@ class BackgroundThread {
};
// Creates the default TCPStore server backend. TCPServer::start() uses this
// factory when opts.useLibUV is false.
std::unique_ptr<BackgroundThread> create_tcpstore_backend(const TCPStoreOptions& opts);
// Creates the libuv-based TCPStore server backend. TCPServer::start() uses
// this factory when opts.useLibUV is true.
// NOTE: the current definition is a placeholder that always fails a
// TORCH_CHECK, with or without TORCH_USE_LIBUV, and never returns a backend.
std::unique_ptr<BackgroundThread> create_libuv_tcpstore_backend(const TCPStoreOptions& opts);
// Returns true when this translation unit was compiled with TORCH_USE_LIBUV
// defined, false otherwise.
bool is_libuv_tcpstore_backend_available();
} // namespace detail
} // namespace c10d

View File

@ -0,0 +1,37 @@
#include <algorithm>
#include <deque>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <fmt/format.h>
#include <torch/csrc/distributed/c10d/TCPStore.hpp>
#include <torch/csrc/distributed/c10d/TCPStoreBackend.hpp>
#ifdef TORCH_USE_LIBUV
#include <uv.h>
#endif
namespace c10d {
namespace detail {
// Factory for the libuv-based TCPStore server backend.
//
// This is a skeleton: no backend is constructed yet. Both build
// configurations end in a failing TORCH_CHECK, and only the message differs,
// so callers can tell a build without libuv from one where the libuv server
// has not been written yet.
std::unique_ptr<BackgroundThread> create_libuv_tcpstore_backend(
    const TCPStoreOptions& opts) {
#ifndef TORCH_USE_LIBUV
  // Compiled without TORCH_USE_LIBUV: the libuv headers were never included.
  TORCH_CHECK(false, "Libuv not available");
#else
  // libuv is compiled in, but the server implementation is still pending.
  TORCH_CHECK(false, "Libuv implementation missing");
#endif
}
// Reports whether this build was compiled with libuv support.
//
// The answer is fixed at compile time by the TORCH_USE_LIBUV macro: true when
// it is defined, false otherwise.
bool is_libuv_tcpstore_backend_available() {
#if defined(TORCH_USE_LIBUV)
  constexpr bool kBuiltWithLibUv = true;
#else
  constexpr bool kBuiltWithLibUv = false;
#endif
  return kBuiltWithLibUv;
}
} // namespace detail
} // namespace c10d

View File

@ -1301,7 +1301,8 @@ Example::
std::chrono::milliseconds timeout,
bool waitWorkers,
bool multiTenant,
c10::optional<int> masterListenFd) {
c10::optional<int> masterListenFd,
bool useLibUV) {
c10::optional<std::size_t> numWorkers = c10::nullopt;
if (worldSize.has_value() && worldSize.value() > -1) {
numWorkers = static_cast<std::size_t>(worldSize.value());
@ -1314,7 +1315,8 @@ Example::
waitWorkers,
timeout,
multiTenant,
masterListenFd};
masterListenFd,
useLibUV};
return c10::make_intrusive<::c10d::TCPStore>(host, opts);
}),
@ -1329,6 +1331,7 @@ Example::
py::arg("wait_for_workers") = true,
py::arg("multi_tenant") = false,
py::arg("master_listen_fd") = py::none(),
py::arg("use_libuv") = false,
py::call_guard<py::gil_scoped_release>())
.def_property_readonly(
"host",