pytorch/caffe2/python/test/executor_test.py
Ilia Cherniavskii 1149b9bbb5 Polling async net executor
Summary:
Implementation of polling async net executor.
Notes:
- New net executor async_polling - schedules CPU and GPU ops asynchronously, uses single polling thread
- Events: update to Caffe2 events to support async CPU events, adding new methods:
 Query() - non-blocking checking of event states: INITIALIZED -> RECORDED -> SUCCESS/FAILED
 ErrorMessage() - when an operation runs asynchronously and fails, calling this on the event returns the error message
- Tasks: using existing DAGNet's algorithm to compute CPU and GPU chains, a separate task for each chain
- Polling: using single thread to query state of events - for CPU tasks atomically queries task state, for GPU task - uses cudaEventQuery; using Event
- Scheduling of CPU ops: using global thread pools
- Scheduling of GPU ops: using GPU thread pool per GPU device

Reviewed By: dzhulgakov

Differential Revision: D5985110

fbshipit-source-id: a9de7fcbb71d046a3aa1b573072b89a65dfeee8c
2017-11-03 07:27:44 -07:00

70 lines
2.1 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from caffe2.python import workspace
from caffe2.python.test.executor_test_util import (
build_conv_model,
build_resnet50_dataparallel_model,
run_resnet50_epoch,
ExecutorTestBase,
executor_test_settings,
executor_test_model_names)
from hypothesis import given
import hypothesis.strategies as st
import unittest
# Executor implementations compared against the reference "simple" executor.
EXECUTORS = ["async_polling", "dag", "async_dag"]
# Default number of net iterations per comparison run (reduced to 1 for
# numerically unstable models, e.g. MLP).
ITERATIONS = 2
class ExecutorCPUConvNetTest(ExecutorTestBase):
    """Checks that every tested executor reproduces the results of the
    reference "simple" executor on small CPU conv-net models."""

    @given(executor=st.sampled_from(EXECUTORS),
           model_name=st.sampled_from(executor_test_model_names()),
           batch_size=st.sampled_from([8]),
           num_workers=st.sampled_from([8]))
    @executor_test_settings
    def test_executor(self, executor, model_name, batch_size, num_workers):
        model = build_conv_model(model_name, batch_size)
        model.Proto().num_workers = num_workers

        def run_model():
            # MLP gradients are numerically unstable, so run a single
            # iteration for that model to keep the comparison meaningful.
            num_iters = 1 if model_name == "MLP" else ITERATIONS
            workspace.RunNet(model.net, num_iters)

        self.compare_executors(
            model,
            ref_executor="simple",
            test_executor=executor,
            model_run_func=run_model,
        )
@unittest.skipIf(not workspace.has_gpu_support, "no gpu")
class ExecutorGPUResNetTest(ExecutorTestBase):
    """Checks that every tested executor reproduces the results of the
    reference "simple" executor on a data-parallel ResNet-50 epoch (GPU)."""

    @given(executor=st.sampled_from(EXECUTORS),
           num_workers=st.sampled_from([8]))
    @executor_test_settings
    def test_executor(self, executor, num_workers):
        # Keep batch/epoch sizes tiny: one batch per epoch per run.
        batch_size = 8
        epoch_size = 8
        model = build_resnet50_dataparallel_model(
            num_gpus=workspace.NumCudaDevices(),
            batch_size=batch_size,
            epoch_size=epoch_size,
        )
        model.Proto().num_workers = num_workers

        def run_model():
            run_resnet50_epoch(model, batch_size=batch_size,
                               epoch_size=epoch_size)

        self.compare_executors(
            model,
            ref_executor="simple",
            test_executor=executor,
            model_run_func=run_model,
        )
if __name__ == "__main__":
    # Run all executor comparison tests via the standard unittest runner.
    unittest.main()