pytorch/test/cpp/c10d/ProcessGroupNCCLTest.cpp
Rohan Varma 885da61d7d [PG NCCL] Disable NCCL health check (#67668)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/67668

This adds an env var that enables the NCCL health check; when the variable is left unspecified, the check is not run. Unit tests that need to exercise this functionality set the env variable. Please see the internal diff for more details.
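A minimal usage sketch, mirroring what NCCLTestBase::initialize does in the file below: a test opts in by setting the variable (with overwrite) before constructing the process group:

    setenv("ENABLE_NCCL_HEALTH_CHECK", "1", /* overwrite */ 1);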

Test Plan: CI

Reviewed By: yuguo68, mrshenli

Differential Revision: D32089763

fbshipit-source-id: dff5664a5e607f711515cd1042089ca769914fbb
2021-11-02 16:21:59 -07:00


#include <chrono>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

#include <c10d/FileStore.hpp>
#include <c10d/ProcessGroup.hpp>
#include <c10d/ProcessGroupNCCL.hpp>
#include <c10d/Types.hpp>

#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/irange.h>
#include <torch/csrc/autograd/profiler.h>

#include <gtest/gtest.h>

#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;
using at::cuda::CUDAStream;
using c10d::ProcessGroup;
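// Test fixture helper: constructs a ProcessGroupNCCL backed by a FileStore at
// `path`, so multiple threads (or processes) can rendezvous through the
// filesystem instead of a TCP store.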
class NCCLTestBase {
public:
NCCLTestBase(
const std::string& path,
const std::chrono::milliseconds pgTimeout = kProcessGroupDefaultTimeout
) : path_(path), pgTimeout_(pgTimeout) {}
NCCLTestBase(NCCLTestBase&& other) {
path_ = std::move(other.path_);
pg_ = std::move(other.pg_);
pgTimeout_ = other.pgTimeout_;
}
::c10d::ProcessGroupNCCL& getProcessGroup() {
return *pg_;
}
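// Creates the FileStore-backed ProcessGroupNCCL. The health check only runs
// when ENABLE_NCCL_HEALTH_CHECK is set, so tests enable it explicitly here.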
void initialize(int rank, int size) {
auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts = c10::make_intrusive<c10d::ProcessGroupNCCL::Options>();
opts->timeout = pgTimeout_;
setenv("ENABLE_NCCL_HEALTH_CHECK", "1", /* overwrite */ 1);
pg_ = std::unique_ptr<::c10d::ProcessGroupNCCL>(
new ::c10d::ProcessGroupNCCL(store, rank, size, std::move(opts)));
}
protected:
std::string path_;
std::unique_ptr<::c10d::ProcessGroupNCCL> pg_;
std::chrono::milliseconds pgTimeout_;
};
class NCCLTest : public NCCLTestBase {
public:
NCCLTest(const std::string& path, int worldSize, std::chrono::milliseconds pgTimeout = kProcessGroupDefaultTimeout)
: NCCLTestBase(path, pgTimeout),
numDevices_(cudaNumDevices()),
state_(::at::globalContext().lazyInitCUDA()),
worldSize_(worldSize) {
// Each device has a single tensor to perform the NCCL op
tensors_.resize(numDevices_);
inputs_.resize(numDevices_);
outputs_.resize(numDevices_);
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
tensors_[i] = at::empty({3, 3}, at::kCUDA);
inputs_[i].resize(worldSize_ * numDevices_);
outputs_[i].resize(worldSize_ * numDevices_);
for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
inputs_[i][j] = at::empty({3, 3}, at::kCUDA);
outputs_[i][j] = at::empty({3, 3}, at::kCUDA);
}
}
// Allocate a stream per device.
//
// The "current stream" is set globally per device in THC, so we
// can't make two tensors on the same device use different streams
// and pass this along to the collective (since it uses the THC
// getters to retrieve the current stream).
//
streams_.reserve(numDevices_);
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
streams_.push_back(at::cuda::getStreamFromPool());
}
}
void wait(
c10::intrusive_ptr<ProcessGroup::Work>& work,
std::chrono::milliseconds timeout = kNoTimeout) {
c10::cuda::CUDAMultiStreamGuard guard(streams_);
work->wait(timeout);
}
std::vector<at::Tensor> getTensors() {
std::vector<at::Tensor> outputs(numDevices_);
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
// Copy device tensors to host
for (const auto i : c10::irange(numDevices_)) {
cudaStreamSynchronize(streams_[i].stream());
outputs[i] = tensors_[i].cpu();
}
return outputs;
}
std::vector<std::vector<at::Tensor>> getInputTensors() {
return getTensorLists(inputs_);
}
std::vector<std::vector<at::Tensor>> getOutputTensors() {
return getTensorLists(outputs_);
}
int numDevices() const {
return numDevices_;
}
private:
std::vector<std::vector<at::Tensor>> getTensorLists(
std::vector<std::vector<at::Tensor>>& tensor_lists) {
std::vector<std::vector<at::Tensor>> outputs(numDevices_);
for (auto & output : outputs) {
output = std::vector<at::Tensor>(worldSize_ * numDevices_);
}
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
// Copy device tensor lists to host
for (const auto i : c10::irange(numDevices_)) {
cudaStreamSynchronize(streams_[i].stream());
for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
outputs[i][j] = tensor_lists[i][j].cpu();
}
}
return outputs;
}
protected:
// Launches sleep on every CUDA device
void launchDeviceSleep() {
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
cudaSleep(streams_[i], 2000 * 1000 * 1000);
}
}
// Launches value initialization for every tensor
void valueInitialization() {
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
tensors_[i].fill_(pg_->getRank() * numDevices_ + i);
}
}
const int numDevices_;
THCState* state_;
int worldSize_;
std::vector<at::Tensor> tensors_;
std::vector<std::vector<at::Tensor>> inputs_;
std::vector<std::vector<at::Tensor>> outputs_;
std::vector<CUDAStream> streams_;
};
class AllreduceNCCLTest : public NCCLTest {
public:
AllreduceNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run() {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
valueInitialization();
using namespace torch::autograd::profiler;
// Make sure that enabling the profiler does not cause issues. Note: in
// single-process multi-device mode we do not expect any events to be
// populated for collective operations, since profiling is not supported
// for that mode.
enableProfilerLegacy(ProfilerConfig(ProfilerState::CPU));
auto results = pg_->allreduce(tensors_);
disableProfilerLegacy();
return results;
}
};
class BroadcastNCCLTest : public NCCLTest {
public:
BroadcastNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run(int rootRank, int rootTensor) {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
valueInitialization();
::c10d::BroadcastOptions options;
options.rootRank = rootRank;
options.rootTensor = rootTensor;
return pg_->broadcast(tensors_, options);
}
};
class ReduceNCCLTest : public NCCLTest {
public:
ReduceNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run(int rootRank, int rootTensor) {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
valueInitialization();
::c10d::ReduceOptions options;
options.rootRank = rootRank;
options.rootTensor = rootTensor;
return pg_->reduce(tensors_, options);
}
};
class AllgatherNCCLTest : public NCCLTest {
public:
AllgatherNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run() {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
valueInitialization();
return pg_->allgather(outputs_, tensors_);
}
};
class AllgatherBaseNCCLTest : public NCCLTest {
public:
AllgatherBaseNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {
output_tensor_ = at::empty({worldSize_, 3, 3}, at::kCUDA);
}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run() {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
valueInitialization();
// tensors_ contains at least one element; otherwise the test wouldn't run.
// This is a flattened allgather, so each rank contributes only one tensor,
// regardless of its number of devices.
return pg_->_allgather_base(output_tensor_, tensors_[0]);
}
at::Tensor getOutputTensor() {
c10::cuda::CUDAMultiStreamGuard guard(streams_);
return output_tensor_.cpu();
}
at::Tensor getInputTensor() {
c10::cuda::CUDAMultiStreamGuard guard(streams_);
return tensors_[0].cpu();
}
private:
at::Tensor output_tensor_;
};
struct ReduceScatterNCCLTest : NCCLTest {
ReduceScatterNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run() {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
at::cuda::OptionalCUDAGuard deviceGuard;
launchDeviceSleep();
// Launch value initialization for every tensor
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
inputs_[i][j].fill_(
pg_->getRank() * numDevices_ * worldSize_ + i * worldSize_ + j);
}
}
return pg_->reduce_scatter(tensors_, inputs_);
}
};
class ReduceScatterBaseNCCLTest : public NCCLTest {
public:
ReduceScatterBaseNCCLTest(const std::string& path, int worldSize)
: NCCLTest(path, worldSize) {
output_tensor_ = at::empty({1}, at::kCUDA);
input_tensor_ = at::empty({worldSize}, at::kCUDA);
for (const auto i : c10::irange(worldSize)) {
input_tensor_[i] = i;
}
}
c10::intrusive_ptr<c10d::ProcessGroup::Work> run() {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
return pg_->_reduce_scatter_base(output_tensor_, input_tensor_);
}
at::Tensor getOutputTensor() {
c10::cuda::CUDAMultiStreamGuard guard(streams_);
return output_tensor_.cpu();
}
at::Tensor getInputTensor() {
c10::cuda::CUDAMultiStreamGuard guard(streams_);
return input_tensor_.cpu();
}
private:
at::Tensor output_tensor_;
at::Tensor input_tensor_;
};
void testAllreduce(const std::string& path, int rank, int size) {
auto test = AllreduceNCCLTest(path, size);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);
// Validation
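// Rank r, device i fills its tensor with r * numDevices + i, so across all
// contributors the values are 0..totalNumGPUs-1 and the allreduced sum is
// the arithmetic series totalNumGPUs * (totalNumGPUs - 1) / 2.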
const int totalNumGPUs = test.numDevices() * size;
const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
const auto tensors = test.getTensors();
for (const auto & tensor : tensors) {
const auto *const data = tensor.data_ptr<float>();
for (const auto k : c10::irange(tensor.numel())) {
EXPECT_EQ(data[k], expected)
<< "Allreduce ouputs do not match expected outputs";
}
}
}
void testBroadcast(const std::string& path, int rank, int size) {
auto test = BroadcastNCCLTest(path, size);
test.initialize(rank, size);
const int numDevices = test.numDevices();
// Try every combination of root rank and root tensor
for (const auto rootRank : c10::irange(size)) {
for (const auto rootTensor : c10::irange(numDevices)) {
auto work = test.run(rootRank, rootTensor);
// wait for work to complete
test.wait(work);
// Check results
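// valueInitialization() filled the root tensor with
// rootRank * numDevices + rootTensor, which the broadcast propagates
// to every participant.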
const auto expected = (rootRank * numDevices + rootTensor);
const auto tensors = test.getTensors();
for (const auto & tensor : tensors) {
const auto *const data = tensor.data_ptr<float>();
for (const auto k : c10::irange(tensor.numel())) {
EXPECT_EQ(data[k], expected)
<< "Broadcast outputs do not match expected outputs";
}
}
}
}
}
void testReduce(const std::string& path, int rank, int size) {
auto test = ReduceNCCLTest(path, size);
test.initialize(rank, size);
const int numDevices = test.numDevices();
// Try every combination of root rank and root tensor
for (const auto rootRank : c10::irange(size)) {
for (const auto rootTensor : c10::irange(numDevices)) {
auto work = test.run(rootRank, rootTensor);
// wait for work to complete
test.wait(work);
// Validation
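// Same arithmetic series as allreduce: contributors hold 0..totalNumGPUs-1,
// but the reduced result lands only on the root's tensor.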
const int totalNumGPUs = numDevices * size;
const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
auto tensors = test.getTensors();
if (rank == rootRank) {
auto& tensor = tensors[rootTensor];
auto data = tensor.data_ptr<float>();
for (const auto k : c10::irange(tensor.numel())) {
EXPECT_EQ(data[k], expected)
<< "Reduce outputs do not match expected outputs";
}
}
}
}
}
void testAllgather(const std::string& path, int rank, int size) {
auto test = AllgatherNCCLTest(path, size);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);
// Validation
auto tensors = test.getOutputTensors();
// device index
for (auto & device : tensors) {
// rank index
for (const auto j : c10::irange(device.size())) {
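// Contributor j filled its tensor with rank * numDevices + device,
// which flattens to exactly j in the gathered ordering.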
const auto expected = j;
auto& tensor = device[j];
auto data = tensor.data_ptr<float>();
for (const auto k : c10::irange(tensor.numel())) {
EXPECT_EQ(data[k], expected)
<< "Allgather outputs do not match expected outputs";
}
}
}
}
void testAllgatherBase(const std::string& path, int rank, int size) {
auto test = AllgatherBaseNCCLTest(path, size);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);
// Validation
auto output_tensor = test.getOutputTensor();
auto input_tensor = test.getInputTensor();
auto data = output_tensor.data_ptr<float>();
// Rank index
for (const auto i : c10::irange(output_tensor.numel())) {
// Element i belongs to the slice gathered from rank i / input.numel(),
// and each rank filled its input with rank * numDevices().
const auto expected = (i / input_tensor.numel()) * test.numDevices();
EXPECT_EQ(data[i], expected)
<< "Allgather_base outputs do not match expected outputs";
}
}
void testReduceScatterBase(const std::string& path, int rank, int size) {
auto test = ReduceScatterBaseNCCLTest(path, size);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);
// Validation
auto output_tensor = test.getOutputTensor();
auto input_tensor = test.getInputTensor();
auto data = output_tensor.data_ptr<float>();
// Rank index
for (const auto i : c10::irange(output_tensor.numel())) {
// Each rank contributed the values 0..worldSize-1; after reduction, this
// rank's slice is size * rank, scaled by the number of GPUs per rank.
const auto expected = size * rank * test.numDevices();
EXPECT_EQ(data[i], expected)
<< "Reducescatter_base outputs do not match expected outputs";
}
}
void testReduceScatter(const std::string& path, int rank, int size) {
auto test = ReduceScatterNCCLTest(path, size);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);
const auto participants = test.numDevices() * size;
const auto base = (participants * (participants - 1)) / 2;
// Validation
auto tensors = test.getTensors();
// device index
for (const auto i : c10::irange(tensors.size())) {
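// Every contributor supplies a distinct arithmetic progression, so each
// scattered slice reduces to the shared base plus a per-participant offset.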
const auto modifier = participants * (rank * participants + i);
const auto expected = base + modifier;
auto& tensor = tensors[i];
auto data = tensor.data_ptr<float>();
for (const auto j : c10::irange(tensor.numel())) {
EXPECT_EQ(data[j], expected) << "ReduceScatter outputs do not match expected outputs!";
}
}
}
void testProcessGroupNCCLHealthCheckFailHelper(const std::string& path, bool timeout) {
// simulate world_size > 1 here via threads.
const int worldSize = 4;
auto runTest = [&](int i) {
NCCLTest test(path, worldSize, std::chrono::milliseconds(3000));
// Catch error relating to health check failure
bool error_caught = false;
try {
test.initialize(timeout ? 0 : -1, worldSize);
} catch (const std::exception &e) {
std::string errMsg = e.what();
const std::string kTimeoutErr = "Failed to initialize NCCL communicator on rank";
const std::string kInvalidRankErr = "Invalid rank";
std::string expectedSubstr = timeout ? kTimeoutErr : kInvalidRankErr;
bool cond = errMsg.find(expectedSubstr) != std::string::npos;
EXPECT_TRUE(cond);
error_caught = true;
}
EXPECT_TRUE(error_caught);
};
std::vector<std::thread> threads;
threads.reserve(worldSize);
for (const auto r : c10::irange(worldSize)) {
threads.emplace_back(std::thread([=]() { runTest(r); }));
}
for (auto& t : threads) {
t.join();
}
}
void testProcessGroupNCCLHealthCheckFailException(const std::string& path, int /* unused */, int /* unused */) {
testProcessGroupNCCLHealthCheckFailHelper(path, /* timeout */ false);
}
void testProcessGroupNCCLHealthCheckFailTimeout(const std::string& path, int /* unused */, int /* unused */) {
testProcessGroupNCCLHealthCheckFailHelper(path, /* timeout */ true);
}
void testSequenceNumInit(const std::string& path, int /* unused */, int /* unused */) {
// Note: ProcessGroupNCCLTest doesn't support multiprocess testing, so we
// simulate world_size > 1 here via threads.
const int worldSize = 2;
std::mutex m;
std::unordered_set<uint64_t> nums;
auto runTest = [&](int i) {
NCCLTest test(path, worldSize);
test.initialize(i, worldSize);
test.getProcessGroup().setSequenceNumberForGroup();
std::lock_guard<std::mutex> lock(m);
auto seqNum = test.getProcessGroup().getSequenceNumberForGroup();
nums.insert(seqNum);
};
std::vector<std::thread> threads;
threads.reserve(worldSize);
for (const auto r : c10::irange(worldSize)) {
threads.emplace_back(std::thread([=]() { runTest(r); }));
}
for (auto& t : threads) {
t.join();
}
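// Every rank should have observed the same initial sequence number.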
EXPECT_EQ(nums.size(), 1);
}
class ProcessGroupNCCLTest : public ::testing::Test {
protected:
void SetUp() override {
// Use the WORLD_SIZE and RANK environment variables for multi-node
// distributed testing.
auto sizeEnv = std::getenv("WORLD_SIZE");
auto rankEnv = std::getenv("RANK");
if (sizeEnv && rankEnv) {
size_ = std::stoi(std::string(sizeEnv));
rank_ = std::stoi(std::string(rankEnv));
}
LOG(INFO) << "Multi-node world size: " << size_ << " rank: " << rank_;
}
void TearDown() override {
// Reset NCCL_BLOCKING_WAIT environment variable after each run.
ASSERT_TRUE(setenv(c10d::NCCL_BLOCKING_WAIT, "0", 1) == 0);
}
bool skipTest() {
// Skip tests if CUDA is not available.
if (!at::cuda::is_available()) {
LOG(INFO) << "CUDA not available, skipping test";
return true;
}
return false;
}
int size_{1};
int rank_{0};
};
TEST_F(ProcessGroupNCCLTest, testAllreduce) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testAllreduce(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testBroadcast) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testBroadcast(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testReduce) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testReduce(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testAllgather) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testAllgather(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testAllgatherBase) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testAllgatherBase(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testReduceScatter) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testReduceScatter(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testSequenceNumInit) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testSequenceNumInit(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testProcessGroupNCCLHealthCheckFailTimeout) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testProcessGroupNCCLHealthCheckFailTimeout(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testProcessGroupNCCLHealthCheckFailException) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testProcessGroupNCCLHealthCheckFailException(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testReduceScatterBase) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testReduceScatterBase(file.path, rank_, size_);
}
}
TEST_F(ProcessGroupNCCLTest, testBackendName) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
auto test = NCCLTestBase(file.path);
test.initialize(rank_, size_);
EXPECT_EQ(
test.getProcessGroup().getBackendName(), std::string(c10d::NCCL_BACKEND_NAME));
}
}