From 9d56130881ca6da543e203e346cfc0ffd3ddc02a Mon Sep 17 00:00:00 2001 From: Chris Leary Date: Mon, 31 Jul 2017 19:10:37 -0700 Subject: [PATCH] [XLA:CPU] Atomically enqueue tuple buffers for outfeed. Previously it was possible that a distinct thread could hop in between the buffer enqueues done by a tuple-outfeeding thread. This changes the sequence to enqueue all the tuple buffers as an atomic unit. PiperOrigin-RevId: 163781804 --- .../compiler/xla/service/cpu/cpu_runtime.cc | 3 +- .../compiler/xla/service/cpu/xfeed_manager.cc | 2 +- .../compiler/xla/service/cpu/xfeed_manager.h | 3 +- .../xla/service/cpu/xfeed_manager_test.cc | 12 +- .../xla/service/cpu_transfer_manager.cc | 111 ++++++++++++------ .../xla/service/cpu_transfer_manager.h | 22 +++- tensorflow/compiler/xla/util.cc | 9 ++ tensorflow/compiler/xla/util.h | 1 + 8 files changed, 115 insertions(+), 48 deletions(-) diff --git a/tensorflow/compiler/xla/service/cpu/cpu_runtime.cc b/tensorflow/compiler/xla/service/cpu/cpu_runtime.cc index 5d6efa53595..c7155b858bd 100644 --- a/tensorflow/compiler/xla/service/cpu/cpu_runtime.cc +++ b/tensorflow/compiler/xla/service/cpu/cpu_runtime.cc @@ -130,5 +130,6 @@ void __xla_cpu_runtime_ReleaseOutfeedBufferAfterPopulation( xla::cpu::runtime::XfeedManager* xfeed = xla::cpu::runtime::GetXfeedManager(); xla::StatusOr shape = xla::llvm_ir::DecodeSelfDescribingShapeConstant(shape_ptr, shape_length); - xfeed->outfeed()->ReleaseCurrentBuffer(buffer_length, buffer_ptr, shape); + xfeed->outfeed()->ReleaseCurrentBuffer(buffer_length, buffer_ptr, + std::move(shape)); } diff --git a/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc b/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc index 2160c3cd01d..d0f21420290 100644 --- a/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc +++ b/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc @@ -36,7 +36,7 @@ void XfeedQueueManager::Reset() { enqueued_buffers_.clear(); } -void XfeedQueueManager::EnqueueBuffers( +void XfeedQueueManager::EnqueueBuffersAtomically( tensorflow::gtl::ArraySlice buffers) { tensorflow::mutex_lock l(mu_); bool was_empty = enqueued_buffers_.empty(); diff --git a/tensorflow/compiler/xla/service/cpu/xfeed_manager.h b/tensorflow/compiler/xla/service/cpu/xfeed_manager.h index 86af789384e..6af55700052 100644 --- a/tensorflow/compiler/xla/service/cpu/xfeed_manager.h +++ b/tensorflow/compiler/xla/service/cpu/xfeed_manager.h @@ -63,7 +63,8 @@ class XfeedQueueManager { // called when the buffer will no longer be accessed by the XfeedManager, // either as a result of a call to Reset or because the runtime has dequeued // and used the buffer. - void EnqueueBuffers(tensorflow::gtl::ArraySlice buffers); + void EnqueueBuffersAtomically( + tensorflow::gtl::ArraySlice buffers); // Blocks until the queue is non-empty, then returns the buffer at the head of // the queue. Sets the current buffer to be the returned buffer. It is an diff --git a/tensorflow/compiler/xla/service/cpu/xfeed_manager_test.cc b/tensorflow/compiler/xla/service/cpu/xfeed_manager_test.cc index 8defd28b013..8fe65f488a2 100644 --- a/tensorflow/compiler/xla/service/cpu/xfeed_manager_test.cc +++ b/tensorflow/compiler/xla/service/cpu/xfeed_manager_test.cc @@ -87,8 +87,8 @@ TEST_F(InfeedManagerTest, SingleThreadedSequential) { cpu::runtime::XfeedManager* xfeed = cpu::runtime::GetXfeedManager(); - xfeed->infeed()->EnqueueBuffers({a}); - xfeed->infeed()->EnqueueBuffers({b}); + xfeed->infeed()->EnqueueBuffersAtomically({a}); + xfeed->infeed()->EnqueueBuffersAtomically({b}); ProcessNextBuffer(a->length()); ProcessNextBuffer(b->length()); } @@ -99,9 +99,9 @@ TEST_F(InfeedManagerTest, SingleThreadedInterleaved) { cpu::runtime::XfeedManager* xfeed = cpu::runtime::GetXfeedManager(); - xfeed->infeed()->EnqueueBuffers({a}); + xfeed->infeed()->EnqueueBuffersAtomically({a}); ProcessNextBuffer(a->length()); - xfeed->infeed()->EnqueueBuffers({b}); + xfeed->infeed()->EnqueueBuffersAtomically({b}); ProcessNextBuffer(b->length()); } @@ -122,7 +122,7 @@ TEST_F(InfeedManagerTest, MultiThreaded) { } } TestInfeedBuffer* a = new TestInfeedBuffer(length); - xfeed->infeed()->EnqueueBuffers({a}); + xfeed->infeed()->EnqueueBuffersAtomically({a}); }); ProcessNextBuffer(length); @@ -131,7 +131,7 @@ TEST_F(InfeedManagerTest, MultiThreaded) { TEST_F(InfeedManagerTest, OutfeedWrongShape) { TestInfeedBuffer* b = new TestInfeedBuffer(32, /*expect_shape_match=*/false); cpu::runtime::XfeedManager* xfeed = cpu::runtime::GetXfeedManager(); - xfeed->outfeed()->EnqueueBuffers({b}); + xfeed->outfeed()->EnqueueBuffersAtomically({b}); ProcessNextOutfeedBuffer(32, ShapeUtil::MakeShape(U8, {33})); } diff --git a/tensorflow/compiler/xla/service/cpu_transfer_manager.cc b/tensorflow/compiler/xla/service/cpu_transfer_manager.cc index d8a76443a66..bf43c04ae2a 100644 --- a/tensorflow/compiler/xla/service/cpu_transfer_manager.cc +++ b/tensorflow/compiler/xla/service/cpu_transfer_manager.cc @@ -111,9 +111,9 @@ Status CpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor, // infeed manager. std::vector buffers; buffers.reserve(literal.tuple_literals_size()); - auto cleanup = tensorflow::gtl::MakeCleanup([buffers]() { + auto cleanup = tensorflow::gtl::MakeCleanup([&buffers]() { for (cpu::runtime::XfeedBuffer* b : buffers) { - b->Done(ShapeUtil::MakeNil()); + b->Done(Cancelled("Failed to infeed buffer to device.")); } }); @@ -128,7 +128,7 @@ Status CpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor, } cpu::runtime::XfeedManager* xfeed_manager = cpu::runtime::GetXfeedManager(); - xfeed_manager->infeed()->EnqueueBuffers(buffers); + xfeed_manager->infeed()->EnqueueBuffersAtomically(buffers); cleanup.release(); return Status::OK(); @@ -141,7 +141,7 @@ Status CpuTransferManager::TransferBufferToInfeed(se::StreamExecutor* executor, TransferBufferToInfeedInternal(executor, size, source)); cpu::runtime::XfeedManager* xfeed_manager = cpu::runtime::GetXfeedManager(); - xfeed_manager->infeed()->EnqueueBuffers({buffer}); + xfeed_manager->infeed()->EnqueueBuffersAtomically({buffer}); return Status::OK(); } @@ -166,7 +166,7 @@ CpuTransferManager::TransferBufferToInfeedInternal(se::StreamExecutor* executor, /*source=*/source, queued_buffer->device_memory()); if (!s.ok()) { - queued_buffer->Done(ShapeUtil::MakeNil()); + queued_buffer->Done(s); return s; } return queued_buffer; @@ -186,8 +186,8 @@ Status CpuTransferManager::TransferLiteralFromOutfeed( Literal::CreateFromDimensions(literal_shape.element_type(), dimensions); literal->Swap(empty.get()); TF_ASSIGN_OR_RETURN(Shape received_shape, - TransferBufferFromOutfeed( - executor, size, literal->MutableInternalData())); + TransferArrayBufferFromOutfeed( + executor, literal->MutableInternalData(), size)); TF_RET_CHECK(ShapeUtil::Compatible(received_shape, literal->shape())) << "Shape received from outfeed " << ShapeUtil::HumanString(received_shape) @@ -204,6 +204,7 @@ Status CpuTransferManager::TransferLiteralFromOutfeed( } std::vector> elements; + std::vector> buffer_data; for (int64 i = 0; i < literal_shape.tuple_shapes_size(); ++i) { const Shape& tuple_element_shape = ShapeUtil::GetTupleElementShape(literal_shape, i); @@ -215,48 +216,88 @@ Status CpuTransferManager::TransferLiteralFromOutfeed( tuple_element_shape.dimensions().size()); auto empty = Literal::CreateFromDimensions( tuple_element_shape.element_type(), dimensions); - TF_ASSIGN_OR_RETURN( - Shape received_shape, - TransferBufferFromOutfeed(executor, - GetByteSizeRequirement(tuple_element_shape), - empty->MutableInternalData())); - TF_RET_CHECK(ShapeUtil::Compatible(received_shape, tuple_element_shape)) - << "Shape received from outfeed " - << ShapeUtil::HumanString(received_shape) - << " did not match the shape that was requested for outfeed: " - << ShapeUtil::HumanString(tuple_element_shape); - TF_RET_CHECK(GetByteSizeRequirement(tuple_element_shape) == - GetByteSizeRequirement(received_shape)); - *empty->mutable_shape() = received_shape; + int64 size = GetByteSizeRequirement(tuple_element_shape); + buffer_data.push_back({empty->MutableInternalData(), size}); elements.push_back(std::move(empty)); } + + TF_ASSIGN_OR_RETURN(Shape received_shape, + TransferTupleBuffersFromOutfeed(executor, buffer_data)); + + TF_RET_CHECK(ShapeUtil::Compatible(received_shape, literal_shape)) + << "Shape received from outfeed " + << ShapeUtil::HumanString(received_shape) + << " did not match the shape that was requested for outfeed: " + << ShapeUtil::HumanString(literal_shape); + TF_RET_CHECK(GetByteSizeRequirement(literal_shape) == + GetByteSizeRequirement(received_shape)); + + for (int64 i = 0; i < literal_shape.tuple_shapes_size(); ++i) { + *elements[i]->mutable_shape() = received_shape.tuple_shapes(i); + } auto result = Literal::MakeTupleOwned(std::move(elements)); literal->Swap(result.get()); TF_RET_CHECK(ShapeUtil::Equal(literal->shape(), literal_shape)); return Status::OK(); } -StatusOr CpuTransferManager::TransferBufferFromOutfeed( - perftools::gputools::StreamExecutor* executor, int64 size, - void* destination) { - if (size > std::numeric_limits::max()) { - return InvalidArgument("Outfeed shape is too large: needs %lld bytes", - size); +StatusOr CpuTransferManager::TransferTupleBuffersFromOutfeed( + perftools::gputools::StreamExecutor* executor, + tensorflow::gtl::ArraySlice> buffer_data) { + return TransferBuffersFromOutfeedInternal(executor, buffer_data, + /*is_tuple=*/true); +} + +StatusOr CpuTransferManager::TransferArrayBufferFromOutfeed( + perftools::gputools::StreamExecutor* executor, void* destination, + int64 size_bytes) { + return TransferBuffersFromOutfeedInternal( + executor, {{destination, size_bytes}}, /*is_tuple=*/false); +} + +StatusOr CpuTransferManager::TransferBuffersFromOutfeedInternal( + perftools::gputools::StreamExecutor* executor, + tensorflow::gtl::ArraySlice> buffer_data, + bool is_tuple) { + std::vector> buffers; + for (auto b : buffer_data) { + int64 size = b.second; + if (size > std::numeric_limits::max()) { + return InvalidArgument("Outfeed shape is too large: needs %lld bytes", + size); + } + + if (size <= 0) { + return InvalidArgument("Outfeed shape must have positive size; got %lld", + size); + } + + int32 size_32 = static_cast(size); + VLOG(2) + << "Enqueueing outfeed buffer (for the device to populate) of length " + << size_32 << "B"; + buffers.emplace_back(MakeUnique(b.first, size_32)); } - if (size <= 0) { - return InvalidArgument("Outfeed shape must have positive size; got %lld", - size); + std::vector buffer_pointers; + buffer_pointers.reserve(buffers.size()); + for (auto& b : buffers) { + buffer_pointers.push_back(b.get()); } - int32 size_32 = static_cast(size); cpu::runtime::XfeedManager* xfeed_manager = cpu::runtime::GetXfeedManager(); - CpuOutfeedBuffer buffer(destination, size_32); - VLOG(2) << "Enqueueing outfeed buffer (for the device to populate) of length " - << size_32 << "B"; - xfeed_manager->outfeed()->EnqueueBuffers({&buffer}); + xfeed_manager->outfeed()->EnqueueBuffersAtomically(buffer_pointers); VLOG(2) << "Waiting for buffer to be notified as populated."; - return buffer.WaitForNotification(); + std::vector outfed_shapes; + for (auto& buffer : buffers) { + TF_ASSIGN_OR_RETURN(Shape outfed_shape, buffer->WaitForNotification()); + outfed_shapes.push_back(std::move(outfed_shape)); + } + if (is_tuple) { + return ShapeUtil::MakeTupleShape(outfed_shapes); + } + TF_RET_CHECK(outfed_shapes.size() == 1); + return std::move(outfed_shapes[0]); } } // namespace xla diff --git a/tensorflow/compiler/xla/service/cpu_transfer_manager.h b/tensorflow/compiler/xla/service/cpu_transfer_manager.h index 30dc2d90623..6c7524d9471 100644 --- a/tensorflow/compiler/xla/service/cpu_transfer_manager.h +++ b/tensorflow/compiler/xla/service/cpu_transfer_manager.h @@ -23,6 +23,7 @@ limitations under the License. #include "tensorflow/compiler/xla/service/transfer_manager.h" #include "tensorflow/compiler/xla/statusor.h" #include "tensorflow/compiler/xla/xla_data.pb.h" +#include "tensorflow/core/lib/gtl/array_slice.h" #include "tensorflow/core/platform/macros.h" #include "tensorflow/core/platform/stream_executor_no_cuda.h" #include "tensorflow/core/platform/types.h" @@ -51,10 +52,23 @@ class CpuTransferManager : public GenericTransferManager { perftools::gputools::StreamExecutor* executor, int64 size, const void* source); - // On success, returns the shape that was transferred from the outfeed. - StatusOr TransferBufferFromOutfeed( - perftools::gputools::StreamExecutor* executor, int64 size, - void* destination); + // Helper that transfers a tuple of element buffers from the device's outfeed. + StatusOr TransferTupleBuffersFromOutfeed( + perftools::gputools::StreamExecutor* executor, + tensorflow::gtl::ArraySlice> buffer_data); + + // Helper that transfers an array buffer from the device's outfeed. + StatusOr TransferArrayBufferFromOutfeed( + perftools::gputools::StreamExecutor* executor, void* destination, + int64 size_bytes); + + // On success, returns the shape that was transferred from the outfeed -- if + // is_tuple is true, the returned shape will be a tuple of the returned shapes + // for the given buffers. + StatusOr TransferBuffersFromOutfeedInternal( + perftools::gputools::StreamExecutor* executor, + tensorflow::gtl::ArraySlice> buffer_data, + bool is_tuple); TF_DISALLOW_COPY_AND_ASSIGN(CpuTransferManager); }; diff --git a/tensorflow/compiler/xla/util.cc b/tensorflow/compiler/xla/util.cc index 1ecdb9852d8..0bae5e11c9b 100644 --- a/tensorflow/compiler/xla/util.cc +++ b/tensorflow/compiler/xla/util.cc @@ -109,6 +109,15 @@ Status FailedPrecondition(const char* format, ...) { return WithLogBacktrace(tensorflow::errors::FailedPrecondition(message)); } +Status Cancelled(const char* format, ...) { + string message; + va_list args; + va_start(args, format); + tensorflow::strings::Appendv(&message, format, args); + va_end(args); + return WithLogBacktrace(tensorflow::errors::Cancelled(message)); +} + Status ResourceExhausted(const char* format, ...) { string message; va_list args; diff --git a/tensorflow/compiler/xla/util.h b/tensorflow/compiler/xla/util.h index 00151e5da6b..48c5962127e 100644 --- a/tensorflow/compiler/xla/util.h +++ b/tensorflow/compiler/xla/util.h @@ -171,6 +171,7 @@ Status InvalidArgument(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); Status Unimplemented(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); Status InternalError(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); Status FailedPrecondition(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); +Status Cancelled(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); Status ResourceExhausted(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); Status NotFound(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2); Status Unavailable(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);