mirror of
https://github.com/zebrajr/pytorch.git
synced 2025-12-07 00:21:07 +01:00
Headers under torch/csrc/distributed may be referened with relative path, e.g., "<c10d/...>". However, relative path cannot be gracefully handled by Meta internal build when the NCCL PG is hipified to support AMD/RCCL because the "hipified" header files are generated in other directories. Moreover, using absolute path for header inclusion is the state-of-the-art in most components in Pytorch. Thus, this patch refactors all header paths in torch/csrc/distributed to be absolute. See D39835774 for more details about Meta internal complication. **How to test**: commit 9e5d199 removes -I./torch/csrc/distributed in compile options. Thus use it to verify we don't miss any relative path use of torch/csrc/distributed headers. Pull Request resolved: https://github.com/pytorch/pytorch/pull/85780 Approved by: https://github.com/kumpera, https://github.com/huydhn
372 lines
10 KiB
C++
372 lines
10 KiB
C++
#include <unistd.h>
|
|
|
|
#include <c10/util/irange.h>
|
|
#include <torch/csrc/distributed/c10d/ProcessGroupMPI.hpp>
|
|
|
|
#include <cstdlib>
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <string>
|
|
#include <thread>
|
|
|
|
#define STR_HELPER(x) #x
|
|
#define STR(x) STR_HELPER(x)
|
|
|
|
// Wait for work to complete
|
|
std::vector<std::vector<at::Tensor>> waitWork(
|
|
c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
|
|
std::vector<c10::intrusive_ptr<c10d::Work>> works) {
|
|
std::vector<std::vector<at::Tensor>> outputTensors;
|
|
for (auto& work : works) {
|
|
try {
|
|
work->wait();
|
|
} catch (const std::exception& ex) {
|
|
std::cerr << "Exception received: " << ex.what() << std::endl;
|
|
pg->abort();
|
|
}
|
|
outputTensors.emplace_back(work->result());
|
|
}
|
|
return outputTensors;
|
|
}
|
|
|
|
// Wait using Futures
|
|
std::vector<std::vector<at::Tensor>> waitFuture(
|
|
c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
|
|
std::vector<c10::intrusive_ptr<c10d::Work>> works) {
|
|
std::vector<std::vector<at::Tensor>> outputTensors;
|
|
for (auto& work : works) {
|
|
auto fut = work->getFuture();
|
|
try {
|
|
fut->wait();
|
|
} catch (const std::exception& ex) {
|
|
std::cerr << "Exception received: " << ex.what() << std::endl;
|
|
pg->abort();
|
|
}
|
|
auto result = fut->value();
|
|
if (result.isNone()) {
|
|
outputTensors.emplace_back();
|
|
} else if (result.isTensorList()) {
|
|
outputTensors.emplace_back(result.toTensorVector());
|
|
} else {
|
|
TORCH_CHECK(false, "future result should be tensor list or none");
|
|
}
|
|
}
|
|
return outputTensors;
|
|
}
|
|
|
|
void testAllreduce(int iter = 1000) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
|
|
// Generate inputs
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
for (const auto i : c10::irange(iter)) {
|
|
auto tensor = at::ones({16, 16}) * i;
|
|
std::vector<at::Tensor> tensors = {tensor};
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->allreduce(tensors);
|
|
works.push_back(std::move(work));
|
|
}
|
|
|
|
auto outputTensors = waitFuture(pg, works);
|
|
|
|
// Get the world size
|
|
auto worldSize = pg->getSize();
|
|
|
|
// Verify outputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
const auto expected = worldSize * i;
|
|
auto data = outputTensors[i][0].data_ptr<float>();
|
|
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
|
|
if (data[j] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testBroadcast(int iter = 10000) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
for (const auto i : c10::irange(iter)) {
|
|
auto tensors = std::vector<at::Tensor>();
|
|
if (pg->getRank() == 0) {
|
|
auto tensor = at::ones({16, 16}) * i;
|
|
tensors = std::vector<at::Tensor>({tensor});
|
|
} else {
|
|
auto tensor = at::zeros({16, 16});
|
|
tensors = std::vector<at::Tensor>({tensor});
|
|
}
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->broadcast(tensors);
|
|
works.push_back(std::move(work));
|
|
}
|
|
|
|
auto outputTensors = waitFuture(pg, works);
|
|
|
|
// Verify outputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
const auto expected = i;
|
|
auto data = outputTensors[i][0].data_ptr<float>();
|
|
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
|
|
if (data[j] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testReduce(int iter = 10000) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
for (const auto i : c10::irange(iter)) {
|
|
auto tensor = at::ones({16, 16}) * i;
|
|
auto tensors = std::vector<at::Tensor>({tensor});
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->reduce(tensors);
|
|
works.push_back(std::move(work));
|
|
}
|
|
|
|
auto outputTensors = waitFuture(pg, works);
|
|
|
|
// Get the world size
|
|
auto worldSize = pg->getSize();
|
|
|
|
if (pg->getRank() == 0) {
|
|
// Verify outputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
const auto expected = worldSize * i;
|
|
auto data = outputTensors[i][0].data_ptr<float>();
|
|
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
|
|
if (data[j] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testAllgather(int iter = 10000) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
|
|
// Get the world size
|
|
auto worldSize = pg->getSize();
|
|
auto rank = pg->getRank();
|
|
|
|
// Generate inputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
auto tensor = at::ones({16, 16}) * i * rank;
|
|
auto tensors = std::vector<at::Tensor>({tensor});
|
|
auto outputs = std::vector<std::vector<at::Tensor>>(1);
|
|
outputs[0].resize(worldSize);
|
|
for (const auto j : c10::irange(worldSize)) {
|
|
outputs[0][j] = at::zeros({16, 16});
|
|
}
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->allgather(outputs, tensors);
|
|
works.push_back(std::move(work));
|
|
}
|
|
|
|
auto outputTensors = waitFuture(pg, works);
|
|
|
|
// Verify outputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
for (const auto j : c10::irange(worldSize)) {
|
|
const auto expected = i * j;
|
|
auto data = outputTensors[i][j].data_ptr<float>();
|
|
for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
|
|
if (data[k] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testGather(int iter = 10000) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
|
|
// Get the world size
|
|
auto worldSize = pg->getSize();
|
|
auto rank = pg->getRank();
|
|
|
|
// Generate inputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
auto tensor = at::ones({16, 16}) * i * rank;
|
|
auto tensors = std::vector<at::Tensor>({tensor});
|
|
auto outputs = std::vector<std::vector<at::Tensor>>(0);
|
|
if (rank == 0) {
|
|
outputs = std::vector<std::vector<at::Tensor>>(1);
|
|
outputs[0].resize(worldSize);
|
|
for (const auto j : c10::irange(worldSize)) {
|
|
outputs[0][j] = at::zeros({16, 16});
|
|
}
|
|
}
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->gather(outputs, tensors);
|
|
works.push_back(std::move(work));
|
|
}
|
|
|
|
auto outputTensors = waitFuture(pg, works);
|
|
|
|
// Verify outputs
|
|
if (rank == 0) {
|
|
for (const auto i : c10::irange(iter)) {
|
|
for (const auto j : c10::irange(worldSize)) {
|
|
const auto expected = i * j;
|
|
auto data = outputTensors[i][j].data_ptr<float>();
|
|
for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
|
|
if (data[k] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
for (const auto i : c10::irange(iter)) {
|
|
if (outputTensors[i].size() != 0) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testScatter(int iter = 1) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
|
|
// Get the world size
|
|
auto worldSize = pg->getSize();
|
|
auto rank = pg->getRank();
|
|
|
|
// Generate inputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
auto tensor = at::zeros({16, 16});
|
|
auto tensors = std::vector<at::Tensor>({tensor});
|
|
auto inputs = std::vector<std::vector<at::Tensor>>(0);
|
|
if (rank == 0) {
|
|
inputs = std::vector<std::vector<at::Tensor>>(1);
|
|
inputs[0].resize(worldSize);
|
|
for (const auto j : c10::irange(worldSize)) {
|
|
inputs[0][j] = at::ones({16, 16}) * i * j;
|
|
}
|
|
}
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->scatter(tensors, inputs);
|
|
works.push_back(std::move(work));
|
|
}
|
|
|
|
auto outputTensors = waitFuture(pg, works);
|
|
|
|
// Verify outputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
for (const auto j : c10::irange(worldSize)) {
|
|
const auto expected = i * j;
|
|
auto data = outputTensors[i][0].data_ptr<float>();
|
|
for (auto k = 0; k < outputTensors[i][0].numel(); ++k) {
|
|
if (data[k] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testSendRecv(bool recvAnysource, int iter = 10000) {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
// Generate inputs
|
|
std::vector<c10::intrusive_ptr<::c10d::Work>> works;
|
|
|
|
// pg->send does not keep sent tensors alive, so we need to.
|
|
std::vector<std::vector<at::Tensor>> sendTensors(iter);
|
|
auto rank = pg->getRank();
|
|
for (const auto i : c10::irange(iter)) {
|
|
if (rank == 0) {
|
|
auto tensor = at::ones({16, 16}) * i;
|
|
sendTensors[i] = std::vector<at::Tensor>({tensor});
|
|
|
|
// Queue the work.
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->send(sendTensors[i], 1, 0);
|
|
works.push_back(std::move(work));
|
|
} else {
|
|
auto tensor = at::zeros({16, 16});
|
|
auto recvTensors = std::vector<at::Tensor>({tensor});
|
|
|
|
// Queue the work.
|
|
if (!recvAnysource) {
|
|
c10::intrusive_ptr<::c10d::Work> work = pg->recv(recvTensors, 0, 0);
|
|
works.push_back(std::move(work));
|
|
} else {
|
|
c10::intrusive_ptr<::c10d::Work> work =
|
|
pg->recvAnysource(recvTensors, 0);
|
|
works.push_back(std::move(work));
|
|
}
|
|
}
|
|
}
|
|
|
|
auto outputTensors = waitWork(pg, works);
|
|
if (rank == 0) {
|
|
return;
|
|
}
|
|
|
|
std::vector<int> srcRanks;
|
|
if (recvAnysource) {
|
|
for (const auto& work : works) {
|
|
srcRanks.push_back(work->sourceRank());
|
|
}
|
|
}
|
|
|
|
// Verify outputs
|
|
for (const auto i : c10::irange(iter)) {
|
|
if (recvAnysource && srcRanks[i] != 0) {
|
|
TORCH_CHECK(false, "src rank is wrong for recvAnysource");
|
|
}
|
|
const auto expected = i;
|
|
auto data = outputTensors[i][0].data_ptr<float>();
|
|
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
|
|
if (data[j] != expected) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void testBackendName() {
|
|
auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
|
|
if (pg->getBackendName() != std::string(c10d::MPI_BACKEND_NAME)) {
|
|
TORCH_CHECK(false, "BOOM!");
|
|
}
|
|
}
|
|
|
|
// Entry point. When not already running under an Open MPI launcher
// (detected via OMPI_COMM_WORLD_SIZE), re-exec this binary through
// mpiexec with two ranks; otherwise run all the collective tests.
int main(int argc, char** argv) {
#ifdef MPIEXEC
  // If we are within an openmpi mpirun, then skip the exec
  if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
    std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
    // Per POSIX, execl(path, arg0, arg1, ...) passes arg0 as argv[0]
    // of the new image, so mpiexec's own name must come first, and
    // "-np" / "2" must be separate arguments. The previous call passed
    // the single string "-np 2" as argv[0], which mpiexec ignores as
    // the program name — no -np option ever reached it.
    execl(STR(MPIEXEC), STR(MPIEXEC), "-np", "2", argv[0], (char*)nullptr);
    // execl only returns on failure; don't fall through and run the
    // tests without an MPI world.
    std::cerr << "execl failed for " << STR(MPIEXEC) << std::endl;
    return EXIT_FAILURE;
  }

  testAllreduce();
  testBroadcast();
  testReduce();
  testAllgather();
  testGather();
  testScatter();
  testSendRecv(false);
  testSendRecv(true);
  testBackendName();

  std::cout << "Test successful" << std::endl;
#else
  std::cout << "MPI executable not found, skipping test" << std::endl;
#endif
  return EXIT_SUCCESS;
}
|