import sys import os import math import shutil import random import tempfile import unittest import torch import torch.cuda from torch.autograd import Variable from torch.utils.trainer import Trainer from torch.utils.trainer.plugins import * from torch.utils.trainer.plugins.plugin import Plugin from torch.utils.data import * from torch.utils.ffi import compile_extension from common import TestCase class SimplePlugin(Plugin): def __init__(self, interval): super(SimplePlugin, self).__init__(interval) self.trainer = None self.num_iteration = 0 self.num_epoch = 0 self.num_batch = 0 self.num_update = 0 def register(self, trainer): self.trainer = trainer def iteration(self, *args): self.iteration_args = args self.num_iteration += 1 def epoch(self, *args): self.epoch_args = args self.num_epoch += 1 def batch(self, *args): self.batch_args = args self.num_batch += 1 def update(self, *args): self.update_args = args self.num_update += 1 class ModelMock(object): def __init__(self): self.num_calls = 0 self.output = Variable(torch.ones(1, 1)) def __call__(self, i): self.num_calls += 1 return self.output * 2 class CriterionMock(object): def __init__(self): self.num_calls = 0 def __call__(self, out, target): self.num_calls += 1 return out class OptimizerMock(object): max_evals = 5 min_evals = 1 def __init__(self): self.num_steps = 0 self.num_evals = 0 def step(self, closure): for i in range(random.randint(1, self.max_evals)): loss = closure() self.num_evals += 1 loss.backward() self.num_steps += 1 class DatasetMock(object): def __iter__(self): for i in range(10): yield torch.randn(2, 10), torch.randperm(10)[:2] def __len__(self): return 10 class TestTrainer(TestCase): intervals = [ [(1, 'iteration')], [(1, 'epoch')], [(1, 'batch')], [(1, 'update')], [(5, 'iteration')], [(5, 'epoch')], [(5, 'batch')], [(5, 'update')], [(1, 'iteration'), (1, 'epoch')], [(5, 'update'), (1, 'iteration')], [(2, 'epoch'), (1, 'batch')], ] def setUp(self): self.trainer = Trainer(ModelMock(), CriterionMock(), OptimizerMock(), DatasetMock()) self.num_epochs = 3 self.dataset_size = len(self.trainer.dataset) self.num_iters = self.num_epochs * self.dataset_size def test_register_plugin(self): for interval in self.intervals: simple_plugin = SimplePlugin(interval) self.trainer.register_plugin(simple_plugin) self.assertEqual(simple_plugin.trainer, self.trainer) def test_optimizer_step(self): self.trainer.run(epochs=1) self.assertEqual(self.trainer.optimizer.num_steps, 10) def test_plugin_interval(self): for interval in self.intervals: self.setUp() simple_plugin = SimplePlugin(interval) self.trainer.register_plugin(simple_plugin) self.trainer.run(epochs=self.num_epochs) units = { ('iteration', self.num_iters), ('epoch', self.num_epochs), ('batch', self.num_iters), ('update', self.num_iters) } for unit, num_triggers in units: call_every = None for i, i_unit in interval: if i_unit == unit: call_every = i break if call_every: expected_num_calls = math.floor(num_triggers / call_every) else: expected_num_calls = 0 num_calls = getattr(simple_plugin, 'num_' + unit) self.assertEqual(num_calls, expected_num_calls, 0) def test_model_called(self): self.trainer.run(epochs=self.num_epochs) num_model_calls = self.trainer.model.num_calls num_crit_calls = self.trainer.criterion.num_calls self.assertEqual(num_model_calls, num_crit_calls) for num_calls in [num_model_calls, num_crit_calls]: lower_bound = OptimizerMock.min_evals * self.num_iters upper_bound = OptimizerMock.max_evals * self.num_iters self.assertEqual(num_calls, self.trainer.optimizer.num_evals) self.assertLessEqual(lower_bound, num_calls) self.assertLessEqual(num_calls, upper_bound) def test_model_gradient(self): self.trainer.run(epochs=self.num_epochs) output_var = self.trainer.model.output expected_grad = torch.ones(1, 1) * 2 * self.num_iters self.assertEqual(output_var.grad, expected_grad) class TestTensorDataSource(TestCase): def test_len(self): source = TensorDataSource(torch.randn(15, 10, 2, 3, 4, 5), torch.randperm(15)) self.assertEqual(len(source), 15) def test_getitem(self): t = torch.randn(15, 10, 2, 3, 4, 5) l = torch.randn(15, 10) source = TensorDataSource(t, l) for i in range(15): self.assertEqual(t[i], source[i][0]) self.assertEqual(l[i], source[i][1]) def test_getitem_1d(self): t = torch.randn(15) l = torch.randn(15) source = TensorDataSource(t, l) for i in range(15): self.assertEqual(t[i:i+1], source[i][0]) self.assertEqual(l[i:i+1], source[i][1]) class TestDataset(TestCase): def setUp(self): self.data = torch.randn(10, 2, 3, 5) self.labels = torch.randperm(5).repeat(2) self.datasource = TensorDataSource(self.data, self.labels) def _test_sequential(self, dataset): batch_size = dataset.batch_size for i, (sample, target) in enumerate(dataset): idx = i * batch_size self.assertEqual(sample, self.data[idx:idx+batch_size]) self.assertEqual(target, self.labels[idx:idx+batch_size].view(-1, 1)) self.assertEqual(i, math.floor((len(self.datasource)-1) / batch_size)) def _test_shuffle(self, dataset): batch_size = dataset.batch_size found_data = {i: 0 for i in range(self.data.size(0))} found_labels = {i: 0 for i in range(self.labels.size(0))} for i, (batch_samples, batch_targets) in enumerate(dataset): for sample, target in zip(batch_samples, batch_targets): for data_point_idx, data_point in enumerate(self.data): if data_point.eq(sample).all(): self.assertFalse(found_data[data_point_idx]) found_data[data_point_idx] += 1 break self.assertEqual(target, self.labels.narrow(0, data_point_idx, 1)) found_labels[data_point_idx] += 1 self.assertEqual(sum(found_data.values()), (i+1) * batch_size) self.assertEqual(sum(found_labels.values()), (i+1) * batch_size) self.assertEqual(i, math.floor((len(self.datasource)-1) / batch_size)) def test_seqential(self): self._test_sequential(Dataset(self.datasource)) def test_seqential_batch(self): self._test_sequential(Dataset(self.datasource, batch_size=2)) def test_shuffle(self): self._test_shuffle(Dataset(self.datasource, shuffle=True)) def test_shuffle_batch(self): self._test_shuffle(Dataset(self.datasource, batch_size=2, shuffle=True)) def test_types(self): dataset = Dataset(self.datasource, batch_size=2) for samples, targets in dataset: self.assertIs(type(samples), torch.DoubleTensor) self.assertIs(type(targets), torch.DoubleTensor) dataset.input_type(torch.FloatTensor) for samples, targets in dataset: self.assertIs(type(samples), torch.FloatTensor) self.assertIs(type(targets), torch.DoubleTensor) dataset.target_type(torch.IntTensor) for samples, targets in dataset: self.assertIs(type(samples), torch.FloatTensor) self.assertIs(type(targets), torch.IntTensor) test_dir = os.path.abspath(os.path.dirname(__file__)) class TestFFI(TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() os.chdir(self.tmpdir) sys.path.append(self.tmpdir) def tearDown(self): shutil.rmtree(self.tmpdir) def test_cpu(self): compile_extension( name='test_extensions.cpulib', header=test_dir + '/ffi/src/cpu/lib.h', sources=[ test_dir + '/ffi/src/cpu/lib1.c', test_dir + '/ffi/src/cpu/lib2.c', ], verbose=False, ) from test_extensions import cpulib tensor = torch.ones(2, 2).float() cpulib.good_func(tensor, 2, 1.5) self.assertEqual(tensor, torch.ones(2, 2) * 2 + 1.5) new_tensor = cpulib.new_tensor(4) self.assertEqual(new_tensor, torch.ones(4, 4) * 4) f = cpulib.int_to_float(5) self.assertIs(type(f), float) self.assertRaises(TypeError, lambda: cpulib.good_func(tensor.double(), 2, 1.5)) self.assertRaises(torch.FatalError, lambda: cpulib.bad_func(tensor, 2, 1.5)) def test_gpu(self): compile_extension( name='gpulib', header=test_dir + '/ffi/src/cuda/cudalib.h', sources=[ test_dir + '/ffi/src/cuda/cudalib.c', ], with_cuda=True, verbose=False, ) import gpulib tensor = torch.ones(2, 2).float() gpulib.good_func(tensor, 2, 1.5) self.assertEqual(tensor, torch.ones(2, 2) * 2 + 1.5) ctensor = tensor.cuda().fill_(1) gpulib.cuda_func(ctensor, 2, 1.5) self.assertEqual(ctensor, torch.ones(2, 2) * 2 + 1.5) self.assertRaises(TypeError, lambda: gpulib.cuda_func(tensor, 2, 1.5)) self.assertRaises(TypeError, lambda: gpulib.cuda_func(ctensor.storage(), 2, 1.5)) if __name__ == '__main__': unittest.main()