Support IPC for Expandable Segments (#130890)

This reapplies the original commit unchanged except that it resolves a build error in an internal build where `handle` was shadowed.

Differential Revision: [D60547506](https://our.internmc.facebook.com/intern/diff/D60547506)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/130890
Approved by: https://github.com/dsjohns2
zdevito 2024-08-02 09:49:22 -07:00 committed by PyTorch MergeBot
parent 618e2c9de4
commit 8d9c3a71f6
4 changed files with 260 additions and 63 deletions
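
For context, a minimal usage sketch of what this enables at the Python level: CUDA tensors backed by expandable segments can now be sent between processes through the usual torch.multiprocessing path, with the allocator exporting the segment's physical memory over POSIX file descriptors instead of a single cudaIpcMemHandle_t. The snippet below uses only standard PyTorch APIs (it is not code from this PR) and assumes expandable segments are enabled via PYTORCH_CUDA_ALLOC_CONF before CUDA is initialized.

# Illustrative only; assumes PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True is set
# in the environment before any CUDA call (the spawned child inherits it).
import torch
import torch.multiprocessing as mp

def consumer(q):
    t = q.get()  # receiver maps the shared expandable segment via the new IPC path
    print(t.sum().item())

if __name__ == "__main__":
    mp.set_start_method("spawn")  # CUDA tensors require the spawn start method
    q = mp.Queue()
    p = mp.Process(target=consumer, args=(q,))
    p.start()
    t = torch.ones(1024, device="cuda")
    q.put(t)  # sender exports fds for the segment range backing t
    p.join()  # keep t alive until the consumer has used it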


@@ -16,6 +16,7 @@
#if !defined(USE_ROCM) && defined(PYTORCH_C10_DRIVER_API_SUPPORTED)
#include <c10/cuda/driver_api.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#endif
@@ -123,6 +124,12 @@ constexpr size_t kMinLargeAlloc =
10485760; // allocations between 1 and 10 MiB may use kLargeBuffer
constexpr size_t kRoundLarge = 2097152; // round up large allocations to 2 MiB
char SHAREABLE_HANDLE_VERSION = 1;
enum ShareableHandleType : char {
SHAREABLE_CUDA_MALLOC = 'c',
SHAREABLE_CUDA_EXPANDABLE_SEGMENT = 'e'
};
namespace {
using stream_set = ska::flat_hash_set<cuda::CUDAStream>;
@@ -382,7 +389,7 @@ Instead these mapping have to be done manually. The allocator now has an
struct ExpandableSegment {
ExpandableSegment(
c10::DeviceIndex device,
cudaStream_t stream,
std::optional<cudaStream_t> stream,
size_t address_space_size,
size_t segment_size,
std::vector<c10::DeviceIndex> peers)
@@ -420,6 +427,7 @@ struct ExpandableSegment {
CUmemGenericAllocationHandle handle = 0;
CUmemAllocationProp prop = {};
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
// NOLINTNEXTLINE(bugprone-signed-char-misuse)
prop.location.id = static_cast<int>(device_);
@@ -429,13 +437,13 @@ struct ExpandableSegment {
for (auto j : c10::irange(begin, i)) {
auto h = handles_.at(j).value();
handles_.at(j) = std::nullopt;
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemRelease_(h));
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemRelease_(h.handle));
}
trimHandles();
return rangeFromHandles(begin, begin);
}
C10_CUDA_DRIVER_CHECK(status);
handles_.at(i) = handle;
handles_.at(i) = Handle{handle, std::nullopt};
}
mapAndSetAccess(begin, end);
return rangeFromHandles(begin, end);
@@ -454,10 +462,94 @@ struct ExpandableSegment {
return rangeFromHandles(begin, end);
}
// Setup IPC sharing for range.
// Returns the (larger) range that was actually shared.
// Serializes data to a std::ostream that can be passed to the
// other process and then restored as an expandable segment
// via ExpandableSegment::fromShared(istream);
SegmentRange share(SegmentRange range, std::ostream& buf) {
auto begin = segmentLeft(range.ptr);
auto end = segmentRight(range.ptr + range.size);
ShareHeader header{getpid(), segment_size_, end - begin};
buf.write((const char*)&header, sizeof(ShareHeader));
for (auto i : c10::irange(begin, end)) {
auto& handle = handles_.at(i).value();
if (!handle.fd) {
int fd = 0;
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemExportToShareableHandle_(
&fd, handle.handle, CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0));
handle.fd = fd;
}
int fd = *handle.fd;
buf.write((const char*)&fd, sizeof(int));
}
return rangeFromHandles(begin, end);
}
static std::unique_ptr<ExpandableSegment> fromShared(
c10::DeviceIndex device,
std::vector<c10::DeviceIndex> peers,
std::istream& buf) {
ShareHeader header{};
buf.read((char*)&header, sizeof(ShareHeader));
auto segment = std::make_unique<ExpandableSegment>(
device,
std::nullopt,
header.num_handles * header.segment_size,
header.segment_size,
std::move(peers));
// older build setups (e.g. multiwheels) do not define this syscall (it was
// added in 2020), but the kernel on the system might still support it.
#ifndef SYS_pidfd_open
#define SYS_pidfd_open 434
#endif
#ifndef SYS_pidfd_getfd
#define SYS_pidfd_getfd 438
#endif
auto pidfd = syscall(SYS_pidfd_open, header.pid, 0);
TORCH_CHECK(
pidfd != -1 || errno != ENOSYS,
"The kernel on this machine does not support the pidfd_open syscall needed to use IPC for CUDA tensors when expandable_segments:True is set. "
"Consider using expandable_segments:False via torch.cuda.memory._set_allocator_settings('expandable_segments:False') for this allocation.");
TORCH_CHECK(pidfd != -1, "pidfd_open:", std::strerror(errno));
for (auto i : c10::irange(header.num_handles)) {
(void)i;
int fd = 0;
buf.read((char*)&fd, sizeof(int));
auto myfd = syscall(SYS_pidfd_getfd, pidfd, fd, 0);
if (myfd == -1) {
auto err = errno;
close((int)pidfd);
for (auto& h : segment->handles_) {
C10_CUDA_DRIVER_CHECK(
DriverAPI::get()->cuMemRelease_(h.value().handle));
h = std::nullopt;
}
TORCH_CHECK(
err != ENOSYS,
"The kernel on this machine does not support the pidfd_getfd syscall needed to use IPC for CUDA tensors when expandable_segments:True is set. "
"Consider using expandable_segments:False via torch.cuda.memory._set_allocator_settings('expandable_segments:False') for this allocation.");
TORCH_CHECK(false, "pidfd_getfd: ", std::strerror(err));
}
CUmemGenericAllocationHandle handle = 0;
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemImportFromShareableHandle_(
&handle,
// NOLINTNEXTLINE(performance-no-int-to-ptr)
(void*)(uintptr_t)myfd,
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
close((int)myfd);
segment->handles_.emplace_back(Handle{handle, std::nullopt});
}
close((int)pidfd);
segment->mapAndSetAccess(0, header.num_handles);
return segment;
}
char* ptr() const {
// NOLINTNEXTLINE(performance-no-int-to-ptr)
return reinterpret_cast<char*>(ptr_);
}
size_t size() const {
return max_handles_ * segment_size_;
}
@@ -492,7 +584,7 @@ struct ExpandableSegment {
ptr_ + i * segment_size_,
segment_size_,
0,
handles_.at(i).value(),
handles_.at(i).value().handle,
0ULL));
}
setAccess(device_, begin, end);
@@ -509,13 +601,21 @@ struct ExpandableSegment {
// cannot call c10::cuda::stream_synchronize because
// it might grab the GIL which can lead to a deadlock
// Locking order must be GIL -> Allocator Lock
C10_CUDA_CHECK(cudaStreamSynchronize(stream_));
if (stream_) {
C10_CUDA_CHECK(cudaStreamSynchronize(*stream_));
} else {
cuda::CUDAGuard device_guard(device_);
C10_CUDA_CHECK(cudaDeviceSynchronize());
}
for (auto i : c10::irange(begin, end)) {
CUmemGenericAllocationHandle h = handles_.at(i).value();
Handle h = handles_.at(i).value();
handles_.at(i) = std::nullopt;
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemUnmap_(
ptr_ + segment_size_ * i, segment_size_));
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemRelease_(h));
if (h.fd) {
close(*h.fd);
}
C10_CUDA_DRIVER_CHECK(DriverAPI::get()->cuMemRelease_(h.handle));
}
trimHandles();
}
@@ -551,11 +651,20 @@ struct ExpandableSegment {
ptr() + segment_size_ * begin, segment_size_ * (end - begin));
}
c10::DeviceIndex device_;
cudaStream_t stream_;
std::optional<cudaStream_t> stream_;
CUdeviceptr ptr_{};
size_t segment_size_;
size_t max_handles_;
std::vector<std::optional<CUmemGenericAllocationHandle>> handles_;
struct Handle {
CUmemGenericAllocationHandle handle;
std::optional<int> fd;
};
struct ShareHeader {
pid_t pid;
size_t segment_size;
size_t num_handles;
};
std::vector<std::optional<Handle>> handles_;
// devices on which this memory should be mapped in addition
// to the device where the physical memory lives (device_).
std::vector<c10::DeviceIndex> peers_;
@@ -564,7 +673,7 @@ struct ExpandableSegment {
struct ExpandableSegment {
ExpandableSegment(
c10::DeviceIndex device,
cudaStream_t stream,
std::optional<cudaStream_t> stream,
size_t address_space_size,
size_t segment_size,
std::vector<c10::DeviceIndex> peers) {
@@ -576,6 +685,15 @@ struct ExpandableSegment {
SegmentRange unmap(SegmentRange range) {
return SegmentRange(nullptr, 0);
}
SegmentRange share(SegmentRange range, std::ostream& ss) {
return SegmentRange(nullptr, 0);
}
static std::unique_ptr<ExpandableSegment> fromShared(
c10::DeviceIndex device,
std::vector<c10::DeviceIndex> peers,
std::istream& buf) {
return {};
}
char* ptr() const {
return nullptr;
}
@@ -1425,14 +1543,27 @@ class DeviceCachingAllocator {
}
ShareableHandle shareIpcHandle(Block* block) {
size_t outSize = 0;
void* base = getBaseAllocation(block, &outSize);
auto offset = (char*)block->ptr - (char*)base;
cudaIpcMemHandle_t handle;
C10_CUDA_CHECK(cudaIpcGetMemHandle(&handle, base));
return ShareableHandle{
offset,
std::string((char*)&handle, (char*)&handle + CUDA_IPC_HANDLE_SIZE)};
std::lock_guard<std::recursive_mutex> lock(mutex);
std::ostringstream ss;
ss.put(SHAREABLE_HANDLE_VERSION);
ptrdiff_t offset = 0;
if (!block->expandable_segment_) {
ss.put(SHAREABLE_CUDA_MALLOC);
Block* base_block = block;
while (base_block->prev) {
base_block = base_block->prev;
}
offset = (char*)block->ptr - (char*)base_block->ptr;
cudaIpcMemHandle_t handle;
C10_CUDA_CHECK(cudaIpcGetMemHandle(&handle, base_block->ptr));
ss.write((char*)&handle, CUDA_IPC_HANDLE_SIZE);
} else {
ss.put(SHAREABLE_CUDA_EXPANDABLE_SEGMENT);
auto full_range = block->expandable_segment_->share(
SegmentRange(block->ptr, block->size), ss);
offset = (char*)block->ptr - (char*)full_range.ptr;
}
return ShareableHandle{offset, ss.str()};
}
void recordStream(Block* block, cuda::CUDAStream stream) {
@@ -1973,6 +2104,7 @@ class DeviceCachingAllocator {
}
void addPeerAccess(c10::DeviceIndex dev_to_access) {
std::lock_guard<std::recursive_mutex> lock(mutex);
if (std::find(
devices_with_peer_access_.begin(),
devices_with_peer_access_.end(),
@@ -1984,6 +2116,10 @@
es->addPeer(dev_to_access);
}
}
std::vector<c10::DeviceIndex> peers() const {
std::lock_guard<std::recursive_mutex> lock(mutex);
return devices_with_peer_access_;
}
bool hasAllocatedExpandableSegments() const {
return !expandable_segments_.empty();
@@ -3426,6 +3562,13 @@ class NativeCachingAllocator : public CUDAAllocator {
C10_CUDA_CHECK(err);
}
device_allocator[dev_to_access]->addPeerAccess(dev);
std::lock_guard<std::mutex> lock(IpcMutex);
for (auto& entry : ipcMemHandle_to_devptr) {
if (entry.second.device_ == dev_to_access &&
entry.second.expandable_segment_) {
entry.second.expandable_segment_->addPeer(dev);
}
}
}
cudaError_t memcpyAsync(
@@ -3452,58 +3595,121 @@ class NativeCachingAllocator : public CUDAAllocator {
this->free(ptr);
}
// In CUDA IPC, sender sends a tensor to receiver, getIpcDevPtr
// is called by the receiving process to map the CUDA memory from the sending
// process into its own address space.
//
// CUDA IPC only allows sharing a big memory block associated with a
// In CUDA IPC, the sender shares a tensor with the receiver via shareIpcHandle;
// getIpcDevPtr is then called by the receiving process to map the CUDA memory
// from the sending process into its own address space.
// When the memory was allocated with cudaMalloc we use the cudaIpcMemHandle_t
// APIs.
// These APIs only allow sharing a big memory block associated with a
// cudaIpcMemHandle_t and it can be opened only **once** per context per
// process. There can be multiple types of storage in the same IPC mem block,
// so we must cache the device ptr to construct typed storage as it comes.
//
// ipcMemHandle_to_devptr maps a cudaIpcMemHandle_t to a device pointer in the
// process that can be used to access the memory block in the sender process.
// It only saves a weak_ptr of the device pointer in the map; the shared_ptr
// will be used to reconstruct all storages in this cudaMalloc allocation, and
// it will be deleted via cudaIpcCloseMemHandle when its reference count drops
// to 0.
//
// When memory is allocated with cuMemCreate, via expandable segments, we
// instead use cuMemExportToShareableHandle to create file descriptors that can
// be sent to the other process to share the physical allocations. The receiver
// then recreates the part of the expandable segment necessary to map the
// allocation.
// ipcMemHandle_to_devptr caches the mapping from a shareable handle to this
// process's memory-mapping information for that share so we do not create it
// twice. When the shared_ptr is no longer in use we clean up the cache.
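// A sketch of the serialized handle layout (derived from shareIpcHandle() and
// ExpandableSegment::share() above, and parsed by MemHandleCacheEntry below):
//   [1 byte SHAREABLE_HANDLE_VERSION]
//   [1 byte type: SHAREABLE_CUDA_MALLOC ('c') or SHAREABLE_CUDA_EXPANDABLE_SEGMENT ('e')]
//   'c': a cudaIpcMemHandle_t (CUDA_IPC_HANDLE_SIZE bytes)
//   'e': ShareHeader{pid, segment_size, num_handles} followed by one exported
//        POSIX file descriptor (int) per handle
// A handle string that is exactly CUDA_IPC_HANDLE_SIZE bytes long is the legacy
// format from older PyTorch: a raw cudaIpcMemHandle_t with no version header.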
std::mutex IpcMutex;
ska::flat_hash_map<std::string, std::weak_ptr<void>> ipcMemHandle_to_devptr;
struct MemHandleCacheEntry {
MemHandleCacheEntry(
c10::DeviceIndex device,
std::string& handle,
const DeviceCachingAllocator& allocator)
: device_(device),
expandable_segment_(nullptr),
cuda_ipc_ptr_(nullptr) {
int type = SHAREABLE_CUDA_MALLOC;
std::istringstream ss(handle);
if (handle.size() != CUDA_IPC_HANDLE_SIZE) {
auto version = ss.get();
TORCH_CHECK(
version <= SHAREABLE_HANDLE_VERSION,
"received sharable handle from a future version of torch that this version does not know how to handle")
type = ss.get();
} // otherwise this is coming from an old pytorch where it has to be a raw
// SHARABLE_CUDA_MALLOC
if (type == SHAREABLE_CUDA_MALLOC) {
cudaIpcMemHandle_t cuda_handle;
ss.read((char*)&cuda_handle, CUDA_IPC_HANDLE_SIZE);
C10_CUDA_CHECK(cudaIpcOpenMemHandle(
&cuda_ipc_ptr_, cuda_handle, cudaIpcMemLazyEnablePeerAccess));
} else if (type == SHAREABLE_CUDA_EXPANDABLE_SEGMENT) {
expandable_segment_ =
ExpandableSegment::fromShared(device, allocator.peers(), ss)
.release();
} else {
TORCH_INTERNAL_ASSERT(
false, "unexpected or illformed shareable handle type");
}
}
// this struct expects that clear is explicitly called to
// free resources, because we only want this code running when
// the shared pointer to this entry is destructed, not during
// deinitialization when CUDA may already have been shut down.
// This replicates the previous behavior of this map when it
// stored raw cuda_ipc_ptr_ handles.
void clear() {
if (cuda_ipc_ptr_) {
cuda::CUDAGuard device_guard(device_);
C10_CUDA_CHECK(cudaIpcCloseMemHandle(cuda_ipc_ptr_));
cuda_ipc_ptr_ = nullptr;
}
if (expandable_segment_) {
delete expandable_segment_;
expandable_segment_ = nullptr;
}
}
void* ptr() {
if (cuda_ipc_ptr_) {
return cuda_ipc_ptr_;
} else {
return expandable_segment_->ptr();
}
}
c10::DeviceIndex device_;
ExpandableSegment* expandable_segment_;
void* cuda_ipc_ptr_; // nullptr if expandable_segment_ is not null
std::weak_ptr<void> wp_;
};
ska::flat_hash_map<std::string, MemHandleCacheEntry> ipcMemHandle_to_devptr;
std::shared_ptr<void> getIpcDevPtr(std::string handle) override {
std::lock_guard<std::mutex> lock(IpcMutex);
auto iter = ipcMemHandle_to_devptr.find(handle);
if (iter != ipcMemHandle_to_devptr.end()) {
auto devptr = iter->second.lock();
if (devptr)
return devptr;
auto devptr = iter->second.wp_.lock();
// the weak_ptr should always be valid because we delete the entry from
// the cache when the shared_ptr is destructed, so it should never have
// expired by the time we get here.
TORCH_INTERNAL_ASSERT(devptr, "entry in cache has missing shared_ptr");
return devptr;
}
// This ipcMemHandle hasn't been opened, or already expired, open it to
// enable IPC access to that mem block.
void* dev = nullptr;
auto ipc_handle =
reinterpret_cast<const cudaIpcMemHandle_t*>(handle.c_str());
C10_CUDA_CHECK(cudaIpcOpenMemHandle(
&dev, *ipc_handle, cudaIpcMemLazyEnablePeerAccess));
// devPtr has to be deleted on the same device it was created on.
c10::DeviceIndex curr_device = 0;
C10_CUDA_CHECK(c10::cuda::GetDevice(&curr_device));
auto sp =
std::shared_ptr<void>(dev, [handle, curr_device, this](void* ptr) {
cuda::CUDAGuard device_guard(curr_device);
auto inserted = ipcMemHandle_to_devptr.insert(
iter,
{handle,
MemHandleCacheEntry(
curr_device, handle, *device_allocator[curr_device])});
auto sp = std::shared_ptr<void>(
inserted->second.ptr(), [handle, this](void* ptr) {
std::lock_guard<std::mutex> deleter_lock(IpcMutex);
C10_CUDA_CHECK(cudaIpcCloseMemHandle(ptr));
ipcMemHandle_to_devptr.erase(handle);
auto it = ipcMemHandle_to_devptr.find(handle);
TORCH_INTERNAL_ASSERT(it != ipcMemHandle_to_devptr.end());
it->second.clear();
ipcMemHandle_to_devptr.erase(it);
});
std::weak_ptr<void> wp = sp;
// To eliminate an additional search, we can use insert().
// It doesn't overwrite when key already exists(ptr expired).
// But in the deleter for sp we erased the entry,
// this should be safe to do now.
ipcMemHandle_to_devptr.insert(iter, {handle, wp});
inserted->second.wp_ = sp;
return sp;
}
std::string name() override {
return "native";
}
@@ -3592,9 +3798,7 @@ struct BackendStaticInitializer {
std::atomic<CUDAAllocator*> allocator;
BackendStaticInitializer backend_static_initializer;
} // namespace cuda::CUDACachingAllocator
} // namespace c10
namespace c10::cuda {


@@ -98,9 +98,6 @@ TEST_CUDA_IPC = (
TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1
if TEST_CUDA_IPC:
torch.cuda.memory._set_allocator_settings("expandable_segments:False")
if not NO_MULTIPROCESSING_SPAWN:
# We want to use `spawn` if able because some of our tests check that the
# data loader terminates gracefully. To prevent hanging in the testing


@@ -48,9 +48,6 @@ TEST_CUDA_IPC = (
TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1
if TEST_CUDA_IPC:
torch.cuda.memory._set_allocator_settings("expandable_segments:False")
class SubProcess(mp.Process):
def __init__(self, tensor):


@@ -421,7 +421,6 @@ static std::string THPStorage_bytesAsHandleString(PyObject* handle) {
if (PyBytes_AsStringAndSize(handle, &buffer, &handle_size) == -1) {
TORCH_CHECK(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle");
}
TORCH_CHECK(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle size");
return std::string(buffer, handle_size);
END_HANDLE_TH_ERRORS_RET("")
}