Refactor ProcessGroupNCCL collective primitives (#18820)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/18820
ghimport-source-id: 220b2a3dd9d4d6d2e557e1802851f082c2dc6452

Stack from [ghstack](https://github.com/ezyang/ghstack):
* **#18820 Refactor ProcessGroupNCCL collective primitives**

Planning to add reduce-scatter, but no room in my stomach for more
copypasta.

Also rewrote the tensor list validation logic.  The existing validation
was ill-suited for all the cases it was being used for; it took a vector
of input tensors and a vector of output tensors, but only ever received
either two references to the same vector, or a bespoke singleton vector
and a vector of outputs (for which it would ignore all but the first
output).  In the first case, it performed unnecessary checks, and in the
second, it skipped necessary ones.

Reviewed By: mrshenli

Differential Revision: D14762369

fbshipit-source-id: dcf882ce1c5854333a9eb4424bfc18d9f4648ddf
This commit is contained in:
Max Wang 2019-04-26 12:32:29 -07:00 committed by Facebook Github Bot
parent e1ebf330d5
commit 61d48aa989
2 changed files with 220 additions and 263 deletions

View File

@ -287,90 +287,103 @@ std::vector<std::shared_ptr<NCCLComm>>& ProcessGroupNCCL::getNCCLComm(
return devNCCLCommMap_[devicesKey];
}
// Helper function that checks the input and output tensors for validity
void ProcessGroupNCCL::tensorCheckHelper(
const std::vector<at::Tensor>& input,
const std::vector<at::Tensor>& output,
int outputOverInput) {
if (input.size() * outputOverInput != output.size()) {
namespace {
// Check that all `tensors' have the same type and shape and are distributed
// across distinct GPUs.
void check_gpu_tensors(const std::vector<at::Tensor>& tensors) {
if (tensors.size() == 0) {
throw std::runtime_error("Tensor list must be nonempty");
}
if (tensors.size() > static_cast<size_t>(at::cuda::getNumGPUs())) {
throw std::runtime_error(
"Input tensor sequence should have the same "
"number of tensors as the output tensor sequence");
"Tensor list mustn't be larger than the number of available GPUs"
);
}
if (input.size() == 0) {
throw std::runtime_error("The number of input tensors should not be zero");
}
const auto& first = tensors.front();
if (input.size() > static_cast<size_t>(at::cuda::getNumGPUs())) {
throw std::runtime_error(
"The number of input tensors is larger than "
"the number of available GPUs");
}
// Set for ensuring that tensors are on separate devices.
std::unordered_set<decltype(first.get_device())> usedDevices;
usedDevices.reserve(tensors.size());
// To make sure each tensor is on separate devices
std::unordered_set<int> usedDevices;
usedDevices.reserve(input.size());
auto inputNumElement = input[0].numel();
auto elementType = input[0].scalar_type();
for (size_t i = 0; i < input.size(); ++i) {
// Check to make sure it's a GPU dense tensor
if (!(input[i].is_cuda() && !input[i].is_sparse() && output[i].is_cuda() &&
!output[i].is_sparse())) {
throw std::runtime_error(
"Only CUDA dense tensor is supported for NCCL "
"collective operations");
for (const auto& t : tensors) {
if (!t.is_cuda() || t.is_sparse()) {
throw std::runtime_error("Tensors must be CUDA and dense");
}
// Check the tensor type is identical
if (input[i].scalar_type() != elementType ||
output[i].scalar_type() != elementType) {
throw std::runtime_error(
"Expecting all GPU tensors to have identical "
"type");
if (t.scalar_type() != first.scalar_type()) {
throw std::runtime_error("Tensors must have identical type");
}
// Check the input tensor size is identical
if (input[i].numel() != inputNumElement) {
throw std::runtime_error(
"Expecting all input tensors to have identical "
"number of elements");
if (t.sizes() != first.sizes()) {
throw std::runtime_error("Tensors must have identical size");
}
// Check the output tensor size equals to input tensor size
if (output[i].numel() != inputNumElement) {
throw std::runtime_error(
"The number of elements of output tensor does "
"not match the number of elements of the input "
"tensor");
if (!t.is_contiguous()) {
throw std::runtime_error("Tensors must be contiguous");
}
// Contiguous verification
if (!input[i].is_contiguous() || !output[i].is_contiguous()) {
throw std::runtime_error("Expecting all GPU tensors to be contiguous");
}
bool inserted;
std::tie(std::ignore, inserted) = usedDevices.insert(input[i].get_device());
// Device verification, if the insertion didn't take place
const auto inserted = usedDevices.insert(t.get_device()).second;
if (!inserted) {
throw std::runtime_error("Expecting inputs on different GPU devices");
}
// Now check the output device
if (input[i].get_device() != output[i].get_device()) {
throw std::runtime_error(
"Expecting input and output tensors to be on "
"the same device");
throw std::runtime_error("Tensors must be on distinct GPU devices");
}
}
}
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts) {
tensorCheckHelper(tensors, tensors);
// Flatten each list in `tensor_lists' for a gather or scatter operation, and
// ensure compatibility with the corresponding tensor in `other'.
std::vector<at::Tensor> flatten_for_scatter_gather(
std::vector<std::vector<at::Tensor>>& tensor_lists,
std::vector<at::Tensor>& other,
size_t world_size) {
if (tensor_lists.size() != other.size()) {
throw std::runtime_error(
"Tensor list operands to scatter/gather must have the same length"
);
}
const auto num_devices = tensor_lists.size();
auto devices = getDeviceList(tensors);
auto key = getKeyFromDevices(devices);
std::vector<at::Tensor> flattened;
flattened.resize(num_devices);
for (auto i = size_t{}; i < num_devices; ++i) {
if (tensor_lists[i].size() != world_size * num_devices) {
throw std::runtime_error(
"Tensor list input to scatter/gather must match number of collective"
" participants"
);
}
// Only check device match for the first tensor in the list; the call to
// newLikeFlat() below will check the rest.
if (tensor_lists[i].front().get_device() != other[i].get_device()) {
throw std::runtime_error(
"Corresponding input/output tensors to scatter/gather must all reside"
" on the same device"
);
}
for (const auto& t : tensor_lists[i]) {
if (t.numel() != other[i].numel()) {
throw std::runtime_error(
"All tensor operands to scatter/gather must have the same size"
);
}
}
// Flatten the tensors (from all ranks) into a single big tensor.
flattened[i] = newLikeFlat(tensor_lists, i);
}
return flattened;
}
}
template<typename Fn, typename PreProcess, typename PostProcess>
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
std::vector<at::Tensor>& inputs,
std::vector<at::Tensor>& outputs,
Fn fn,
PreProcess pre,
PostProcess post) {
const auto devices = getDeviceList(inputs);
const auto key = getKeyFromDevices(devices);
auto& ncclComms = getNCCLComm(key, devices);
// First let NCCL streams wait for input tensors allocation streams
@ -384,32 +397,39 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
std::unique_lock<std::mutex> cudaFreeMutexLock(
*(c10::cuda::CUDACachingAllocator::getFreeMutex()));
pre(ncclStreams_[key]);
C10D_NCCL_CHECK(ncclGroupStart());
for (size_t i = 0; i < tensors.size(); ++i) {
for (size_t i = 0; i < inputs.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// Input `tensors` are created on a worker stream and used in different
// ncclStream. Hence, `tensors` must record the ncclStream to prevent being
// freed before ncclAllReduce finishes. See [Sync Streams].
// Both `inputs' and `outputs' are created on a worker stream and used in
// different ncclStreams. Hence, both must record the ncclStream to
// prevent being freed before the collective finishes.
//
// We only record `inputs' here, and leave recording `outputs' to `fn' for
// operations where `inputs' and `outputs' are not the same.
//
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
tensors[i].storage().data(), ncclStream);
inputs[i].storage().data(), ncclStream);
C10D_NCCL_CHECK(ncclAllReduce(
tensors[i].data_ptr(),
tensors[i].data_ptr(),
tensors[i].numel(),
getNcclDataType(tensors[i].scalar_type()),
ncclOp[opts.reduceOp],
ncclComms[i]->getNcclComm(),
ncclStream.stream()));
C10D_NCCL_CHECK(fn(
inputs[i],
outputs[i],
ncclComms[i]->getNcclComm(),
ncclStream
));
}
C10D_NCCL_CHECK(ncclGroupEnd());
post(ncclStreams_[key]);
// Event should only be recorded after the ncclGroupEnd()
for (size_t i = 0; i < tensors.size(); ++i) {
for (size_t i = 0; i < inputs.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
work->cudaEvents_[i].record(ncclStream);
}
@ -417,202 +437,124 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
return work;
}
template<typename Fn>
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
std::vector<at::Tensor>& inputs,
std::vector<at::Tensor>& outputs,
Fn fn) {
return collective(
inputs, outputs, fn,
[] (std::vector<at::cuda::CUDAStream>&) {},
[] (std::vector<at::cuda::CUDAStream>&) {}
);
}
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts) {
check_gpu_tensors(tensors);
return collective(tensors, tensors,
[&] (at::Tensor& input, at::Tensor& output,
ncclComm_t comm, at::cuda::CUDAStream& stream) {
return ncclAllReduce(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
ncclOp[opts.reduceOp],
comm,
stream.stream()
);
}
);
}
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::broadcast(
std::vector<at::Tensor>& tensors,
const BroadcastOptions& opts) {
tensorCheckHelper(tensors, tensors);
check_gpu_tensors(tensors);
auto devices = getDeviceList(tensors);
auto key = getKeyFromDevices(devices);
auto& ncclComms = getNCCLComm(key, devices);
// First let NCCL streams wait for current streams
syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
// Work itself will create the CUDA events on all GPUs of tensors
auto work = std::make_shared<ProcessGroupNCCL::WorkNCCL>(devices);
at::cuda::OptionalCUDAGuard gpuGuard;
std::unique_lock<std::mutex> cudaFreeMutexLock(
*(c10::cuda::CUDACachingAllocator::getFreeMutex()));
C10D_NCCL_CHECK(ncclGroupStart());
for (size_t i = 0; i < tensors.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// root rank of the the GPU
int root = opts.rootRank * tensors.size() + opts.rootTensor;
// Input `tensors` are created on worker streams and used in different
// ncclStreams. Hence, `tensors` must record ncclStreams to prevent being
// freed before ncclBcast finishes. See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
tensors[i].storage().data(), ncclStream);
C10D_NCCL_CHECK(ncclBcast(
tensors[i].data_ptr(),
tensors[i].numel(),
getNcclDataType(tensors[i].scalar_type()),
return collective(tensors, tensors,
[&] (at::Tensor& input, at::Tensor& output,
ncclComm_t comm, at::cuda::CUDAStream& stream) {
const auto root = opts.rootRank * tensors.size() + opts.rootTensor;
return ncclBcast(
input.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
root,
ncclComms[i]->getNcclComm(),
ncclStream.stream()));
}
C10D_NCCL_CHECK(ncclGroupEnd());
// Event should only be recorded after the ncclGroupEnd()
for (size_t i = 0; i < tensors.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
work->cudaEvents_[i].record(ncclStream);
}
return work;
comm,
stream.stream()
);
}
);
}
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::reduce(
std::vector<at::Tensor>& tensors,
const ReduceOptions& opts) {
tensorCheckHelper(tensors, tensors);
check_gpu_tensors(tensors);
auto devices = getDeviceList(tensors);
auto key = getKeyFromDevices(devices);
auto& ncclComms = getNCCLComm(key, devices);
// First let NCCL streams wait for current streams
syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
// Work itself will create the CUDA events on all GPUs of tensors
auto work = std::make_shared<ProcessGroupNCCL::WorkNCCL>(devices);
at::cuda::OptionalCUDAGuard gpuGuard;
std::unique_lock<std::mutex> cudaFreeMutexLock(
*(c10::cuda::CUDACachingAllocator::getFreeMutex()));
C10D_NCCL_CHECK(ncclGroupStart());
for (size_t i = 0; i < tensors.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// root rank of the the GPU
int root = opts.rootRank * tensors.size() + opts.rootTensor;
// Input `tensors` are created on worker streams and used in different
// ncclStreams. Hence, `tensors` must record ncclStreams to prevent being
// freed before ncclReduce finishes. See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
tensors[i].storage().data(), ncclStream);
C10D_NCCL_CHECK(ncclReduce(
tensors[i].data_ptr(),
tensors[i].data_ptr(),
tensors[i].numel(),
getNcclDataType(tensors[i].scalar_type()),
return collective(tensors, tensors,
[&] (at::Tensor& input, at::Tensor& output,
ncclComm_t comm, at::cuda::CUDAStream& stream) {
const auto root = opts.rootRank * tensors.size() + opts.rootTensor;
return ncclReduce(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
ncclOp[opts.reduceOp],
root,
ncclComms[i]->getNcclComm(),
ncclStream.stream()));
}
C10D_NCCL_CHECK(ncclGroupEnd());
// Event should only be recorded after the ncclGroupEnd()
for (size_t i = 0; i < tensors.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
work->cudaEvents_[i].record(ncclStream);
}
return work;
comm,
stream.stream()
);
}
);
}
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts) {
if (outputTensors.size() != inputTensors.size()) {
throw std::runtime_error("allgather: input and output size mismatch");
}
std::vector<at::Tensor> flattenOutputTensors;
flattenOutputTensors.resize(outputTensors.size());
check_gpu_tensors(inputTensors);
for (size_t i = 0; i < outputTensors.size(); ++i) {
tensorCheckHelper(
std::vector<at::Tensor>{inputTensors[i]},
outputTensors[i],
size_ * inputTensors.size());
// Flatten the output tensors (from all ranks) to a single big tensor
flattenOutputTensors[i] = newLikeFlat(outputTensors, i);
auto outputFlattened = flatten_for_scatter_gather(
outputTensors, inputTensors, size_
);
check_gpu_tensors(outputFlattened);
if (static_cast<size_t>(flattenOutputTensors[i].numel()) !=
inputTensors[i].numel() * size_ * inputTensors.size()) {
throw std::runtime_error("Unexpected size for flatten tensor");
}
}
auto devices = getDeviceList(inputTensors);
auto key = getKeyFromDevices(devices);
auto& ncclComms = getNCCLComm(key, devices);
// First let NCCL streams wait for current streams
syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
// Work itself will create the CUDA events on all GPUs of tensors
auto work = std::make_shared<ProcessGroupNCCL::WorkNCCL>(devices);
at::cuda::OptionalCUDAGuard gpuGuard;
std::unique_lock<std::mutex> cudaFreeMutexLock(
*(c10::cuda::CUDACachingAllocator::getFreeMutex()));
C10D_NCCL_CHECK(ncclGroupStart());
for (size_t i = 0; i < inputTensors.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// Input `inputTensors` and `flattenOutputTensors` are created on worker
// streams and used in different ncclStreams. Hence, `tensors` must record
// ncclStreams to prevent beingfreed before ncclReduce finishes.
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
inputTensors[i].storage().data(), ncclStream);
c10::cuda::CUDACachingAllocator::recordStream(
flattenOutputTensors[i].storage().data(), ncclStream);
C10D_NCCL_CHECK(ncclAllGather(
inputTensors[i].data_ptr(),
flattenOutputTensors[i].data_ptr(),
inputTensors[i].numel(),
getNcclDataType(inputTensors[i].scalar_type()),
ncclComms[i]->getNcclComm(),
ncclStream.stream()));
}
C10D_NCCL_CHECK(ncclGroupEnd());
// Copy the flattened output tensors to the outputs
for (size_t i = 0; i < outputTensors.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
at::cuda::CUDAStreamGuard guard(ncclStream);
for (size_t j = 0; j < outputTensors[0].size(); ++j) {
// See [Sync Streams].
return collective(inputTensors, outputFlattened,
[&] (at::Tensor& input, at::Tensor& output,
ncclComm_t comm, at::cuda::CUDAStream& stream) {
c10::cuda::CUDACachingAllocator::recordStream(
outputTensors[i][i].storage().data(), ncclStream);
output.storage().data(), stream
);
return ncclAllGather(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
comm,
stream.stream()
);
},
[&] (std::vector<at::cuda::CUDAStream>& ncclStreams) {},
[&] (std::vector<at::cuda::CUDAStream>& ncclStreams) {
// Copy the flattened output tensors to the outputs.
for (size_t i = 0; i < outputTensors.size(); ++i) {
at::cuda::CUDAStreamGuard guard(ncclStreams[i]);
for (size_t j = 0; j < outputTensors[0].size(); ++j) {
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
outputTensors[i][i].storage().data(), ncclStreams[i]);
outputTensors[i][j].copy_(flattenOutputTensors[i][j], true);
outputTensors[i][j].copy_(outputFlattened[i][j], true);
}
}
}
}
// Event should only be recorded after the ncclGroupEnd()
for (size_t i = 0; i < inputTensors.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
work->cudaEvents_[i].record(ncclStream);
}
return work;
);
}
std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::barrier(

View File

@ -136,6 +136,9 @@ class ProcessGroupNCCL : public ProcessGroup {
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts = AllgatherOptions()) override;
std::shared_ptr<ProcessGroup::Work> barrier(
const BarrierOptions& opts = BarrierOptions()) override;
// Unsupported Ops
std::shared_ptr<ProcessGroup::Work> gather(
std::vector<std::vector<at::Tensor>>& outputTensors,
@ -161,9 +164,6 @@ class ProcessGroupNCCL : public ProcessGroup {
std::vector<at::Tensor>& tensors,
int tag) override;
std::shared_ptr<ProcessGroup::Work> barrier(
const BarrierOptions& opts = BarrierOptions()) override;
protected:
// Helper that broadcasts nccl unique ID to all ranks through the store
void broadcastUniqueNCCLID(ncclUniqueId* ncclID);
@ -174,12 +174,27 @@ class ProcessGroupNCCL : public ProcessGroup {
const std::string& devicesKey,
const std::vector<at::Device>& devices);
// Tensor checker helper
void tensorCheckHelper(
const std::vector<at::Tensor>& input,
const std::vector<at::Tensor>& output,
int outputOverInput = 1);
private:
// Helper that encapsulates work shared across all collective communication
// primitives. The callbacks have the following signatures:
//
// ncclResult_t fn(at::Tensor& input, at::Tensor& output,
// ncclComm_t, at::cuda::CUDAStream&);
// void {pre,post}(std::vector<at::cuda::CUDAStream&>);
template<typename Fn>
std::shared_ptr<ProcessGroup::Work> collective(
std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
Fn fn);
template<typename Fn, typename PreProcess, typename PostProcess>
std::shared_ptr<ProcessGroup::Work> collective(
std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
Fn fn,
PreProcess pre,
PostProcess post);
protected:
// Store that is used to exchange each Ranks's NCCL unique ID
std::shared_ptr<Store> store_;