pytorch/caffe2/python/dataio_test.py
Yinghai Lu ef8f556212
[Caffe2] Changes done inside Facebook (#6378)
* fix unit test for sqrt op

From the error logging:

[idx, grad, grad_estimate] are:
[[ 146.            0.5           0.45776367]
 [ 147.            0.5           0.45776367]

The gradient == 0.5 is correct, which means the SqrtOp and its gradient is doing right job. (Because y = sqrt(x), loss = y^2/2 = x/2, and then d(loss)/dx = 1/2 = 0.5; )

The test failed because of numerical problem of grad_estimate (in unit test). It can be because the step_size is small, and float precision is not high (when there are multiple elements in the tensor, we do sum(y^2) to compute loss)

This diff
- increase the step size, and also move the test cases to be further away from 0 (where sqrt(x) is not well defined) to be safe :)
- also clean up, and merge the test case for inplace Vs. non-inplace

Tested with:

`CAFFE2_HYPOTHESIS_PROFILE=debug ai_bt caffe2/caffe2/python/operator_test:elementwise_ops_test -- "test_sqrt"`

* CompositeReader & CompositeReaderBuilder

A new type of reader gluing multiple readers together.

* Back out "Revert D7394363: [GanH]: Log D Trick for Cross Entropy with Sigmoid"

Original commit changeset: 9325a4356dbe

* [dai][WIP] convert params to int8 on ps before sending to trainer

Add float->uint8 conversion in addition to float->fp16 conversion in model_saver.

* [easy] improve unit test for sparse length sum ops

as desc.

#accept2ship

* Update GitHub upstream to 771fcb3455

* move sparse hash unique ops to OOS and add unit tests

- move the SparseHash version to OOS, since 'sparsehash' is already deps of caffe2 OOS: https://fburl.com/arssw4n1
- The 'SparseHash' engine is also being used in OOS, so the SparseHash version shall be in OOS to reduce confusion: https://fburl.com/o5ea7ah2

- fix the CUDA UniqueOp for the case when batch is empty.
- add unit test

* group_norm_op for caffe2

This is the cuda op for Group Normalization (GN): https://arxiv.org/abs/1803.08494

This code implements GN in one op that computes Y=gamma * (X-mu) / sigma + beta and also its gradients. It is expected to have minimal memory consumption (similar to the BN op), without creating new blobs if GN were implemented as several ops (e.g., reshape, norm_mean/std, affine_channel).

* Resubmit D7405233: disappeared in D7464958

OOS publish causes the op missing -- however, test was still there

* [c2] add sparse hash engine for cuda unique op

The SparseHash version of UniqueOp copy input tensor to CPU, and make use of sparse hash map to get unique output, and then copy back to GPU.

* [dper][gpu] enable unit testing gpu trainer for sparse nn

to debug the GPU trainer using mock data in unit test.

make it easier to develop GPU trainer for new models.

* Reuse Gloo context for Synchronize() calls

Previously we were creating (and leaking) the Gloo context on each call to Synchronize(). Now only run the common world op and create the barrier net once, then run the barrier net on each Synchronize() call. Since timeout is associated with the Gloo context, assert that the timeout is fixed instead of trying to handle the complexity of multiple timeouts (and associated contexts).

* [GanH/WGAN][1/n]: add FC param clipping

as titled

* [mobile] minimizing changes between caffe2_benchmark and speed_benchmark

* [GanH]: enable diagnose within model

avoid finding blob names but to directly enable inside the model

* Add `net_transformer_fun` option to DPM

This callback allows for various transformations to be made to the
model after gradient operators have been added. The immediate motivation for
this is to allow transformations such has "checkpoint-and-recompute" which
allow trading off memory for additional compute.

Adding several callbacks like this has made DPM's API less than ideal at this
stage. However, I could not find any reasonable alternative.

* [DT] [33/n] Compile flow task groups

task groups need to compiled in order to pickle the object in fblearner. However I also changed the Job's compile function as creating new object is not necessary.

* Initial commit for sparse_normalize vectorization and benchmark

* [GanH]: LB Calibration for JSD

as titled

* Tracing event in async executor

Adding event tracing through TRACE_EVENT macro in async executor

* [Resubmit] D7409751 Reseting book-keeping blobs when the reservoir is reset

D7409751 got lost in D7464958

* Visualizing realtime weights values

we want to visualize the weights values as optimizer is iterating. This diff supports to visual the weights at an assigned index.
Currently, we assume the blob to be 2 dimensional.

* [GanH][Easy]: Fix Homotopy Weighting

apparantely, there was a bug in homotopy weight (alpha, beta) update

* [c2] move sparse hash unique op out of oss

so that oss do not need to depend on google hash map.

* Get rid of std::round as it's not supported on Android

* Revert changes on setup.py

* Skip shaky test on Dataio

* fix
2018-04-10 21:11:43 -07:00

389 lines
15 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from caffe2.python.dataio import (
CompositeReader,
CompositeReaderBuilder,
Reader,
ReaderBuilder,
ReaderWithLimit,
ReaderWithTimeLimit,
)
from caffe2.python.dataset import Dataset
from caffe2.python.pipeline import pipe
from caffe2.python.schema import Struct, NewRecord, FeedRecord
from caffe2.python.session import LocalSession
from caffe2.python.task import TaskGroup, final_output, WorkspaceType
from caffe2.python.test_util import TestCase
from caffe2.python.cached_reader import CachedReader
from caffe2.python import core, workspace, schema
from caffe2.python.net_builder import ops
import numpy as np
import numpy.testing as npt
import os
import shutil
import unittest
import tempfile
import time
def init_dataset(ws, size=100, offset=0, name=None):
name = name or "src"
src_init = core.Net("{}_init".format(name))
with core.NameScope(name):
src_values = Struct(('label', np.array(range(offset, offset + size))))
src_blobs = NewRecord(src_init, src_values)
src_ds = Dataset(src_blobs, name=name)
FeedRecord(src_blobs, src_values, ws)
ws.run(src_init)
return src_ds
def read_all_data(ws, reader, session):
dst_init = core.Net('dst_init')
with core.NameScope('dst'):
dst_ds = Dataset(reader.schema().clone_schema())
dst_ds.init_empty(dst_init)
session.run(dst_init)
with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg:
pipe(reader, dst_ds.writer(), num_runtime_threads=8)
session.run(tg)
return ws.blobs[str(dst_ds.content().label())].fetch()
class ReaderWithDelay(Reader):
"""Test reader class that inserts a delay between reading batches."""
def __init__(self, reader, delay):
Reader.__init__(self, schema=reader._schema)
self.reader = reader
self.delay = delay
def setup_ex(self, global_init_net, global_finish_net):
self.reader.setup_ex(global_init_net, global_finish_net)
def read_ex(self, local_init_net, local_finish_net):
read_net = core.Net('reader_body')
def sleep_op(*args, **argd):
time.sleep(self.delay)
read_net.Python(sleep_op)([], [])
return ([read_net], ) + self.reader.read(read_net)
class TestReaderBuilder(ReaderBuilder):
def __init__(self, name, size, offset):
self._schema = schema.Struct(
('label', schema.Scalar()),
)
self._name = name
self._size = size
self._offset = offset
self._src_ds = None
def schema(self):
return self._schema
def setup(self, ws):
self._src_ds = init_dataset(ws, offset=self._offset, size=self._size,
name=self._name)
def new_reader(self, **kwargs):
return self._src_ds
class TestCompositeReader(TestCase):
@unittest.skipIf(os.environ.get('JENKINS_URL'), 'Flaky test on Jenkins')
def test_composite_reader(self):
ws = workspace.C.Workspace()
session = LocalSession(ws)
num_srcs = 3
names = ["src_{}".format(i) for i in range(num_srcs)]
size = 100
offsets = [i * size for i in range(num_srcs)]
src_dses = [init_dataset(ws, offset=offset, size=size, name=name)
for (name, offset) in zip(names, offsets)]
data = [ws.fetch_blob(str(src.field_blobs[0])) for src in src_dses]
# Sanity check we didn't overwrite anything
for d, offset in zip(data, offsets):
npt.assert_array_equal(d, range(offset, offset + size))
# Create an identically sized empty destnation dataset
dst_init = core.Net('dst_init')
with core.NameScope('dst'):
dst_ds = Dataset(schema.Struct(
*[(name, src_ds.content().clone_schema())
for name, src_ds in zip(names, src_dses)]
))
dst_ds.init_empty(dst_init)
ws.run(dst_init)
with TaskGroup() as tg:
reader = CompositeReader(names,
[src_ds.reader() for src_ds in src_dses])
pipe(reader, dst_ds.writer(), num_runtime_threads=3)
session.run(tg)
for i in range(num_srcs):
written_data = sorted(
ws.fetch_blob(str(dst_ds.content()[names[i]].label())))
npt.assert_array_equal(data[i], written_data)
@unittest.skipIf(os.environ.get('JENKINS_URL'), 'Flaky test on Jenkins')
def test_composite_reader_builder(self):
ws = workspace.C.Workspace()
session = LocalSession(ws)
num_srcs = 3
names = ["src_{}".format(i) for i in range(num_srcs)]
size = 100
offsets = [i * size for i in range(num_srcs)]
src_ds_builders = [
TestReaderBuilder(offset=offset, size=size, name=name)
for (name, offset) in zip(names, offsets)
]
# Create an identically sized empty destnation dataset
dst_init = core.Net('dst_init')
with core.NameScope('dst'):
dst_ds = Dataset(schema.Struct(
*[(name, src_ds_builder.schema())
for name, src_ds_builder in zip(names, src_ds_builders)]
))
dst_ds.init_empty(dst_init)
ws.run(dst_init)
with TaskGroup() as tg:
reader_builder = CompositeReaderBuilder(
names, src_ds_builders)
reader_builder.setup(ws=ws)
pipe(reader_builder.new_reader(), dst_ds.writer(),
num_runtime_threads=3)
session.run(tg)
for name, offset in zip(names, offsets):
written_data = sorted(
ws.fetch_blob(str(dst_ds.content()[name].label())))
npt.assert_array_equal(range(offset, offset + size), written_data)
class TestReaderWithLimit(TestCase):
def test_runtime_threads(self):
ws = workspace.C.Workspace()
session = LocalSession(ws)
src_ds = init_dataset(ws)
totals = [None] * 3
def proc(rec):
# executed once
with ops.task_init():
counter1 = ops.CreateCounter([], ['global_counter'])
counter2 = ops.CreateCounter([], ['global_counter2'])
counter3 = ops.CreateCounter([], ['global_counter3'])
# executed once per thread
with ops.task_instance_init():
task_counter = ops.CreateCounter([], ['task_counter'])
# executed on each iteration
ops.CountUp(counter1)
ops.CountUp(task_counter)
# executed once per thread
with ops.task_instance_exit():
with ops.loop(ops.RetrieveCount(task_counter)):
ops.CountUp(counter2)
ops.CountUp(counter3)
# executed once
with ops.task_exit():
totals[0] = final_output(ops.RetrieveCount(counter1))
totals[1] = final_output(ops.RetrieveCount(counter2))
totals[2] = final_output(ops.RetrieveCount(counter3))
return rec
# Read full data set from original reader
with TaskGroup() as tg:
pipe(src_ds.reader(), num_runtime_threads=8, processor=proc)
session.run(tg)
self.assertEqual(totals[0].fetch(), 100)
self.assertEqual(totals[1].fetch(), 100)
self.assertEqual(totals[2].fetch(), 8)
# Read with a count-limited reader
with TaskGroup() as tg:
q1 = pipe(src_ds.reader(), num_runtime_threads=2)
q2 = pipe(
ReaderWithLimit(q1.reader(), num_iter=25),
num_runtime_threads=3)
pipe(q2, processor=proc, num_runtime_threads=6)
session.run(tg)
self.assertEqual(totals[0].fetch(), 25)
self.assertEqual(totals[1].fetch(), 25)
self.assertEqual(totals[2].fetch(), 6)
def _test_limit_reader_init_shared(self, size):
ws = workspace.C.Workspace()
session = LocalSession(ws)
# Build test dataset
src_ds = init_dataset(ws, size=size)
# Create an identically sized empty destnation dataset
dst_init = core.Net('dst_init')
with core.NameScope('dst'):
dst_ds = Dataset(src_ds.content().clone_schema())
dst_ds.init_empty(dst_init)
ws.run(dst_init)
return ws, session, src_ds, dst_init, dst_ds
def _test_limit_reader_shared(self, reader_class, size, expected_read_len,
expected_finish, num_threads, read_delay,
**limiter_args):
ws, session, src_ds, dst_init, dst_ds = \
self._test_limit_reader_init_shared(size)
# Read without limiter
# WorkspaceType.GLOBAL is required because we are fetching
# reader.data_finished() after the TaskGroup finishes.
with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg:
if read_delay > 0:
reader = reader_class(ReaderWithDelay(src_ds.reader(),
read_delay),
**limiter_args)
else:
reader = reader_class(src_ds.reader(), **limiter_args)
pipe(reader, dst_ds.writer(), num_runtime_threads=num_threads)
session.run(tg)
read_len = len(sorted(ws.blobs[str(dst_ds.content().label())].fetch()))
self.assertEqual(read_len, expected_read_len)
self.assertEqual(
sorted(ws.blobs[str(dst_ds.content().label())].fetch()),
list(range(expected_read_len))
)
self.assertEqual(ws.blobs[str(reader.data_finished())].fetch(),
expected_finish)
def test_count_limit_reader_without_limit(self):
# No iter count specified, should read all records.
self._test_limit_reader_shared(ReaderWithLimit,
size=100,
expected_read_len=100,
expected_finish=True,
num_threads=8,
read_delay=0,
num_iter=None)
def test_count_limit_reader_with_zero_limit(self):
# Zero iter count specified, should read 0 records.
self._test_limit_reader_shared(ReaderWithLimit,
size=100,
expected_read_len=0,
expected_finish=False,
num_threads=8,
read_delay=0,
num_iter=0)
def test_count_limit_reader_with_low_limit(self):
# Read with limit smaller than size of dataset
self._test_limit_reader_shared(ReaderWithLimit,
size=100,
expected_read_len=10,
expected_finish=False,
num_threads=8,
read_delay=0,
num_iter=10)
def test_count_limit_reader_with_high_limit(self):
# Read with limit larger than size of dataset
self._test_limit_reader_shared(ReaderWithLimit,
size=100,
expected_read_len=100,
expected_finish=True,
num_threads=8,
read_delay=0,
num_iter=110)
def test_time_limit_reader_without_limit(self):
# No duration specified, should read all records.
self._test_limit_reader_shared(ReaderWithTimeLimit,
size=100,
expected_read_len=100,
expected_finish=True,
num_threads=8,
read_delay=0.1,
duration=0)
def test_time_limit_reader_with_short_limit(self):
# Read with insufficient time limit
size = 50
num_threads = 4
sleep_duration = 0.25
duration = 1
expected_read_len = int(round(num_threads * duration / sleep_duration))
# Because the time limit check happens before the delay + read op,
# subtract a little bit of time to ensure we don't get in an extra read
duration = duration - 0.25 * sleep_duration
self._test_limit_reader_shared(ReaderWithTimeLimit,
size=size,
expected_read_len=expected_read_len,
expected_finish=False,
num_threads=num_threads,
read_delay=sleep_duration,
duration=duration)
def test_time_limit_reader_with_long_limit(self):
# Read with ample time limit
self._test_limit_reader_shared(ReaderWithTimeLimit,
size=50,
expected_read_len=50,
expected_finish=True,
num_threads=4,
read_delay=0.25,
duration=6)
def test_cached_reader(self):
ws = workspace.C.Workspace()
session = LocalSession(ws)
def build_source_reader(size):
src_ds = init_dataset(ws, size)
return src_ds.reader()
with tempfile.NamedTemporaryFile(delete=False) as f:
path = f.name
f.close()
os.remove(path)
# Read data for the first time.
cached_reader1 = CachedReader(build_source_reader(100))
init_step = cached_reader1.build_cache(path)
session.run(init_step)
data = read_all_data(ws, cached_reader1, session)
self.assertEqual(sorted(data), list(range(100)))
# Read data from cache.
workspace.ResetWorkspace()
cached_reader2 = CachedReader(build_source_reader(200))
init_step = cached_reader2.build_cache(path)
session.run(init_step)
data = read_all_data(ws, cached_reader2, session)
self.assertEqual(sorted(data), list(range(100)))
shutil.rmtree(path)
# We removed cache so we expect to receive data from original reader
workspace.ResetWorkspace()
cached_reader3 = CachedReader(build_source_reader(300))
init_step = cached_reader3.build_cache(path)
session.run(init_step)
data = read_all_data(ws, cached_reader3, session)
self.assertEqual(sorted(data), list(range(300)))
shutil.rmtree(path)