[PyTorch] Use events from pool in copy_device_to_device (#165647)

Summary: In this diff, we add an event pool so that we don't have to create and destroy events all the time; instead, we re-use events from the pool.
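
The core idea, shown as a minimal standalone sketch (the Event and Pool names here are illustrative stand-ins, not the types added by this diff): hand events out through a std::unique_ptr whose custom deleter pushes the object back onto a free list instead of destroying it, so releasing an event at a call site becomes a pool return.

    // Sketch only: a unique_ptr whose deleter recycles into a pool.
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct Event {};  // stand-in for a CUDA event wrapper

    class Pool {
     public:
      using Ptr = std::unique_ptr<Event, std::function<void(Event*)>>;

      // Hand out a pooled event; the deleter returns it here instead of freeing.
      Ptr get() {
        auto recycle = [this](Event* e) noexcept {
          std::lock_guard<std::mutex> lock(mutex_);
          free_.emplace_back(e);  // take ownership back into the pool
        };
        {
          std::lock_guard<std::mutex> lock(mutex_);
          if (!free_.empty()) {
            Event* e = free_.back().release();
            free_.pop_back();
            return Ptr(e, recycle);
          }
        }
        return Ptr(new Event(), recycle);  // pool empty: allocate fresh
      }

     private:
      std::mutex mutex_;  // the pool must outlive every Ptr it hands out
      std::vector<std::unique_ptr<Event>> free_;
    };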

Test Plan: contbuild

Differential Revision: D84685495

Pull Request resolved: https://github.com/pytorch/pytorch/pull/165647
Approved by: https://github.com/bbus
Banit Agrawal 2025-10-28 05:19:05 +00:00 committed by PyTorch MergeBot
parent 02095cc09d
commit 9139368b64
2 changed files with 106 additions and 11 deletions


@@ -2,10 +2,10 @@
 #include <ATen/cuda/ATenCUDAGeneral.h>
 #include <ATen/cuda/CUDAContext.h>
-#include <c10/core/impl/GPUTrace.h>
-#include <c10/cuda/CUDAStream.h>
-#include <c10/cuda/CUDAGuard.h>
 #include <ATen/cuda/Exceptions.h>
+#include <c10/core/impl/GPUTrace.h>
+#include <c10/cuda/CUDAGuard.h>
+#include <c10/cuda/CUDAStream.h>
 #include <c10/util/Exception.h>
 #include <cuda_runtime_api.h>
@@ -246,4 +246,79 @@ private:
   }
 };
+
+// EventPool - Thread-safe pool of CUDA events to avoid expensive cudaEventCreate
+// calls. cudaEventCreate, when invoked concurrently from multiple threads, can
+// be very expensive (especially on certain device/driver combinations).
+using CUDAEventPtr =
+    std::unique_ptr<CUDAEvent, std::function<void(CUDAEvent*)>>;
+
+class EventPool {
+ public:
+  EventPool() : pools_(at::cuda::device_count()) {}
+
+  CUDAEventPtr get(const DeviceIndex device) {
+    // If the device is invalid, return a default (non-pooled) event
+    if (device < 0 || device >= (DeviceIndex)pools_.size()) {
+      auto deleter = [](CUDAEvent* event) {
+        delete event;
+      };
+      return CUDAEventPtr(
+          std::make_unique<CUDAEvent>(cudaEventDisableTiming).release(),
+          deleter);
+    }
+
+    auto& pool = pools_[device];
+    // Create a destructor that returns the event to the appropriate device pool
+    auto destructor = [&pool](CUDAEvent* event) noexcept {
+      if (event != nullptr) {
+        std::lock_guard<std::mutex> lock(pool.mutex_);
+        pool.event_pool_.emplace_back(event);
+      }
+    };
+
+    // Try to reuse an event from the per-device pool before allocating
+    {
+      std::lock_guard<std::mutex> lock(pool.mutex_);
+      if (!pool.event_pool_.empty()) {
+        auto event = std::move(pool.event_pool_.back());
+        pool.event_pool_.pop_back();
+        return CUDAEventPtr(event.release(), destructor);
+      }
+    }
+
+    return CUDAEventPtr(
+        std::make_unique<CUDAEvent>(cudaEventDisableTiming).release(),
+        destructor);
+  }
+
+  void empty_cache() {
+    for (auto& pool : pools_) {
+      std::lock_guard<std::mutex> lock(pool.mutex_);
+      pool.event_pool_.clear();
+    }
+  }
+
+  void init_num_events(const size_t num_events) {
+    for (DeviceIndex device_idx = 0; device_idx < at::cuda::device_count();
+         ++device_idx) {
+      CUDAGuard device_guard(device_idx);
+      std::vector<CUDAEventPtr> temp_events;
+      temp_events.reserve(num_events);
+      for (size_t i = 0; i < num_events; ++i) {
+        auto event = get(device_idx);
+        // Record the event to ensure it's properly initialized
+        event->record();
+        temp_events.emplace_back(std::move(event));
+      }
+      // Events are returned to the pool when temp_events is destroyed
+    }
+  }
+
+ private:
+  struct alignas(64) PerDevicePool {
+    alignas(64) std::mutex mutex_;
+    std::vector<std::unique_ptr<CUDAEvent>> event_pool_;
+  };
+
+  std::vector<PerDevicePool> pools_;
+};
+
 } // namespace at::cuda
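
For context, a hedged usage sketch of the class above. The pool, the device index, and the stream are assumed to come from the caller (the function name is hypothetical); CUDAEvent::record and CUDAEvent::block wrap cudaEventRecord and cudaStreamWaitEvent respectively.

    #include <ATen/cuda/CUDAContext.h>
    #include <ATen/cuda/CUDAEvent.h>

    // Hypothetical call site; mirrors how copy_device_to_device uses the pool.
    void make_copy_stream_wait_on_dst(
        at::cuda::EventPool& pool,
        at::DeviceIndex dst_device,
        at::cuda::CUDAStream copy_stream) {
      auto dst_ready = pool.get(dst_device);
      dst_ready->record(at::cuda::getCurrentCUDAStream(dst_device));
      dst_ready->block(copy_stream);
      // Scope exit: the custom deleter returns the event to the per-device
      // free list; no cudaEventDestroy happens here.
    }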


@@ -1,11 +1,11 @@
 #define TORCH_ASSERT_ONLY_METHOD_OPERATORS
-#include <ATen/core/Tensor.h>
 #include <ATen/Context.h>
 #include <ATen/Dispatch.h>
 #include <ATen/Dispatch_v2.h>
-#include <ATen/cuda/CachingHostAllocator.h>
+#include <ATen/core/Tensor.h>
 #include <ATen/cuda/CUDAContext.h>
 #include <ATen/cuda/CUDAEvent.h>
+#include <ATen/cuda/CachingHostAllocator.h>
 #include <ATen/cuda/PeerToPeerAccess.h>
 #include <ATen/native/Copy.h>
 #include <ATen/native/TensorIterator.h>
@@ -27,6 +27,24 @@
 namespace at::native {
 
+namespace {
+
+// Initial pool size for CUDA events per device.
+constexpr size_t kInitialEventPoolSize = 8;
+
+at::cuda::CUDAEventPtr getEventFromPool(const at::DeviceIndex device_idx) {
+  static auto* event_pool = []() {
+    auto* pool = new at::cuda::EventPool();
+    // Pre-populate the pool with events to avoid stalls in creating events
+    pool->init_num_events(kInitialEventPoolSize);
+    return pool;
+  }();
+  return event_pool->get(device_idx);
+}
+
+} // namespace
+
 void neg_kernel_cuda(TensorIteratorBase &iter);
 void conj_kernel_cuda(TensorIteratorBase &iter);
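
One detail worth noting in getEventFromPool: the pool is heap-allocated inside a function-local static and never deleted. This deliberately "leaky" singleton keeps the pool's destructor from running during static teardown, when the CUDA context may already be gone. The idiom in isolation, with a hypothetical leaky_singleton name:

    // Hypothetical illustration of the leaky-singleton idiom used above.
    template <typename T>
    T& leaky_singleton() {
      // Constructed on first use, never destroyed: no destructor runs at
      // process exit, sidestepping static destruction-order problems.
      static T* instance = new T();
      return *instance;
    }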
@@ -263,12 +281,14 @@ void copy_device_to_device(TensorIterator& iter,
     // write-after-read dependencies on the destination side are handled, so
     // that no one is operating on the dst memory when we perform the copy.
     // src waits on dst barrier (src already waits on src)
-    CUDAEvent dst_ready;
+    // Use the event pool for better performance instead of creating new events
+    auto dst_ready = getEventFromPool(dst_device.index());
     device_guard.set_device(dst_device);
-    dst_ready.record(getCurrentCUDAStream(dst_device.index()));
+    dst_ready->record(getCurrentCUDAStream(dst_device.index()));
 
     device_guard.set_device(src_device);
-    dst_ready.block(copy_stream);
+    dst_ready->block(copy_stream);
   }
 
   if (memcpy_eligible) {
@@ -307,11 +327,11 @@ void copy_device_to_device(TensorIterator& iter,
     // operate on dst's copy until the copy is complete.
 
     // Still on src_device, record stream event
-    CUDAEvent src_ready;
-    src_ready.record(copy_stream);
+    auto src_ready = getEventFromPool(src_device.index());
+    src_ready->record(copy_stream);
 
     device_guard.set_device(dst_device);
-    src_ready.block(getCurrentCUDAStream(dst_device.index()));
+    src_ready->block(getCurrentCUDAStream(dst_device.index()));
   }
 
   AT_CUDA_CHECK(cudaGetLastError());
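
For readers unfamiliar with the record/block pattern in these hunks, this is roughly what one cross-stream barrier looks like in raw CUDA runtime calls; a sketch with error checking omitted, not PyTorch's implementation:

    #include <cuda_runtime_api.h>

    // One producer-to-consumer barrier; the pool exists to amortize the
    // cudaEventCreateWithFlags call below across many copies.
    void cross_stream_barrier(cudaStream_t producer, cudaStream_t consumer) {
      cudaEvent_t ev;
      cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);  // the costly step
      cudaEventRecord(ev, producer);         // snapshot producer's progress
      cudaStreamWaitEvent(consumer, ev, 0);  // consumer waits on the GPU side
      cudaEventDestroy(ev);                  // deferred until the event completes
    }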