* [fix] Re-enable events in RNN ops. We had earlier disabled events in RNN ops because we didn't use events back then; with current use cases this is no longer true (https://fburl.com/8vd0lp8y).
* Use ops with CUDA impl.
* Revert D7729695: [caffe2][fix] Re-enable events in RNN ops. This reverts commit 4b215c7496fb724656ff4c776933a15bdbbcde5e. @bypass-lint An infra SEV is better than not reverting this diff. If you copy this password, see you in SEV Review! @cause_a_sev_many_files
* [observer] Clean up observer_config.h. #accept2ship
* [1/n] Refactor dataio_test.py: replace code duplication with a common function.
* Add a barrier net that runs before training nets. With this synchronization barrier net, shards that are faster wait for the other shards before training starts, which reduces the chance of the faster shards timing out during GLOO AllReduce. Removed the explicit data_parallel_model.py synchronize call in the holmes workflow; a similar change in the speech/asr_training workflow will come in another diff.
* Support the dnnlowp backend in caffe2_benchmark, for SHARE operator latency evaluation.
* Migrate integral_image_op to main caffe2: migrate integral_image_op (GPU version) given by https://fburl.com/yvqezigi to caffe2/caffe2/operators, implement its CPU version, and write a test using the hypothesis_test mechanism.
* [pos_disc, fbcode] Implement unjoined lr loss. As explained in https://our.intern.facebook.com/intern/wiki/Model_Based_Calibration/, when the dataset is an unjoined dataset, where labels might change later, we need to use the unjoined logloss. The implementation is almost the same as in Sigrid (https://fburl.com/1trngsls), where

  loss = y (log(p) - log(1-p)) + (1-y) log(1-p)
       = xy - (1-y) x - (1-y) log(1 + exp(-x))

  For x < 0, to ensure stability and avoid overflow, we rewrite log(1 + exp(-x)) as -x + log(1 + exp(x)), which gives

  loss = xy - (1-y) x + (1-y) x - (1-y) log(1 + exp(x))
       = xy - (1-y) log(1 + exp(x))

  Combining both cases, the final expression becomes

  loss = xy + (y - 1) x (x >= 0) - (1 - y) log(1 + exp(x - 2 x (x >= 0)))

  where y is the true label, x is the dot product, and p = logistic(x). This implementation is aligned with the current implementation of the original cross entropy in https://phabricator.intern.facebook.com/diffusion/FBS/browse/master/fbcode/caffe2/caffe2/operators/cross_entropy_op.cc;0bae3b5d0f825897c5e0dd0ff10f489d7271bf25$7-13. (A NumPy sketch of the stable expression follows this list.)
* Keep the array to fix the conflict.
* [C2] Compute Adagrad effective LR. The AdagradWithLR op outputs an extra blob that contains the average effective learning rate across all weights in the blob. (A sketch of this computation follows this list.)
* Open-source extractMetaNetDef & runGlobalInitialization, add a new Predictor constructor from a db file, and add run_map_outputs: 1. open-source extractMetaNetDef and runGlobalInitialization, for use in 2. the new Predictor constructor from a db file; 3. add a new run function that returns outputs as a TensorMap.
* Disable eigen cpu in transpose and reduce.
* Introduce the request_only/object_only property of ModelLayer; by default this is False.
* A simple TC Caffe2 benchmark. We can run the tuner, get MappingOptions and then use them to compare against cuBLAS; currently broken due to LLVM issues. How to run:

  hg checkout eec1ab31b59c03b8deded1c755a9abaf8c45be01
  add D7401202
  add D7434625
  add D7506031
  add D7540728
  buck run @mode/dev-nosan tc/tc/benchmarks_python:caffe2_benchmark

* Move Caffe2 feature_maps_ops to open source: the feature maps operators are needed in the open source project facebookresearch/BlueWhale.
* Manually fix the conflicts in the channel shuffle op.
* Fix the inconsistency between gh and fbcode.
* Skip the Adagrad GPU test (because some GPU implementation is missing).
* Fix another test to make sure it won't run on GPU when the implementation is not available yet.
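
As a companion to the "unjoined lr loss" item above, here is a minimal NumPy sketch of the numerically stable expression quoted in that commit message. It only illustrates the formula and is not the Caffe2 operator itself; the function name and the float64 casts are choices made for this example.

import numpy as np

def unjoined_logistic_loss(x, y):
    """Stable evaluation of loss = y*(log(p) - log(1-p)) + (1-y)*log(1-p),
    with p = logistic(x), that never exponentiates a large positive number."""
    x = np.asarray(x, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    pos = (x >= 0).astype(np.float64)  # indicator for (x >= 0)
    # loss = xy + (y - 1) x (x >= 0) - (1 - y) log(1 + exp(x - 2 x (x >= 0)))
    return (x * y + (y - 1.0) * x * pos
            - (1.0 - y) * np.log1p(np.exp(x - 2.0 * x * pos)))

# The exponent x - 2*x*(x >= 0) is always <= 0, so e.g.
# unjoined_logistic_loss(np.array([-50.0, 0.0, 50.0]), np.array([0.0, 1.0, 1.0]))
# stays finite instead of overflowing in exp().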
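
Similarly, a small sketch of the "average effective learning rate" mentioned in the Adagrad item, assuming the standard Adagrad rule lr / (sqrt(accumulated squared gradient) + epsilon); the function name, argument names, and epsilon default are illustrative, not the actual AdagradWithLR interface.

import numpy as np

def adagrad_step_with_effective_lr(w, moment, grad, lr=0.01, epsilon=1e-5):
    # Accumulate the squared gradient (the Adagrad history / "moment").
    moment = moment + grad * grad
    # Per-weight effective step size under the standard Adagrad rule
    # (assumed here; the op's exact formula may differ).
    effective_lr = lr / (np.sqrt(moment) + epsilon)
    w = w - effective_lr * grad
    # The extra output described in the commit message: the average
    # effective learning rate across all weights in the blob.
    return w, moment, float(effective_lr.mean())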
391 lines
15 KiB
Python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python.dataio import (
    CompositeReader,
    CompositeReaderBuilder,
    Reader,
    ReaderBuilder,
    ReaderWithLimit,
    ReaderWithTimeLimit,
)
from caffe2.python.dataset import Dataset
from caffe2.python.pipeline import pipe
from caffe2.python.schema import Struct, NewRecord, FeedRecord
from caffe2.python.session import LocalSession
from caffe2.python.task import TaskGroup, final_output, WorkspaceType
from caffe2.python.test_util import TestCase
from caffe2.python.cached_reader import CachedReader
from caffe2.python import core, workspace, schema
from caffe2.python.net_builder import ops

import numpy as np
import numpy.testing as npt
import os
import shutil
import unittest
import tempfile
import time


def make_source_dataset(ws, size=100, offset=0, name=None):
    """Create and fill a source Dataset whose 'label' field holds the
    integers [offset, offset + size)."""
    name = name or "src"
    src_init = core.Net("{}_init".format(name))
    with core.NameScope(name):
        src_values = Struct(('label', np.array(range(offset, offset + size))))
        src_blobs = NewRecord(src_init, src_values)
        src_ds = Dataset(src_blobs, name=name)
        FeedRecord(src_blobs, src_values, ws)
    ws.run(src_init)
    return src_ds


def make_destination_dataset(ws, schema, name=None):
    """Create an empty destination Dataset with the given schema."""
    name = name or 'dst'
    dst_init = core.Net('{}_init'.format(name))
    with core.NameScope(name):
        dst_ds = Dataset(schema, name=name)
        dst_ds.init_empty(dst_init)
    ws.run(dst_init)
    return dst_ds


def read_all_data(ws, reader, session):
    """Pipe everything the reader produces into a fresh destination dataset
    and return the values of its 'label' field."""
    dst_ds = make_destination_dataset(ws, reader.schema().clone_schema())

    with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg:
        pipe(reader, dst_ds.writer(), num_runtime_threads=8)
    session.run(tg)

    return ws.blobs[str(dst_ds.content().label())].fetch()


class ReaderWithDelay(Reader):
    """Test reader class that inserts a delay between reading batches."""
    def __init__(self, reader, delay):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.delay = delay

    def setup_ex(self, global_init_net, global_finish_net):
        self.reader.setup_ex(global_init_net, global_finish_net)

    def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net('reader_body')

        def sleep_op(*args, **argd):
            time.sleep(self.delay)

        read_net.Python(sleep_op)([], [])
        return ([read_net], ) + self.reader.read(read_net)


class TestReaderBuilder(ReaderBuilder):
    def __init__(self, name, size, offset):
        self._schema = schema.Struct(
            ('label', schema.Scalar()),
        )
        self._name = name
        self._size = size
        self._offset = offset
        self._src_ds = None

    def schema(self):
        return self._schema

    def setup(self, ws):
        self._src_ds = make_source_dataset(ws, offset=self._offset, size=self._size,
                                           name=self._name)

    def new_reader(self, **kwargs):
        return self._src_ds


class TestCompositeReader(TestCase):
    @unittest.skipIf(os.environ.get('JENKINS_URL'), 'Flaky test on Jenkins')
    def test_composite_reader(self):
        ws = workspace.C.Workspace()
        session = LocalSession(ws)
        num_srcs = 3
        names = ["src_{}".format(i) for i in range(num_srcs)]
        size = 100
        offsets = [i * size for i in range(num_srcs)]
        src_dses = [make_source_dataset(ws, offset=offset, size=size, name=name)
                    for (name, offset) in zip(names, offsets)]

        data = [ws.fetch_blob(str(src.field_blobs[0])) for src in src_dses]
        # Sanity check that we didn't overwrite anything
        for d, offset in zip(data, offsets):
            npt.assert_array_equal(d, range(offset, offset + size))

        # Make an identically-sized empty destination dataset
        dst_ds_schema = schema.Struct(
            *[
                (name, src_ds.content().clone_schema())
                for name, src_ds in zip(names, src_dses)
            ]
        )
        dst_ds = make_destination_dataset(ws, dst_ds_schema)

        with TaskGroup() as tg:
            reader = CompositeReader(names,
                                     [src_ds.reader() for src_ds in src_dses])
            pipe(reader, dst_ds.writer(), num_runtime_threads=3)
        session.run(tg)

        for i in range(num_srcs):
            written_data = sorted(
                ws.fetch_blob(str(dst_ds.content()[names[i]].label())))
            npt.assert_array_equal(data[i], written_data, "i: {}".format(i))

    @unittest.skipIf(os.environ.get('JENKINS_URL'), 'Flaky test on Jenkins')
    def test_composite_reader_builder(self):
        ws = workspace.C.Workspace()
        session = LocalSession(ws)
        num_srcs = 3
        names = ["src_{}".format(i) for i in range(num_srcs)]
        size = 100
        offsets = [i * size for i in range(num_srcs)]
        src_ds_builders = [
            TestReaderBuilder(offset=offset, size=size, name=name)
            for (name, offset) in zip(names, offsets)
        ]

        # Make an identically-sized empty destination dataset
        dst_ds_schema = schema.Struct(
            *[
                (name, src_ds_builder.schema())
                for name, src_ds_builder in zip(names, src_ds_builders)
            ]
        )
        dst_ds = make_destination_dataset(ws, dst_ds_schema)

        with TaskGroup() as tg:
            reader_builder = CompositeReaderBuilder(
                names, src_ds_builders)
            reader_builder.setup(ws=ws)
            pipe(reader_builder.new_reader(), dst_ds.writer(),
                 num_runtime_threads=3)
        session.run(tg)

        for name, offset in zip(names, offsets):
            written_data = sorted(
                ws.fetch_blob(str(dst_ds.content()[name].label())))
            npt.assert_array_equal(range(offset, offset + size), written_data,
                                   "name: {}".format(name))


class TestReaderWithLimit(TestCase):
    def test_runtime_threads(self):
        ws = workspace.C.Workspace()
        session = LocalSession(ws)
        src_ds = make_source_dataset(ws)
        totals = [None] * 3

        def proc(rec):
            # executed once
            with ops.task_init():
                counter1 = ops.CreateCounter([], ['global_counter'])
                counter2 = ops.CreateCounter([], ['global_counter2'])
                counter3 = ops.CreateCounter([], ['global_counter3'])
            # executed once per thread
            with ops.task_instance_init():
                task_counter = ops.CreateCounter([], ['task_counter'])
            # executed on each iteration
            ops.CountUp(counter1)
            ops.CountUp(task_counter)
            # executed once per thread
            with ops.task_instance_exit():
                with ops.loop(ops.RetrieveCount(task_counter)):
                    ops.CountUp(counter2)
                ops.CountUp(counter3)
            # executed once
            with ops.task_exit():
                totals[0] = final_output(ops.RetrieveCount(counter1))
                totals[1] = final_output(ops.RetrieveCount(counter2))
                totals[2] = final_output(ops.RetrieveCount(counter3))
            return rec

        # Read full data set from original reader
        with TaskGroup() as tg:
            pipe(src_ds.reader(), num_runtime_threads=8, processor=proc)
        session.run(tg)
        self.assertEqual(totals[0].fetch(), 100)
        self.assertEqual(totals[1].fetch(), 100)
        self.assertEqual(totals[2].fetch(), 8)

        # Read with a count-limited reader
        with TaskGroup() as tg:
            q1 = pipe(src_ds.reader(), num_runtime_threads=2)
            q2 = pipe(
                ReaderWithLimit(q1.reader(), num_iter=25),
                num_runtime_threads=3)
            pipe(q2, processor=proc, num_runtime_threads=6)
        session.run(tg)
        self.assertEqual(totals[0].fetch(), 25)
        self.assertEqual(totals[1].fetch(), 25)
        self.assertEqual(totals[2].fetch(), 6)

    def _test_limit_reader_init_shared(self, size):
        ws = workspace.C.Workspace()
        session = LocalSession(ws)

        # Make source dataset
        src_ds = make_source_dataset(ws, size=size)

        # Make an identically-sized empty destination Dataset
        dst_ds = make_destination_dataset(ws, src_ds.content().clone_schema())

        return ws, session, src_ds, dst_ds

    def _test_limit_reader_shared(self, reader_class, size, expected_read_len,
                                  expected_finish, num_threads, read_delay,
                                  **limiter_args):
        ws, session, src_ds, dst_ds = \
            self._test_limit_reader_init_shared(size)

        # Read through the limiting reader (with an optional per-read delay).
        # WorkspaceType.GLOBAL is required because we are fetching
        # reader.data_finished() after the TaskGroup finishes.
        with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg:
            if read_delay > 0:
                reader = reader_class(
                    ReaderWithDelay(src_ds.reader(), read_delay),
                    **limiter_args)
            else:
                reader = reader_class(src_ds.reader(), **limiter_args)
            pipe(reader, dst_ds.writer(), num_runtime_threads=num_threads)
        session.run(tg)
        read_len = len(sorted(ws.blobs[str(dst_ds.content().label())].fetch()))
        self.assertEqual(read_len, expected_read_len)
        self.assertEqual(
            sorted(ws.blobs[str(dst_ds.content().label())].fetch()),
            list(range(expected_read_len))
        )
        self.assertEqual(ws.blobs[str(reader.data_finished())].fetch(),
                         expected_finish)

    def test_count_limit_reader_without_limit(self):
        # No iter count specified, should read all records.
        self._test_limit_reader_shared(ReaderWithLimit,
                                       size=100,
                                       expected_read_len=100,
                                       expected_finish=True,
                                       num_threads=8,
                                       read_delay=0,
                                       num_iter=None)

    def test_count_limit_reader_with_zero_limit(self):
        # Zero iter count specified, should read 0 records.
        self._test_limit_reader_shared(ReaderWithLimit,
                                       size=100,
                                       expected_read_len=0,
                                       expected_finish=False,
                                       num_threads=8,
                                       read_delay=0,
                                       num_iter=0)

    def test_count_limit_reader_with_low_limit(self):
        # Read with limit smaller than size of dataset
        self._test_limit_reader_shared(ReaderWithLimit,
                                       size=100,
                                       expected_read_len=10,
                                       expected_finish=False,
                                       num_threads=8,
                                       read_delay=0,
                                       num_iter=10)

    def test_count_limit_reader_with_high_limit(self):
        # Read with limit larger than size of dataset
        self._test_limit_reader_shared(ReaderWithLimit,
                                       size=100,
                                       expected_read_len=100,
                                       expected_finish=True,
                                       num_threads=8,
                                       read_delay=0,
                                       num_iter=110)

    def test_time_limit_reader_without_limit(self):
        # No duration specified, should read all records.
        self._test_limit_reader_shared(ReaderWithTimeLimit,
                                       size=100,
                                       expected_read_len=100,
                                       expected_finish=True,
                                       num_threads=8,
                                       read_delay=0.1,
                                       duration=0)

    def test_time_limit_reader_with_short_limit(self):
        # Read with insufficient time limit
        size = 50
        num_threads = 4
        sleep_duration = 0.25
        duration = 1
        expected_read_len = int(round(num_threads * duration / sleep_duration))
        # Because the time limit check happens before the delay + read op,
        # subtract a little bit of time to ensure we don't sneak in an
        # extra read.
        duration = duration - 0.25 * sleep_duration
        self._test_limit_reader_shared(ReaderWithTimeLimit,
                                       size=size,
                                       expected_read_len=expected_read_len,
                                       expected_finish=False,
                                       num_threads=num_threads,
                                       read_delay=sleep_duration,
                                       duration=duration)

    def test_time_limit_reader_with_long_limit(self):
        # Read with ample time limit
        self._test_limit_reader_shared(ReaderWithTimeLimit,
                                       size=50,
                                       expected_read_len=50,
                                       expected_finish=True,
                                       num_threads=4,
                                       read_delay=0.25,
                                       duration=6)

    def test_cached_reader(self):
        ws = workspace.C.Workspace()
        session = LocalSession(ws)

        def build_source_reader(size):
            src_ds = make_source_dataset(ws, size)
            return src_ds.reader()

        # Make a temp file path to use as cache_path
        with tempfile.NamedTemporaryFile(delete=False) as f:
            cache_path = f.name
            f.close()
            os.remove(cache_path)

        # Read data for the first time.
        cached_reader1 = CachedReader(build_source_reader(100))
        init_step = cached_reader1.build_cache(cache_path)
        session.run(init_step)

        data = read_all_data(ws, cached_reader1, session)
        self.assertEqual(sorted(data), list(range(100)))

        # Read data from the cache.
        workspace.ResetWorkspace()
        cached_reader2 = CachedReader(build_source_reader(200))
        init_step = cached_reader2.build_cache(cache_path)
        session.run(init_step)

        data = read_all_data(ws, cached_reader2, session)
        self.assertEqual(sorted(data), list(range(100)))

        shutil.rmtree(cache_path)

        # We removed the cache, so we expect to receive data from the
        # original reader.
        workspace.ResetWorkspace()
        cached_reader3 = CachedReader(build_source_reader(300))
        init_step = cached_reader3.build_cache(cache_path)
        session.run(init_step)

        data = read_all_data(ws, cached_reader3, session)
        self.assertEqual(sorted(data), list(range(300)))

        shutil.rmtree(cache_path)