Use c10 threadpool for GPU to CPU distributed autograd continuations. (#42511)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42511

DistEngine currently only has a single thread to execute GPU to CPU
continuations as part of the backward pass. This would be a significant
performance bottleneck in cases where we have such continuations and would like
to execute these using all CPU cores.

To alleviate this in this PR, we have the single thread in DistEngine only
dequeue work from the global queue, but then hand off execution of that work to
the c10 threadpool where we call "execute_graph_task_until_ready_queue_empty".

For more context please see:
https://github.com/pytorch/pytorch/issues/40255#issuecomment-663298062.
ghstack-source-id: 109997718

Test Plan: waitforbuildbot

Reviewed By: albanD

Differential Revision: D22917579

fbshipit-source-id: c634b6c97f3051f071fd7b994333e6ecb8c54155
This commit is contained in:
Pritam Damania 2020-08-17 15:00:32 -07:00 committed by Facebook GitHub Bot
parent 825ec18eed
commit 133e9f96e1
3 changed files with 59 additions and 23 deletions

View File

@ -246,8 +246,8 @@ struct ReadyQueue {
public:
// incrementOutstandingTasks indicates whether or not we should increment
// 'outstanding_tasks_' for the associated GraphTask. This should mostly
// always be true, see the doc for 'enqueue_blocked_task_on_cpu' for when we
// might set this to false.
// always be true and is only set false in certain cases (see docs for
// DistEngine.execute_graph_task_until_ready_queue_empty)
void push(NodeTask item, bool incrementOutstandingTasks = true);
void pushShutdownTask();
NodeTask pop();

View File

@ -73,19 +73,50 @@ class DistAccumulateGradCaptureHook
ContextPtr autogradContext_;
};
void DistEngine::globalCpuThread(
const std::shared_ptr<ReadyQueue>& ready_queue) {
while (true) {
NodeTask task = ready_queue->pop();
if (task.isShutdownTask_) {
// Need to shutdown this thread.
C10_LOG_API_USAGE_ONCE("torch.autograd.thread_shutdown");
break;
}
auto graphTask = task.base_.lock();
if (graphTask == nullptr) {
// GraphTask has expired, ignore and continue processing.
continue;
}
// Launch the execution on a JIT thread.
at::launch([this,
graphTask,
graphRoot = task.fn_,
variables =
InputBuffer::variables(std::move(task.inputs_))]() mutable {
InputBuffer inputs(variables.size());
for (size_t i = 0; i < variables.size(); i++) {
inputs.add(i, std::move(variables[i]), c10::nullopt, c10::nullopt);
}
execute_graph_task_until_ready_queue_empty(
/*node_task*/ NodeTask(graphTask, graphRoot, std::move(inputs)),
/*incrementOutstandingTasks*/ false);
});
}
}
DistEngine::DistEngine()
: initializedContextIds_(),
engine_(Engine::get_default_engine()),
global_cpu_ready_queue_(std::make_shared<ReadyQueue>()),
global_cpu_thread_(
&Engine::thread_init,
&engine_,
torch::autograd::CPU_DEVICE,
global_cpu_ready_queue_,
/* increment */ false) /* We track the thread in DistEngine */ {
&DistEngine::globalCpuThread,
this,
global_cpu_ready_queue_) {
// Note [GPU to CPU continuations]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// HACK: Initialize a single CPU thread to execute continuations from GPU
// Initialize a single CPU thread to execute continuations from GPU
// tasks. The multithreaded structure for the distributed engine works
// well only for CPU tasks. If we have an order of tasks like
// CPU->GPU->CPU, distributed autograd has no thread to execute the last
@ -93,10 +124,10 @@ DistEngine::DistEngine()
// such situations and it will be responsible for executing these CPU
// tasks. The CPU thread has its own ready_queue which is used as the
// cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU
// to CPU continuations are enqueued on this thread. This is a short term
// fix for PyTorch 1.6 and we plan to work towards a better design as we
// add full GPU support for the RPC framework.
// See https://github.com/pytorch/pytorch/issues/40255 for more details.
// to CPU continuations are enqueued on this thread. The global CPU thread
// simply dequeues tasks from the global queue and calls
// "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the
// appropriate task.
global_cpu_thread_.detach();
}
@ -280,17 +311,21 @@ void DistEngine::computeDependencies(
}
void DistEngine::execute_graph_task_until_ready_queue_empty(
const std::shared_ptr<GraphTask>& graph_task,
std::shared_ptr<Node> root_to_execute,
NodeTask&& node_task,
bool incrementOutstandingTasks) {
engine_.initialize_device_threads_pool();
// Create a ready queue per call to traverse the graph_task from
// root_to_execute This allow concurrent execution of the same GraphTask from
// different threads
std::shared_ptr<ReadyQueue> cpu_ready_queue = std::make_shared<ReadyQueue>();
cpu_ready_queue->push(
NodeTask(graph_task, std::move(root_to_execute), InputBuffer(0)),
incrementOutstandingTasks);
auto graph_task = node_task.base_.lock();
if (graph_task == nullptr) {
LOG(ERROR) << "GraphTask has expired for NodeTask: "
<< node_task.fn_->name() << ", skipping execution.";
return;
}
cpu_ready_queue->push(std::move(node_task), incrementOutstandingTasks);
torch::autograd::set_device(torch::autograd::CPU_DEVICE);
graph_task->owner_ = torch::autograd::CPU_DEVICE;
@ -344,8 +379,7 @@ std::shared_ptr<rpc::FutureMessage> DistEngine::runEngineAndAccumulateGradients(
auto graphTask = autogradContext->retrieveGraphTask();
at::launch([this, graphTask, graphRoot, incrementOutstandingTasks]() {
execute_graph_task_until_ready_queue_empty(
/*graph_task*/ graphTask,
/*root_to_execute*/ graphRoot,
/*node_task*/ NodeTask(graphTask, graphRoot, InputBuffer(0)),
/*incrementOutstandingTasks*/ incrementOutstandingTasks);
});
// Use a reference here to avoid refcount bump on futureGrads.
@ -460,8 +494,7 @@ std::shared_ptr<rpc::FutureMessage> DistEngine::executeSendFunctionAsync(
auto graphTask = autogradContext->retrieveGraphTask();
at::launch([this, graphTask, sendFunction]() {
execute_graph_task_until_ready_queue_empty(
/*graph_task*/ graphTask,
/*root_to_execute*/ sendFunction,
/*node_task*/ NodeTask(graphTask, sendFunction, InputBuffer(0)),
/*incrementOutstandingTasks*/ false);
});
return std::make_shared<rpc::FutureMessage>(rpc::Message());

View File

@ -119,8 +119,7 @@ class TORCH_API DistEngine {
// 2. properly setup the thread local ready queue to enable reentrant
// backwards
void execute_graph_task_until_ready_queue_empty(
const std::shared_ptr<torch::autograd::GraphTask>& graph_task,
std::shared_ptr<torch::autograd::Node> root_to_execute,
torch::autograd::NodeTask&& node_task,
bool incrementOutstandingTasks = true);
// Run the local autograd engine using the provided graphTask and graphRoot
@ -135,6 +134,10 @@ class TORCH_API DistEngine {
// Run after the backward pass is done to appropriately cleanup structures.
void cleanupBackwardPass(const ContextPtr& autogradContext);
// Global thread to execute CPU continuations.
void globalCpuThread(
const std::shared_ptr<torch::autograd::ReadyQueue>& ready_queue);
// Set of autograd context_ids, which we have already initialized for
// distributed autograd on this node (e.g.: already computed dependencies)
std::unordered_set<int64_t> initializedContextIds_;