// Python bindings for the NCCL collectives used by torch.cuda.nccl.
//
// Each THCPModule_nccl_* entry point parses its Python arguments, converts
// them to tensors / streams / communicators, and then issues one NCCL call per
// input tensor inside with_no_gil.

#include "python_nccl.h"

#include "nccl.h"
#include "torch/csrc/THP.h"
#include "torch/csrc/Types.h"
#include "torch/csrc/DynamicTypes.h"
#include "torch/csrc/cuda/THCP.h"
#include "torch/csrc/cuda/nccl.h"

// NOTE(review): the targets of three angle-bracket includes were lost in this
// copy of the file. The headers below cover the standard-library names this
// file uses (memcpy, std::lock_guard, std::runtime_error, std::vector);
// confirm the original list against version control.
#include <cstring>
#include <mutex>
#include <stdexcept>
#include <vector>

using namespace at;
using namespace torch::cuda::nccl;
using namespace torch::cuda::nccl::detail;

// Name attached to every PyCapsule that wraps an ncclComm_t.
static const char* COMM_CAPSULE_NAME = "torch.cuda.nccl.Communicator";

// Returns the value of version() as a Python integer.
PyObject * THCPModule_nccl_version(PyObject *self, PyObject *args) {
  return PyInt_FromLong(version());
}

// Obtains a fresh ncclUniqueId and returns its raw NCCL_UNIQUE_ID_BYTES bytes
// as a Python bytes object.
PyObject * THCPModule_nccl_unique_id(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  ncclUniqueId id;
  CHECK(ncclGetUniqueId(&id));
  return PyBytes_FromStringAndSize(
      reinterpret_cast<const char*>(&id), NCCL_UNIQUE_ID_BYTES);
  END_HANDLE_TH_ERRORS
}

// Extracts the ncclComm_t stored in `capsule`.
// Throws python_error when PyCapsule_GetPointer yields a null pointer
// (it sets the Python exception itself in that case).
static ncclComm_t unpack_nccl_comm(PyObject* capsule) {
  ncclComm_t comm = static_cast<ncclComm_t>(
      PyCapsule_GetPointer(capsule, COMM_CAPSULE_NAME));
  if (!comm) throw python_error();
  return comm;
}

// Capsule destructor registered by THCPModule_nccl_init_rank: destroys the
// wrapped communicator. The result of ncclCommDestroy is intentionally left
// unchecked, as in the original code.
static void destroy_nccl_comm(PyObject* capsule) {
  HANDLE_TH_ERRORS
  ncclComm_t comm = unpack_nccl_comm(capsule);
  with_no_gil([&]{
    ncclCommDestroy(comm);
  });
  END_HANDLE_TH_ERRORS_RET()
}

// Converts `obj` to a list of `size` stream pointers.
// Py_None yields `size` null entries; any other object is converted with
// THPUtils_PySequence_to_THCStreamList and must contain exactly `size` items,
// otherwise std::runtime_error is thrown.
static std::vector<THCStream*> unpack_streams(PyObject* obj, size_t size) {
  if (obj == Py_None) {
    return std::vector<THCStream*>(size, nullptr);
  }
  auto streams = THPUtils_PySequence_to_THCStreamList(obj);
  if (streams.size() != size) {
    throw std::runtime_error("number of streams is not equal to number of inputs");
  }
  return streams;
}

static std::vector<at::Tensor> extract_tensors(PyObject* obj);

// Converts `obj` to a list of communicators.
// Py_None yields an empty list (callers then fall back to
// _get_communicators). A single capsule yields a one-element list; anything
// else is treated as a sequence of capsules. A non-empty result must contain
// exactly `size` communicators, otherwise std::runtime_error is thrown.
static std::vector<ncclComm_t> unpack_comms(PyObject* obj, size_t size) {
  if (obj == Py_None) {
    return std::vector<ncclComm_t>();
  }
  std::vector<ncclComm_t> comms;
  if (PyCapsule_CheckExact(obj)) {
    comms = { unpack_nccl_comm(obj) };
  } else {
    auto seq = THPObjectPtr(PySequence_Fast(obj, "comm is not a sequence"));
    if (!seq) throw python_error();
    // Named differently from the `size` parameter so the length check below
    // is unambiguous.
    const Py_ssize_t num_comms = PySequence_Fast_GET_SIZE(seq.get());
    comms = std::vector<ncclComm_t>(num_comms);
    for (Py_ssize_t i = 0; i < num_comms; i++) {
      comms[i] = unpack_nccl_comm(PySequence_Fast_GET_ITEM(seq.get(), i));
    }
  }
  if (comms.size() != size) {
    throw std::runtime_error("number of communicators is not equal to number of inputs");
  }
  return comms;
}

