In shared batch scheduler, rename 'max_batch_size' to 'input_batch_size_limit'.

PiperOrigin-RevId: 320725389
Change-Id: I10979f6e225498b52ec3c20b6d5e91b3322b442d
This commit is contained in:
Mingming Liu 2020-07-10 21:28:52 -07:00 committed by TensorFlower Gardener
parent b597319553
commit 43b19c184e
5 changed files with 34 additions and 41 deletions

View File

@ -11,10 +11,6 @@
* C-API functions `TF_StringDecode`, `TF_StringEncode`, and * C-API functions `TF_StringDecode`, `TF_StringEncode`, and
`TF_StringEncodedSize` are no longer relevant and have been removed; see `TF_StringEncodedSize` are no longer relevant and have been removed; see
core/platform/ctstring.h for string access/modification in C. core/platform/ctstring.h for string access/modification in C.
* In batching library, renamed the parameter
SharedBatchScheduler::QueueOptions::max_batch_size to a more accurate name
(input_batch_size_limit), reflecting a recent feature that enables splitting
of large batches.
## Known Caveats ## Known Caveats

View File

@ -368,8 +368,7 @@ class BatchResource : public ResourceBase {
TF_RETURN_IF_ERROR( TF_RETURN_IF_ERROR(
Batcher::Create(batcher_options, &new_resource->batcher_)); Batcher::Create(batcher_options, &new_resource->batcher_));
new_resource->batcher_queue_options_.input_batch_size_limit = new_resource->batcher_queue_options_.max_batch_size = max_batch_size;
max_batch_size;
new_resource->batcher_queue_options_.max_enqueued_batches = new_resource->batcher_queue_options_.max_enqueued_batches =
max_enqueued_batches; max_enqueued_batches;
new_resource->batcher_queue_options_.batch_timeout_micros = new_resource->batcher_queue_options_.batch_timeout_micros =

View File

@ -226,8 +226,7 @@ Status BasicBatchScheduler<TaskType>::Create(
typename SharedBatchScheduler<TaskType>::QueueOptions typename SharedBatchScheduler<TaskType>::QueueOptions
shared_scheduler_queue_options; shared_scheduler_queue_options;
shared_scheduler_queue_options.input_batch_size_limit = shared_scheduler_queue_options.max_batch_size = options.max_batch_size;
options.max_batch_size;
shared_scheduler_queue_options.batch_timeout_micros = shared_scheduler_queue_options.batch_timeout_micros =
options.batch_timeout_micros; options.batch_timeout_micros;
shared_scheduler_queue_options.max_enqueued_batches = shared_scheduler_queue_options.max_enqueued_batches =

View File

@ -136,15 +136,17 @@ class SharedBatchScheduler
struct QueueOptions { struct QueueOptions {
// The size limit of an input batch to the queue. // The size limit of an input batch to the queue.
// //
// If `enable_large_batch_splitting` is True, 'input_batch_size_limit' // If `enable_large_batch_splitting` is True, 'max_batch_size' should be
// should be greater than or equal to `max_execution_batch_size`; otherwise // greater than or equal to `max_execution_batch_size`; otherwise
// `input_batch_size_limit` should be equal to `max_execution_batch_size`. // `max_batch_size` should be equal to `max_execution_batch_size`.
size_t input_batch_size_limit = 1000; // TODO(b/154140947):
// Rename it to 'input_batch_size_limit' here and in caller's code.
size_t max_batch_size = 1000;
// If a task has been enqueued for this amount of time (in microseconds), // If a task has been enqueued for this amount of time (in microseconds),
// and a thread is available, the scheduler will immediately form a batch // and a thread is available, the scheduler will immediately form a batch
// from enqueued tasks and assign the batch to the thread for processing, // from enqueued tasks and assign the batch to the thread for processing,
// even if the batch's size is below 'input_batch_size_limit'. // even if the batch's size is below 'max_batch_size'.
// //
// This parameter offers a way to bound queue latency, so that a task isn't // This parameter offers a way to bound queue latency, so that a task isn't
// stuck in the queue indefinitely waiting for enough tasks to arrive to // stuck in the queue indefinitely waiting for enough tasks to arrive to
@ -171,7 +173,7 @@ class SharedBatchScheduler
// `input_task`: a unit of task to be split (raw pointer not owned). // `input_task`: a unit of task to be split (raw pointer not owned).
// `first_output_task_size`: task size of first output. // `first_output_task_size`: task size of first output.
// `max_execution_batch_size`: Maximum size of each batch. // `max_batch_size`: Maximum size of each batch.
// `output_tasks`: A list of output tasks after split. // `output_tasks`: A list of output tasks after split.
// //
// REQUIRED: // REQUIRED:
@ -182,7 +184,7 @@ class SharedBatchScheduler
// Instantiations of `TaskType` may vary, so it's up to caller to define // Instantiations of `TaskType` may vary, so it's up to caller to define
// how (e.g., which members to access) to split input tasks. // how (e.g., which members to access) to split input tasks.
std::function<Status(std::unique_ptr<TaskType>* input_task, std::function<Status(std::unique_ptr<TaskType>* input_task,
int first_output_task_size, int input_batch_size_limit, int first_output_task_size, int max_batch_size,
std::vector<std::unique_ptr<TaskType>>* output_tasks)> std::vector<std::unique_ptr<TaskType>>* output_tasks)>
split_input_task_func; split_input_task_func;
@ -267,7 +269,7 @@ class Queue {
using SchedulableBatchCallback = std::function<void()>; using SchedulableBatchCallback = std::function<void()>;
using SplitInputTaskIntoSubtasksCallback = std::function<Status( using SplitInputTaskIntoSubtasksCallback = std::function<Status(
std::unique_ptr<TaskType>* input_task, int open_batch_remaining_slot, std::unique_ptr<TaskType>* input_task, int open_batch_remaining_slot,
int max_execution_batch_size, int max_batch_size,
std::vector<std::unique_ptr<TaskType>>* output_tasks)>; std::vector<std::unique_ptr<TaskType>>* output_tasks)>;
Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options, Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
Env* env, ProcessBatchCallback process_batch_callback, Env* env, ProcessBatchCallback process_batch_callback,
@ -295,7 +297,7 @@ class Queue {
size_t SchedulingCapacity() const; size_t SchedulingCapacity() const;
// Returns the maximum allowed size of tasks submitted to the queue. // Returns the maximum allowed size of tasks submitted to the queue.
size_t max_task_size() const { return options_.input_batch_size_limit; } size_t max_task_size() const { return options_.max_batch_size; }
// Returns the maximum allowed size of tasks to be enqueued. // Returns the maximum allowed size of tasks to be enqueued.
// Returned value would be less than or equal to the maximum allowed input // Returned value would be less than or equal to the maximum allowed input
@ -304,7 +306,7 @@ class Queue {
if (options_.enable_large_batch_splitting) { if (options_.enable_large_batch_splitting) {
return options_.max_execution_batch_size; return options_.max_execution_batch_size;
} else { } else {
return options_.input_batch_size_limit; return options_.max_batch_size;
} }
} }
@ -457,10 +459,9 @@ Status SharedBatchScheduler<TaskType>::AddQueue(
std::function<void(std::unique_ptr<Batch<TaskType>>)> std::function<void(std::unique_ptr<Batch<TaskType>>)>
process_batch_callback, process_batch_callback,
std::unique_ptr<BatchScheduler<TaskType>>* queue) { std::unique_ptr<BatchScheduler<TaskType>>* queue) {
if (options.input_batch_size_limit == 0) { if (options.max_batch_size == 0) {
return errors::InvalidArgument( return errors::InvalidArgument("max_batch_size must be positive; was ",
"input_batch_size_limit must be positive; was ", options.max_batch_size);
options.input_batch_size_limit);
} }
if (options.batch_timeout_micros < 0) { if (options.batch_timeout_micros < 0) {
return errors::InvalidArgument( return errors::InvalidArgument(
@ -482,12 +483,11 @@ Status SharedBatchScheduler<TaskType>::AddQueue(
} }
if (options.enable_large_batch_splitting && if (options.enable_large_batch_splitting &&
(options.input_batch_size_limit < options.max_execution_batch_size)) { (options.max_batch_size < options.max_execution_batch_size)) {
return errors::InvalidArgument( return errors::InvalidArgument(
"When enable_large_batch_splitting is true, input_batch_size_limit " "When enable_large_batch_splitting is true, max_batch_size must be "
"must be "
"greater than or equal to max_execution_batch_size.", "greater than or equal to max_execution_batch_size.",
options.enable_large_batch_splitting, options.input_batch_size_limit, options.enable_large_batch_splitting, options.max_batch_size,
options.max_execution_batch_size); options.max_execution_batch_size);
} }
@ -616,10 +616,10 @@ Status Queue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
template <typename TaskType> template <typename TaskType>
Status Queue<TaskType>::ScheduleWithoutSplit(std::unique_ptr<TaskType>* task) { Status Queue<TaskType>::ScheduleWithoutSplit(std::unique_ptr<TaskType>* task) {
if ((*task)->size() > options_.input_batch_size_limit) { if ((*task)->size() > options_.max_batch_size) {
return errors::InvalidArgument("Task size ", (*task)->size(), return errors::InvalidArgument("Task size ", (*task)->size(),
" is larger than maximum input batch size ", " is larger than maximum input batch size ",
options_.input_batch_size_limit); options_.max_batch_size);
} }
bool notify_of_schedulable_batch = false; bool notify_of_schedulable_batch = false;
@ -628,8 +628,7 @@ Status Queue<TaskType>::ScheduleWithoutSplit(std::unique_ptr<TaskType>* task) {
DCHECK(!closed_); DCHECK(!closed_);
if (batches_.back()->size() + (*task)->size() > if (batches_.back()->size() + (*task)->size() > options_.max_batch_size) {
options_.input_batch_size_limit) {
if (batches_.size() >= options_.max_enqueued_batches) { if (batches_.size() >= options_.max_enqueued_batches) {
return errors::Unavailable( return errors::Unavailable(
"The batch scheduling queue to which this task was submitted is " "The batch scheduling queue to which this task was submitted is "
@ -670,10 +669,10 @@ Status Queue<TaskType>::ScheduleWithSplit(std::unique_ptr<TaskType>* task) {
profiler::TraceMe trace_me([task] { profiler::TraceMe trace_me([task] {
return strings::StrCat("ScheduleWithSplit:", (*task)->size()); return strings::StrCat("ScheduleWithSplit:", (*task)->size());
}); });
if ((*task)->size() > options_.input_batch_size_limit) { if ((*task)->size() > options_.max_batch_size) {
return errors::InvalidArgument("Task size ", (*task)->size(), return errors::InvalidArgument("Task size ", (*task)->size(),
" is larger than maximum input batch size ", " is larger than maximum input batch size ",
options_.input_batch_size_limit); options_.max_batch_size);
} }
// The max size to be enqueued. // The max size to be enqueued.

View File

@ -97,7 +97,7 @@ TEST(SharedBatchSchedulerTest, Basic) {
// Create two queues. // Create two queues.
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 10; queue_options.max_batch_size = 10;
queue_options.batch_timeout_micros = 10 * 1000 * 1000; // 10 seconds queue_options.batch_timeout_micros = 10 * 1000 * 1000; // 10 seconds
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
std::unique_ptr<BatchScheduler<FakeTask>> queue_0; std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
@ -155,7 +155,7 @@ TEST(SharedBatchSchedulerTest, ObeyBatchSizeConstraint) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 10; queue_options.max_batch_size = 10;
queue_options.batch_timeout_micros = 10 * 1000 * 1000; // 10 seconds queue_options.batch_timeout_micros = 10 * 1000 * 1000; // 10 seconds
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
std::unique_ptr<BatchScheduler<FakeTask>> queue; std::unique_ptr<BatchScheduler<FakeTask>> queue;
@ -217,7 +217,7 @@ TEST(SharedBatchSchedulerTest, ObeysTimeout) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 4; queue_options.max_batch_size = 4;
queue_options.batch_timeout_micros = 10; queue_options.batch_timeout_micros = 10;
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
std::unique_ptr<BatchScheduler<FakeTask>> queue; std::unique_ptr<BatchScheduler<FakeTask>> queue;
@ -273,7 +273,7 @@ TEST(SharedBatchSchedulerTest, ObeysTimeoutWithRealClock) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 10; queue_options.max_batch_size = 10;
queue_options.batch_timeout_micros = 100 * 1000; // 100 milliseconds queue_options.batch_timeout_micros = 100 * 1000; // 100 milliseconds
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
std::unique_ptr<BatchScheduler<FakeTask>> queue; std::unique_ptr<BatchScheduler<FakeTask>> queue;
@ -318,7 +318,7 @@ TEST(SharedBatchSchedulerTest,
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
// Set a large batch size, so that we don't hit the batch size limit. // Set a large batch size, so that we don't hit the batch size limit.
queue_options.input_batch_size_limit = 100; queue_options.max_batch_size = 100;
// Process a batch as soon as a thread is available. // Process a batch as soon as a thread is available.
queue_options.batch_timeout_micros = 0; queue_options.batch_timeout_micros = 0;
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
@ -371,7 +371,7 @@ TEST(SharedBatchSchedulerTest, Fairness) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 10; queue_options.max_batch_size = 10;
queue_options.batch_timeout_micros = 1; queue_options.batch_timeout_micros = 1;
queue_options.max_enqueued_batches = 100 /* give plenty of room */; queue_options.max_enqueued_batches = 100 /* give plenty of room */;
std::vector<std::unique_ptr<BatchScheduler<FakeTask>>> queues(2); std::vector<std::unique_ptr<BatchScheduler<FakeTask>>> queues(2);
@ -423,7 +423,7 @@ TEST(SharedBatchSchedulerTest, ConstMethods) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 2; queue_options.max_batch_size = 2;
queue_options.batch_timeout_micros = 0; queue_options.batch_timeout_micros = 0;
queue_options.max_enqueued_batches = max_enqueued_batches; queue_options.max_enqueued_batches = max_enqueued_batches;
std::unique_ptr<BatchScheduler<FakeTask>> queue; std::unique_ptr<BatchScheduler<FakeTask>> queue;
@ -494,7 +494,7 @@ TEST(SharedBatchSchedulerTest, OneFullQueueDoesntBlockOtherQueues) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 10; queue_options.max_batch_size = 10;
queue_options.batch_timeout_micros = 0; queue_options.batch_timeout_micros = 0;
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
std::unique_ptr<BatchScheduler<FakeTask>> queue_0; std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
@ -550,7 +550,7 @@ TEST(SharedBatchSchedulerTest, QueueDestructorBlocksUntilAllTasksProcessed) {
std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler; std::shared_ptr<SharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler)); TF_ASSERT_OK(SharedBatchScheduler<FakeTask>::Create(options, &scheduler));
SharedBatchScheduler<FakeTask>::QueueOptions queue_options; SharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.input_batch_size_limit = 10; queue_options.max_batch_size = 10;
queue_options.batch_timeout_micros = 0; queue_options.batch_timeout_micros = 0;
queue_options.max_enqueued_batches = 2; queue_options.max_enqueued_batches = 2;
std::unique_ptr<BatchScheduler<FakeTask>> queue; std::unique_ptr<BatchScheduler<FakeTask>> queue;