Summary: This is part 1 of the effort to support `share_memory_()` in the C++ ATen library. It allows C++ code to replace a tensor's storage in place with shm-backed storage. For now, fd-based shm is the only implementation supported, to simplify memory management in general. This first part intentionally avoids public API changes (to `TensorBase`, see comments in `StorageUtil.h`) so that the core features become usable outside pt/csrc first. Adding the API to `Tensor` or `TensorBase` would involve more distracting changes and make the change harder to review.

Test Plan:
```
buck test caffe2:StorageUtils_test
```

Differential Revision: D43467616

Pull Request resolved: https://github.com/pytorch/pytorch/pull/95228

Approved by: https://github.com/ezyang
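For orientation before the bindings below, here is a minimal C++ sketch of the fd-based flow this file wraps for Python (compare `THPStorage_shareFd`). It assumes the ATen helpers referenced in this file, `at::new_shm_fd_storage` and `at::storage_copy`, are called exactly as they are below; the free function `share_memory_fd_` is a hypothetical name used for illustration, not an existing API.

```
#include <ATen/ATen.h>
#include <ATen/MapAllocator.h>
#include <ATen/StorageUtils.h>

#include <utility>

// Hypothetical helper: move a CPU tensor's storage into fd-backed shared
// memory, mirroring the pattern used by THPStorage_shareFd below.
void share_memory_fd_(at::Tensor& t) {
  at::Storage storage = t.storage();
  if (at::MapAllocator::fromDataPtr(storage.data_ptr()) != nullptr) {
    return; // already backed by a shared-memory mapping
  }
  // Allocate an fd-backed shared-memory storage of the same size and copy
  // the existing bytes into it.
  at::Storage new_storage(at::new_shm_fd_storage(storage.nbytes()));
  at::storage_copy(new_storage, storage);
  // Swap the storage contents in place so every alias of `t` sees the
  // shared-memory-backed data.
  std::swap(
      *storage.unsafeGetStorageImpl(), *new_storage.unsafeGetStorageImpl());
}
```
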
#include <torch/csrc/python_headers.h>
#ifdef _MSC_VER
#include <c10/util/win32-headers.h>
#endif
#include <structmember.h>

#include <c10/core/CPUAllocator.h>
#include <libshm.h>
#include <torch/csrc/CudaIPCTypes.h>
#include <torch/csrc/Device.h>
#include <torch/csrc/DynamicTypes.h>
#include <torch/csrc/THP.h>
#include <torch/csrc/autograd/utils/wrap_outputs.h>
#include <torch/csrc/copy_utils.h>

#include <c10/util/intrusive_ptr.h>
#include <fmt/format.h>

#include <torch/csrc/Storage.h>
#include <torch/csrc/StorageSharing.h>

#ifdef USE_CUDA
#include <c10/cuda/CUDAGuard.h>
#include <cuda.h>
#include <cuda_runtime.h>
#endif

#include <ATen/MapAllocator.h>
#include <ATen/StorageUtils.h>
#include <torch/csrc/utils/python_numbers.h>
#include <atomic>
#include <string>

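// _shared_decref / _shared_incref adjust the reference count that the libshm
// manager keeps for filename-based shared storages (THManagedMapAllocator).
// For CUDA storages, or CPU storages that are not manager-backed, they are
// no-ops.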
static PyObject* THPStorage_sharedDecref(PyObject* _self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  auto self = (THPStorage*)_self;
  c10::DeviceType device_type = self->cdata->device_type();
  if (device_type == at::kCPU) {
    c10::StorageImpl* storage = self->cdata;
    THManagedMapAllocator* ctx =
        THManagedMapAllocator::fromDataPtr(storage->data_ptr());
    if (ctx) {
      ctx->decref();
    }
  }
  Py_INCREF(self);
  return (PyObject*)self;
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_sharedIncref(PyObject* _self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  auto self = (THPStorage*)_self;
  c10::DeviceType device_type = self->cdata->device_type();
  if (device_type == at::kCPU) {
    c10::StorageImpl* storage = self->cdata;
    THManagedMapAllocator* ctx =
        THManagedMapAllocator::fromDataPtr(storage->data_ptr());
    if (ctx) {
      ctx->incref();
    }
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_pyNewFilenameStorage(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  long long size;
  if (!PyArg_ParseTuple(args, "L", &size)) {
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
  std::string handle = at::NewProcessWideShmHandle();
  return THPStorage_New(c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      size,
      THManagedMapAllocator::makeDataPtr("", handle.c_str(), flags, size),
      /*allocator=*/nullptr,
      /*resizable=*/false));
  END_HANDLE_TH_ERRORS
}

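// _share_filename_cpu_: move a CPU storage into filename-based shared memory
// (if it is not there already) and return a 3-tuple
//   (manager_handle, storage_handle, size)
// that another process can pass to _new_shared_filename_cpu to map the same
// memory. Roughly, on the Python side (a sketch, not exact library code):
//   metadata = storage._share_filename_cpu_()
//   other = type(storage)._new_shared_filename_cpu(*metadata)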
static PyObject* THPStorage_shareFilename(PyObject* _self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(
      reinterpret_cast<THPStorage*>(_self)->cdata->device_type() == at::kCPU,
      "_share_filename_: only available on CPU");
  auto self = (THPStorage*)_self;
  c10::StorageImpl* storage = self->cdata;
  THManagedMapAllocator* ctx =
      THManagedMapAllocator::fromDataPtr(storage->data_ptr());
  // Storage is already in shared memory, just return a handle
  if (ctx) {
    // done
  } else {
    // TODO: retry on collision
    // TODO: free GIL - but remember to reacquire it when an exception is thrown
    int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
    std::string handle = at::NewProcessWideShmHandle();
    at::Storage new_storage(c10::make_intrusive<at::StorageImpl>(
        c10::StorageImpl::use_byte_size_t(),
        storage->nbytes(),
        THManagedMapAllocator::makeDataPtr(
            "", handle.c_str(), flags, storage->nbytes()),
        /*allocator=*/nullptr,
        /*resizable=*/false));

    at::Storage _self_aten = torch::createStorage(_self);
    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      at::storage_copy(new_storage, _self_aten);
    }

    std::swap(*storage, *new_storage.unsafeGetStorageImpl());
    ctx = THManagedMapAllocator::fromDataPtr(storage->data_ptr());
    AT_ASSERT(ctx);
  }

  THPObjectPtr manager_handle(PyBytes_FromString(ctx->manager_handle()));
  if (!manager_handle)
    return nullptr;
  THPObjectPtr storage_handle(PyBytes_FromString(ctx->filename()));
  if (!storage_handle)
    return nullptr;
  THPObjectPtr size(THPUtils_packUInt64(storage->nbytes() / sizeof(uint8_t)));
  if (!size)
    return nullptr;

  THPObjectPtr tuple(PyTuple_New(3));
  if (!tuple)
    return nullptr;
  PyTuple_SET_ITEM(tuple.get(), 0, manager_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 1, storage_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 2, size.release());
  return tuple.release();
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_newSharedFilename(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  THPUtils_assert(PyTuple_GET_SIZE(args) == 3, "tuple of 3 items expected");
  PyObject* _manager_handle = PyTuple_GET_ITEM(args, 0);
  PyObject* _object_handle = PyTuple_GET_ITEM(args, 1);
  PyObject* _size = PyTuple_GET_ITEM(args, 2);
  if (!PyBytes_Check(_manager_handle) || !PyBytes_Check(_object_handle) ||
      !THPUtils_checkLong(_size)) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in file system mode",
        1,
        "a handle (string/bytes) and storage size (int)");
    return nullptr;
  }
  const char* manager_handle = PyBytes_AS_STRING(_manager_handle);
  const char* object_handle = PyBytes_AS_STRING(_object_handle);
  int64_t size = THPUtils_unpackLong(_size);
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
  return THPStorage_New(c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      size,
      THManagedMapAllocator::makeDataPtr(
          manager_handle, object_handle, flags, size),
      /*allocator=*/nullptr,
      /*resizable=*/false));
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_pyNewFdStorage(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  long long size;
  if (!PyArg_ParseTuple(args, "L", &size)) {
    return nullptr;
  }
  return THPStorage_New(at::new_shm_fd_storage(size));
  END_HANDLE_TH_ERRORS
}

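// _share_fd_cpu_: move a CPU storage into fd-backed shared memory (if it is
// not there already) and return a 2-tuple (fd, size). The receiving process
// passes the descriptor and size to _new_shared_fd_cpu, which dup()s the fd
// and maps it with ALLOCATOR_MAPPED_FROMFD.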
static PyObject* THPStorage_shareFd(PyObject* _self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(
      reinterpret_cast<THPStorage*>(_self)->cdata->device_type() == at::kCPU,
      "_share_fd_: only available on CPU");
  auto self = (THPStorage*)_self;
  c10::StorageImpl* storage = self->cdata;
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  at::MapAllocator* ctx;
  // Storage is already in shared memory, just return a handle
  if ((ctx = at::MapAllocator::fromDataPtr(storage->data_ptr()))) {
    // done
  } else {
    at::Storage new_storage(at::new_shm_fd_storage(storage->nbytes()));
    at::Storage _self_aten = torch::createStorage(_self);
    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      at::storage_copy(new_storage, _self_aten);
    }

    std::swap(*storage, *new_storage.unsafeGetStorageImpl());
    ctx = at::MapAllocator::fromDataPtr(storage->data_ptr());
    AT_ASSERT(ctx);
  }

  THPObjectPtr storage_handle(THPUtils_packInt32(ctx->fd()));
  if (!storage_handle)
    return nullptr;
  THPObjectPtr size(THPUtils_packUInt64(storage->nbytes() / sizeof(uint8_t)));
  if (!size)
    return nullptr;

  THPObjectPtr tuple(PyTuple_New(2));
  if (!tuple)
    return nullptr;
  PyTuple_SET_ITEM(tuple.get(), 0, storage_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 1, size.release());
  return tuple.release();
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_newSharedFd(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  THPUtils_assert(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
  PyObject* _tmp_fd = PyTuple_GET_ITEM(args, 0);
  PyObject* _size = PyTuple_GET_ITEM(args, 1);
  if (!THPUtils_checkLong(_tmp_fd) || !THPUtils_checkLong(_size)) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in file descriptor mode",
        1,
        "a file descriptor (int) and storage size (int)");
    return nullptr;
  }
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  int fd;
  int tmp_fd = (int)THPUtils_unpackLong(_tmp_fd);
  int64_t size = THPUtils_unpackLong(_size);
  if ((fd = dup(tmp_fd)) == -1) {
    THPUtils_setError("could not duplicate a shared memory file descriptor");
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE |
      at::ALLOCATOR_MAPPED_KEEPFD | at::ALLOCATOR_MAPPED_FROMFD;
  return THPStorage_New(c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      size,
      at::MapAllocator::makeDataPtr(at::WITH_FD, "", fd, flags, size, nullptr),
      /*allocator=*/nullptr,
      /*resizable=*/false));
  END_HANDLE_TH_ERRORS
}

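// _share_cuda_: export a CUDA storage for IPC. Returns an 8-tuple:
//   (device index,
//    cudaIpcMemHandle_t bytes for the base allocation,
//    storage size in bytes,
//    offset of the storage within the base allocation in bytes,
//    ref-counter file handle, ref-counter offset,
//    cudaIpcEventHandle_t bytes, event_sync_required flag)
// The consumer side is THPStorage_newSharedCuda below.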
static PyObject* THPStorage_shareCuda(PyObject* _self, PyObject* noargs) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  TORCH_CHECK(
      reinterpret_cast<THPStorage*>(_self)->cdata->device_type() == at::kCUDA,
      "_share_cuda_: only available on CUDA");
  auto self = (THPStorage*)_self;
  c10::StorageImpl* storage = self->cdata;

  if (storage->received_cuda()) {
    AT_ERROR(
        "Attempted to send CUDA tensor received from another process; this is not currently supported. Consider cloning before sending.");
  }

  at::DeviceGuard device_guard(storage->device());
  THPObjectPtr tuple(PyTuple_New(8));
  THPObjectPtr device(THPUtils_packInt32(storage->device().index()));
  THPObjectPtr _handle(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr size_bytes(THPUtils_packUInt64(storage->nbytes()));
  THPObjectPtr _offset_bytes(THPUtils_packInt32(0));
  THPObjectPtr _ref_counter(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr _ref_counter_offset(THPUtils_packInt32(0));
  THPObjectPtr _event_handle(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr _event_sync_required(Py_None);
  Py_INCREF(Py_None);
  if (storage->data<uint8_t>()) {
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    size_t base_size;
    void* base_ptr = c10::cuda::CUDACachingAllocator::getBaseAllocation(
        storage->data<uint8_t>(), &base_size);
    ptrdiff_t offset_bytes = (char*)storage->data<uint8_t>() - (char*)base_ptr;

    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaIpcMemHandle_t handle;
    C10_CUDA_CHECK(cudaIpcGetMemHandle(&handle, base_ptr));

    _handle = PyBytes_FromStringAndSize((char*)&handle, CUDA_IPC_HANDLE_SIZE);
    _offset_bytes = PyLong_FromSsize_t((Py_ssize_t)offset_bytes);

    // Put Storage Data behind new ref counting context
    // See Note [CUDA IPC Refcounting implementation explained]
    at::DataPtr sent_data_ptr =
        torch::GetNewRefCountedSentData(storage->data(), storage->device());
    auto old_data_ptr = storage->set_data_ptr(std::move(sent_data_ptr));
    auto sent_data =
        static_cast<torch::CudaIPCSentData*>(storage->data_ptr().get_context());
    sent_data->set_original_ptr(std::move(old_data_ptr));
    _ref_counter = PyBytes_FromString((sent_data->handle()).c_str());
    _ref_counter_offset = THPUtils_packInt64(sent_data->offset());

    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaIpcEventHandle_t ipc_event_handle;

    if (sent_data->event_sync_required_) {
      C10_CUDA_CHECK(
          cudaIpcGetEventHandle(&ipc_event_handle, sent_data->event_));
    }

    _event_handle = PyBytes_FromStringAndSize(
        (char*)&ipc_event_handle, CUDA_IPC_HANDLE_SIZE);
    _event_sync_required = PyBool_FromLong(sent_data->event_sync_required_);
  }

  if (!tuple || !device || !_handle || !size_bytes || !_offset_bytes ||
      !_event_handle) {
    return nullptr;
  }
  PyTuple_SET_ITEM(tuple.get(), 0, device.release());
  // cudaIpcMemHandle_t(of basePtr)
  PyTuple_SET_ITEM(tuple.get(), 1, _handle.release());
  // Size(in bytes) of the real storage, note this is not the size of basePtr
  // memory block.
  PyTuple_SET_ITEM(tuple.get(), 2, size_bytes.release());
  // Offset(in bytes) of the real storage in the basePtr memory block.
  // NB: this offset MUST be in bytes instead of numel, since we use
  // (storage_handle, offset)
  // as key in shared_cache(multiprocessing/reduction.py).
  // Offset in numel cannot uniquely represent a storage.
  PyTuple_SET_ITEM(tuple.get(), 3, _offset_bytes.release());
  PyTuple_SET_ITEM(tuple.get(), 4, _ref_counter.release());
  PyTuple_SET_ITEM(tuple.get(), 5, _ref_counter_offset.release());
  PyTuple_SET_ITEM(tuple.get(), 6, _event_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 7, _event_sync_required.release());
  return tuple.release();
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}

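// _release_ipc_counter_cuda: decrement one slot of the producer's shared
// ref-counter file (a RefcountedMapAllocator mapping of
// CUDA_IPC_REF_COUNTER_FILE_SIZE int64 slots). Best effort: if the producer
// already exited and removed the file, the decrement is silently skipped.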
static PyObject* THPStorage_releaseIPCCounter(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  THPUtils_assert(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 0);
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 1);
  if (!(PyBytes_Check(_ref_counter) &&
        THPUtils_checkLong(_ref_counter_offset))) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_release_ipc_counter in CUDA mode",
        1,
        "(bytes _ref_counter, int _ref_counter_offset)");
    return nullptr;
  }
  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
  ptrdiff_t ref_counter_offset =
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);
  // We don't want to break existing code, so resource deletion is best
  // effort basis. Exception expected if producer process terminated
  // before consumer released data.
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
  try {
    auto sptr = at::RefcountedMapAllocator::makeDataPtr(
        ref_counter_handle.c_str(),
        flags,
        sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
        nullptr);
    *(static_cast<int64_t*>(sptr.get()) + ref_counter_offset) -= 1;
  } catch (c10::Error& err) {
    // Already warned inside of producer process
  }
  Py_RETURN_NONE;
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}

#ifdef USE_CUDA
static std::string THPStorage_bytesAsHandleString(PyObject* handle) {
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  char* buffer;
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  Py_ssize_t handle_size;
  if (PyBytes_AsStringAndSize(handle, &buffer, &handle_size) == -1) {
    // NOLINTNEXTLINE(bugprone-string-constructor)
    return nullptr;
  }
  // NOLINTNEXTLINE(bugprone-string-constructor)
  THPUtils_assert(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle size");
  return std::string(buffer, handle_size);
}
#endif

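// _new_shared_cuda: rebuild a storage from the 8-tuple produced by
// _share_cuda_. Steps: optionally wait on the producer's IPC event, open the
// base allocation via CUDACachingAllocator::getIpcDevPtr, offset to the real
// storage, and attach a deleter that syncs the current stream and decrements
// the producer's ref-counter slot when the consumer drops the storage.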
static PyObject* THPStorage_newSharedCuda(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  THPUtils_assert(PyTuple_GET_SIZE(args) == 8, "tuple of 8 items expected");
  PyObject* _device = PyTuple_GET_ITEM(args, 0);
  PyObject* _handle = PyTuple_GET_ITEM(args, 1);
  PyObject* _size_bytes = PyTuple_GET_ITEM(args, 2);
  PyObject* _offset_bytes = PyTuple_GET_ITEM(args, 3);
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 4);
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 5);
  PyObject* _event_handle = PyTuple_GET_ITEM(args, 6);
  PyObject* _event_sync_required = PyTuple_GET_ITEM(args, 7);
  if (!(THPUtils_checkLong(_device) && THPUtils_checkLong(_size_bytes) &&
        PyBytes_Check(_handle) && PyBytes_Check(_ref_counter) &&
        PyBytes_Check(_event_handle) && THPUtils_checkLong(_offset_bytes) &&
        THPUtils_checkLong(_ref_counter_offset) &&
        PyBool_Check(_event_sync_required))) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in CUDA mode",
        1,
        "(int device, bytes handle, int storage_size_bytes, int storage_offset_bytes, bytes _ref_counter, int _ref_counter_offset, bytes event_handle, bool event_sync_required)");
    return nullptr;
  }

  size_t storage_size =
      (size_t)THPUtils_unpackLong(_size_bytes) / sizeof(uint8_t);
  ptrdiff_t storage_offset_bytes =
      (ptrdiff_t)THPUtils_unpackLong(_offset_bytes);

  int64_t device = THPUtils_unpackLong(_device);
  at::cuda::CUDAGuard device_guard(device);

  if (PyObject_IsTrue(_event_sync_required)) {
    // Ensure that producer prepared all tensor's data
    std::string s_ipc_event_handle =
        THPStorage_bytesAsHandleString(_event_handle);
    auto ipc_event_handle = reinterpret_cast<const cudaIpcEventHandle_t*>(
        s_ipc_event_handle.c_str());
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaEvent_t event;
    cudaIpcOpenEventHandle(&event, *ipc_event_handle);
    C10_CUDA_CHECK(
        cudaStreamWaitEvent(c10::cuda::getCurrentCUDAStream(device), event, 0));
  }

  std::string s_handle = THPStorage_bytesAsHandleString(_handle);
  std::shared_ptr<void> basePtr =
      c10::cuda::CUDACachingAllocator::getIpcDevPtr(s_handle);

  // Offset the basePtr to reconstruct the real storage
  // devPtr = basePtr + storage_offset
  // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
  void* devPtr = basePtr.get();
  devPtr = (char*)devPtr + storage_offset_bytes;

  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
  ptrdiff_t ref_counter_offset =
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);

  struct IpcDeleterContext {
    std::string ref_counter_handle;
    ptrdiff_t ref_counter_offset;
    int64_t device;
    torch::CudaIPCReceivedData received_data;
  };

  auto ctx = std::make_unique<IpcDeleterContext>();
  ctx->ref_counter_handle = std::move(ref_counter_handle);
  ctx->ref_counter_offset = ref_counter_offset;
  ctx->device = device;
  ctx->received_data.shared_ptr_ = std::move(basePtr);

  auto cur_device = at::cuda::current_device();
  c10::DataPtr data_ptr(
      devPtr,
      ctx.release(),
      +[](void* ctx_) {
        std::unique_ptr<IpcDeleterContext> ctx(
            static_cast<IpcDeleterContext*>(ctx_));
        ctx->received_data.shared_ptr_.reset();

        // Sync default stream to make sure all operations related to the
        // storage is finished (otherwise another process may reuse memory and
        // corrupt data)

        // Ideally all shared memory reference counting could be replaced by
        // sending untriggered CUDA event from the producer to consumer and
        // using this event as the criteria of memory release. However, CUDA
        // (atm 10.1) does not support the creation of untriggered events and
        // performance impact of having thousands of shared events is unknown.

        // TODO: Instead of cudaStreamSynchronize it is possible to add Stream
        // Callback and release counter inside of it (need to check performance
        // impact)
        at::cuda::stream_synchronize(
            c10::cuda::getCurrentCUDAStream(ctx->device));

        // We don't want to break existing code, so resource deletion is best
        // effort basis. Exception expected if producer process terminated
        // before consumer released data.
        int flags =
            at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
        try {
          auto sptr = at::RefcountedMapAllocator::makeDataPtr(
              ctx->ref_counter_handle.c_str(),
              flags,
              sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
              nullptr);
          *(static_cast<int64_t*>(sptr.get()) + ctx->ref_counter_offset) -= 1;
        } catch (c10::Error& err) {
          // Already warned inside of producer process
        }
      },
      at::Device(at::DeviceType::CUDA, cur_device));

  auto base = c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      storage_size,
      std::move(data_ptr),
      /*allocator=*/nullptr,
      /*resizable=*/false);

  base->set_resizable(false);
  base->set_received_cuda(true);

  return THPStorage_New(std::move(base));
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}

// Returns an object that holds a "weak" pointer to the c10::StorageImpl. This
// pointer keeps the c10::StorageImpl struct live, but does not retain the data
// pointer.
//
// NB: This does NOT preserve object identity when you call it multiple times
static PyObject* THPStorage_weakRef(PyObject* _self, PyObject* args) {
  HANDLE_TH_ERRORS
  auto self = (THPStorage*)_self;
  c10::StorageImpl* storage = self->cdata;
  return PyLong_FromVoidPtr(c10::raw::intrusive_ptr::make_weak(storage));
  END_HANDLE_TH_ERRORS
}

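// Weak-reference helpers used by the Python side. Rough round trip (a sketch,
// not exact library code):
//   ref = storage._weak_ref()                   # int wrapping a raw weak pointer
//   alive = not type(storage)._expired(ref)
//   s2 = type(storage)._new_with_weak_ptr(ref)  # None once the storage is gone
//   type(storage)._free_weak_ref(ref)           # balances the _weak_ref call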
PyObject* THPStorage_newWithWeakPtr(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  THPUtils_assert(
      THPUtils_checkLong(arg), "_new_with_weak_ptr(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  if (auto* storage = c10::raw::weak_intrusive_ptr::lock(weak_storage)) {
    return THPStorage_New(
        c10::intrusive_ptr<c10::StorageImpl>::reclaim(storage));
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_freeWeakRef(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  if (arg == Py_None) {
    Py_RETURN_NONE;
  }
  THPUtils_assert(
      THPUtils_checkLong(arg), "_free_weak_ref(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  c10::raw::weak_intrusive_ptr::decref(weak_storage);

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_expired(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  THPUtils_assert(THPUtils_checkLong(arg), "_expired(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  return PyBool_FromLong(
      c10::raw::weak_intrusive_ptr::use_count(weak_storage) == 0);
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_sharedFd(PyObject* _self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  auto self = (THPStorage*)_self;
  at::MapAllocator* ctx = nullptr;
  if (self->cdata->device_type() == at::kCPU) {
    c10::StorageImpl* storage = self->cdata;
    ctx = at::MapAllocator::fromDataPtr(storage->data_ptr());
  }

  THPUtils_assert(ctx, "couldn't retrieve a shared file descriptor");
  return THPUtils_packInt32(ctx->fd());
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_isShared(PyObject* _self, PyObject* noargs) {
  auto self = (THPStorage*)_self;
  if (self->cdata->device_type() == at::kCUDA) {
    Py_RETURN_TRUE;
  }
  if (at::MapAllocator::fromDataPtr(self->cdata->data_ptr()) ||
      THManagedMapAllocator::fromDataPtr(self->cdata->data_ptr())) {
    Py_RETURN_TRUE;
  } else {
    Py_RETURN_FALSE;
  }
}

// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
static PyMethodDef THPStorage_sharingMethods[] = {
    {"_new_with_weak_ptr",
     THPStorage_newWithWeakPtr,
     METH_O | METH_CLASS,
     nullptr},
    {"_share_cuda_", THPStorage_shareCuda, METH_NOARGS, nullptr},
    {"_new_shared_cuda",
     THPStorage_newSharedCuda,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_release_ipc_counter_cuda",
     THPStorage_releaseIPCCounter,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_share_fd_cpu_", THPStorage_shareFd, METH_NOARGS, nullptr},
    {"_new_shared_fd_cpu",
     THPStorage_newSharedFd,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_new_using_fd_cpu",
     THPStorage_pyNewFdStorage,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_share_filename_cpu_", THPStorage_shareFilename, METH_NOARGS, nullptr},
    {"_new_shared_filename_cpu",
     THPStorage_newSharedFilename,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_new_using_filename_cpu",
     THPStorage_pyNewFilenameStorage,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_weak_ref", THPStorage_weakRef, METH_NOARGS, nullptr},
    {"_free_weak_ref", THPStorage_freeWeakRef, METH_O | METH_STATIC, nullptr},
    {"_expired", THPStorage_expired, METH_O | METH_STATIC, nullptr},
    {"_shared_decref", THPStorage_sharedDecref, METH_NOARGS, nullptr},
    {"_shared_incref", THPStorage_sharedIncref, METH_NOARGS, nullptr},
    {"_get_shared_fd", THPStorage_sharedFd, METH_NOARGS, nullptr},
    {"is_shared", THPStorage_isShared, METH_NOARGS, nullptr},
    {nullptr}};

PyMethodDef* THPStorage_getSharingMethods() {
  return THPStorage_sharingMethods;
}