// nccl_init_rank(nranks, unique_id, rank) -> capsule
// Creates a communicator with ncclCommInitRank and returns it wrapped in a
// PyCapsule whose destructor is destroy_nccl_comm. `unique_id` must be exactly
// NCCL_UNIQUE_ID_BYTES long.
PyObject * THCPModule_nccl_init_rank(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  int nranks;
  const char* id;
  Py_ssize_t id_len;
  int rank;

  if (!PyArg_ParseTuple(args, "is#i:nccl_init_rank", &nranks, &id, &id_len, &rank)) {
    return nullptr;
  }
  THPUtils_assert(id_len == NCCL_UNIQUE_ID_BYTES,
      "invalid unique_id (expected %d bytes, got %zd)",
      NCCL_UNIQUE_ID_BYTES, id_len);

  ncclUniqueId commId;
  memcpy(&commId, id, NCCL_UNIQUE_ID_BYTES);
  ncclComm_t comm;
  with_no_gil([&]{
    CHECK(ncclCommInitRank(&comm, nranks, commId, rank));
  });
  return PyCapsule_New(comm, COMM_CAPSULE_NAME, &destroy_nccl_comm);
  END_HANDLE_TH_ERRORS
}

// nccl_reduce(inputs, outputs, root, op, streams, comms) -> None
// Issues one ncclReduce per input tensor inside a single NCCL group.
// `root` must index into `inputs`; `streams` and `comms` may be None.
PyObject * THCPModule_nccl_reduce(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  PyObject *_inputs, *_outputs, *_streams, *_comms;
  int root, op;

  if (!PyArg_ParseTuple(args, "OOiiOO", &_inputs, &_outputs, &root, &op, &_streams, &_comms)) {
    THPUtils_invalidArguments(args, nullptr, "nccl_reduce", 1,
        "(sequence[Tensor] inputs, sequence[Tensor] outputs, int root,"
        " int op, sequence[torch.cuda.Stream] streams,"
        " sequence[torch.cuda.nccl.Communicator] comms)");
    return nullptr;
  }

  std::vector<at::Tensor> inputs = extract_tensors(_inputs);
  std::vector<at::Tensor> outputs = extract_tensors(_outputs);
  std::vector<THCStream*> streams = unpack_streams(_streams, inputs.size());
  auto user_comms = unpack_comms(_comms, inputs.size());

  THPUtils_assert(root >= 0 && static_cast<size_t>(root) < inputs.size(), "invalid root");

  with_no_gil([&]{
    _check_inputs(inputs, outputs, 1, 1);
    size_t len = inputs.size();

    ncclDataType_t data_type = _get_data_type(inputs[0].type());

    int64_t count = inputs[0].numel();
    // NOTE(review): template argument restored; assumes
    // THCCachingAllocator_getCudaFreeMutex() returns std::mutex* - confirm.
    std::lock_guard<std::mutex> lock(*(THCCachingAllocator_getCudaFreeMutex()));
    auto comms = user_comms.empty() ? _get_communicators(inputs) : ArrayRef<ncclComm_t>(user_comms);
    AutoGPU gpu_guard;
    AutoNcclGroup nccl_group_guard;
    for (size_t i = 0; i < len; i++) {
      int device = inputs[i].get_device();
      gpu_guard.setDevice(device);
      // A null stream entry selects the null (default) stream handle.
      auto stream = (streams[i] == nullptr) ? nullptr : streams[i]->stream;
      CHECK(ncclReduce(inputs[i].data_ptr(), outputs[i].data_ptr(),
          count, data_type, static_cast<ncclRedOp_t>(op), root, comms[i], stream));
    }
  });

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// nccl_all_reduce(inputs, outputs, op, streams, comms) -> None
// Issues one ncclAllReduce per input tensor inside a single NCCL group.
PyObject * THCPModule_nccl_all_reduce(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  PyObject *_inputs, *_outputs, *_streams, *_comms;
  int op;

  if (!PyArg_ParseTuple(args, "OOiOO", &_inputs, &_outputs, &op, &_streams, &_comms)) {
    THPUtils_invalidArguments(args, nullptr, "nccl_all_reduce", 1,
        "(sequence[Tensor] inputs, sequence[Tensor] outputs, int op,"
        " sequence[torch.cuda.Stream] streams,"
        " sequence[torch.cuda.nccl.Communicator] comms)");
    return nullptr;
  }

  std::vector<at::Tensor> inputs = extract_tensors(_inputs);
  std::vector<at::Tensor> outputs = extract_tensors(_outputs);
  auto streams = unpack_streams(_streams, inputs.size());
  auto user_comms = unpack_comms(_comms, inputs.size());

  with_no_gil([&]{
    _check_inputs(inputs, outputs, 1, 1);
    size_t len = inputs.size();

    ncclDataType_t data_type = _get_data_type(inputs[0].type());

    int64_t count = inputs[0].numel();
    // NOTE(review): template argument restored; assumes
    // THCCachingAllocator_getCudaFreeMutex() returns std::mutex* - confirm.
    std::lock_guard<std::mutex> lock(*(THCCachingAllocator_getCudaFreeMutex()));
    auto comms = user_comms.empty() ? _get_communicators(inputs) : ArrayRef<ncclComm_t>(user_comms);
    AutoGPU gpu_guard;
    AutoNcclGroup nccl_group_guard;
    for (size_t i = 0; i < len; i++) {
      int device = inputs[i].get_device();
      gpu_guard.setDevice(device);
      // A null stream entry selects the null (default) stream handle.
      auto stream = (streams[i] == nullptr) ? nullptr : streams[i]->stream;
      CHECK(ncclAllReduce(inputs[i].data_ptr(), outputs[i].data_ptr(),
          count, data_type, static_cast<ncclRedOp_t>(op), comms[i], stream));
    }
  });

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// nccl_broadcast(inputs, root, streams, comms) -> None
// Delegates to torch::cuda::nccl::broadcast.
// NOTE(review): `root` is range-checked here but is not forwarded to
// broadcast(); confirm whether the callee is expected to always use a fixed
// root or whether the argument should be passed through.
PyObject * THCPModule_nccl_broadcast(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  PyObject *_inputs, *_streams, *_comms;
  int root;

  if (!PyArg_ParseTuple(args, "OiOO", &_inputs, &root, &_streams, &_comms)) {
    THPUtils_invalidArguments(args, nullptr, "nccl_broadcast", 1,
        "(sequence[Tensor] inputs, int root,"
        " sequence[torch.cuda.Stream] streams,"
        " sequence[torch.cuda.nccl.Communicator] comms)");
    return nullptr;
  }

  std::vector<at::Tensor> inputs = extract_tensors(_inputs);
  THPUtils_assert(root >= 0 && static_cast<size_t>(root) < inputs.size(), "invalid root");
  auto streams = unpack_streams(_streams, inputs.size());
  auto user_comms = unpack_comms(_comms, inputs.size());

  with_no_gil([&]{
    torch::cuda::nccl::broadcast(inputs, streams, user_comms);
  });

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// nccl_all_gather(inputs, outputs, streams, comms) -> None
// Issues one ncclAllGather per input tensor inside a single NCCL group.
// The argument order of ncclAllGather differs between NCCL 1 and NCCL >= 2,
// hence the preprocessor switch.
PyObject * THCPModule_nccl_all_gather(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  PyObject *_inputs, *_outputs, *_streams, *_comms;

  if (!PyArg_ParseTuple(args, "OOOO", &_inputs, &_outputs, &_streams, &_comms)) {
    THPUtils_invalidArguments(args, nullptr, "nccl_all_gather", 1,
        "(sequence[Tensor] inputs, sequence[Tensor] outputs,"
        " sequence[torch.cuda.Stream] streams,"
        " sequence[torch.cuda.nccl.Communicator] comms)");
    return nullptr;
  }

  std::vector<at::Tensor> inputs = extract_tensors(_inputs);
  std::vector<at::Tensor> outputs = extract_tensors(_outputs);
  auto streams = unpack_streams(_streams, inputs.size());
  auto user_comms = unpack_comms(_comms, inputs.size());

  with_no_gil([&]{
    size_t len = inputs.size();
    _check_inputs(inputs, outputs, len, 1);

    ncclDataType_t data_type = _get_data_type(inputs[0].type());

    int64_t count = inputs[0].numel();
    // NOTE(review): template argument restored; assumes
    // THCCachingAllocator_getCudaFreeMutex() returns std::mutex* - confirm.
    std::lock_guard<std::mutex> lock(*(THCCachingAllocator_getCudaFreeMutex()));
    auto comms = user_comms.empty() ? _get_communicators(inputs) : ArrayRef<ncclComm_t>(user_comms);
    AutoGPU gpu_guard;
    AutoNcclGroup nccl_group_guard;
    for (size_t i = 0; i < len; i++) {
      int device = inputs[i].get_device();
      gpu_guard.setDevice(device);
      // A null stream entry selects the null (default) stream handle.
      auto stream = (streams[i] == nullptr) ? nullptr : streams[i]->stream;
    #if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
      CHECK(ncclAllGather(inputs[i].data_ptr(), outputs[i].data_ptr(),
        count, data_type, comms[i], stream));
    #else
      CHECK(ncclAllGather(inputs[i].data_ptr(), count, data_type,
        outputs[i].data_ptr(), comms[i], stream));
    #endif
    }
  });

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// nccl_reduce_scatter(inputs, outputs, op, streams, comms) -> None
// Issues one ncclReduceScatter per input tensor inside a single NCCL group.
// The per-call element count is the first input's numel divided by the number
// of inputs.
PyObject * THCPModule_nccl_reduce_scatter(PyObject *self, PyObject *args) {
  HANDLE_TH_ERRORS
  PyObject *_inputs, *_outputs, *_streams, *_comms;
  int op;

  if (!PyArg_ParseTuple(args, "OOiOO", &_inputs, &_outputs, &op, &_streams, &_comms)) {
    THPUtils_invalidArguments(args, nullptr, "nccl_reduce_scatter", 1,
        "(sequence[Tensor] inputs, sequence[Tensor] outputs, int op,"
        " sequence[torch.cuda.Stream] streams,"
        " sequence[torch.cuda.nccl.Communicator] comms)");
    return nullptr;
  }

  std::vector<at::Tensor> inputs = extract_tensors(_inputs);
  std::vector<at::Tensor> outputs = extract_tensors(_outputs);
  auto streams = unpack_streams(_streams, inputs.size());
  auto user_comms = unpack_comms(_comms, inputs.size());

  with_no_gil([&]{
    size_t len = inputs.size();
    _check_inputs(inputs, outputs, 1, len);

    ncclDataType_t data_type = _get_data_type(inputs[0].type());

    int64_t count = inputs[0].numel() / len;
    // NOTE(review): template argument restored; assumes
    // THCCachingAllocator_getCudaFreeMutex() returns std::mutex* - confirm.
    std::lock_guard<std::mutex> lock(*(THCCachingAllocator_getCudaFreeMutex()));
    auto comms = user_comms.empty() ? _get_communicators(inputs) : ArrayRef<ncclComm_t>(user_comms);
    AutoGPU gpu_guard;
    AutoNcclGroup nccl_group_guard;
    for (size_t i = 0; i < len; i++) {
      int device = inputs[i].get_device();
      gpu_guard.setDevice(device);
      // A null stream entry selects the null (default) stream handle.
      auto stream = (streams[i] == nullptr) ? nullptr : streams[i]->stream;
      CHECK(ncclReduceScatter(inputs[i].data_ptr(), outputs[i].data_ptr(),
          count, data_type, static_cast<ncclRedOp_t>(op), comms[i], stream));
    }
  });

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// Converts a Python sequence into a vector of tensors.
// Items that pass THPVariable_Check contribute the data of their wrapped
// variable; every other item goes through torch::createTensor.
// Throws python_error when `obj` is not a sequence.
static std::vector<at::Tensor> extract_tensors(PyObject* obj) {
  auto seq = THPObjectPtr(PySequence_Fast(obj, "expected a sequence"));
  if (!seq) throw python_error();

  std::vector<at::Tensor> list;
  Py_ssize_t length = PySequence_Fast_GET_SIZE(seq.get());
  list.reserve(static_cast<size_t>(length));
  for (Py_ssize_t i = 0; i < length; i++) {
    PyObject* item = PySequence_Fast_GET_ITEM(seq.get(), i);
    if (THPVariable_Check(item)) {
      auto var = reinterpret_cast<THPVariable*>(item);
      list.emplace_back(var->cdata.data());
    } else {
      list.emplace_back(torch::createTensor(item));
    }
  }
  return list;
}