pytorch/torch/csrc/jit/api/function_impl.cpp
Jeremy Lilley 8d64a3848c [jit] In RPC Server, handle TorchScript continuations asynchronously (#34109)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34109

This change adds glue to GraphExecutor to give the RPC server
access to the future-based Interpreter::runAsync() api.

Previously, if a server encounted a TorchScript continuation-based block
with fork/wait, it would simply block in the server thread until the handler
completed, since it uses the synchronous Interpreter::run() api.

With the ivalue::Future returned by the Interpreter, we can run the
TorchScript code asynchronously from c++ simply by connecting its
callback to the server callback.

We add test cases to cover the new logic, both rpc_async and remote.

ghstack-source-id: 101245438

Test Plan: buck test mode/dev-nosan caffe2/test/distributed/rpc/...

Differential Revision: D20194321

fbshipit-source-id: 16785ec5d9ed0b16cb1ffab0a9771a77de30fcb0
2020-03-31 17:21:46 -07:00

76 lines
2.0 KiB
C++

#include <torch/csrc/jit/api/function_impl.h>
#include <torch/csrc/jit/passes/inliner.h>
#include <torch/csrc/jit/frontend/error_report.h>
namespace torch {
namespace jit {
namespace {
c10::FunctionSchema defaultSchemaFor(const Function& function) {
std::vector<c10::Argument> args;
std::vector<c10::Argument> returns;
Graph& g = *function.graph();
size_t num_inputs = function.num_inputs();
for (size_t i = 0; i < num_inputs; ++i) {
const Value* v = g.inputs().at(i);
std::string name = v->hasDebugName() ? v->debugNameBase()
: ("argument_" + c10::to_string(i));
args.emplace_back(std::move(name), unshapedType(g.inputs()[i]->type()));
}
for (size_t i = 0; i < g.outputs().size(); ++i) {
returns.emplace_back("", unshapedType(g.outputs()[i]->type()));
}
return {function.name(), "", std::move(args), std::move(returns)};
}
} // namespace
void placeholderCreator(GraphFunction&) {
throw RecursiveMethodCallError();
}
void GraphFunction::run(Stack& stack) {
get_executor().run(stack);
}
void GraphFunction::run(Stack&& stack) {
run(stack);
}
c10::intrusive_ptr<c10::ivalue::Future> GraphFunction::runAsync(Stack& stack) {
return get_executor().runAsync(stack);
}
IValue GraphFunction::operator()(
std::vector<IValue> stack,
const Kwargs& kwargs) {
getSchema().checkAndNormalizeInputs(stack, kwargs);
run(stack);
return stack.front();
}
void GraphFunction::ensure_defined() {
if (function_creator_) {
auto creator = function_creator_;
function_creator_ = placeholderCreator;
creator(*this);
function_creator_ = nullptr;
}
check_single_output();
}
const c10::FunctionSchema& GraphFunction::getSchema() const {
if (schema_ == nullptr) {
schema_ = std::make_unique<c10::FunctionSchema>(defaultSchemaFor(*this));
}
return *schema_;
}
void preoptimizeGraph(std::shared_ptr<Graph>& graph) {
// TODO: Invoke cleanup passes before and after inlining to reduce amount of
// code we're copying.
Inline(*graph);
}
} // namespace jit
} // namespace torch