# TODO(jiayq): as more and more tests are moving to hypothesis test, we # can gradually remove this test script. DO NOT ADD MORE TESTS TO THIS # FILE. from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import numpy as np from caffe2.python import ( brew, core, device_checker, gradient_checker, model_helper, test_util, workspace, ) from caffe2.python.gradient_checker import NetGradientChecker from caffe2.proto import caffe2_pb2 import unittest if workspace.has_gpu_support and workspace.NumCudaDevices() > 0: gpu_device_option = caffe2_pb2.DeviceOption() gpu_device_option.device_type = caffe2_pb2.CUDA cpu_device_option = caffe2_pb2.DeviceOption() gpu_device_checker = device_checker.DeviceChecker( 0.01, [gpu_device_option] ) device_checker = device_checker.DeviceChecker( 0.01, [gpu_device_option, cpu_device_option] ) gpu_gradient_checkers = [ gradient_checker.GradientChecker( 0.005, 0.05, gpu_device_option, "gpu_checker_ws" ), ] gradient_checkers = [ gradient_checker.GradientChecker( 0.005, 0.05, gpu_device_option, "gpu_checker_ws" ), gradient_checker.GradientChecker( 0.01, 0.05, cpu_device_option, "cpu_checker_ws" ), ] else: cpu_device_option = caffe2_pb2.DeviceOption() gpu_device_option = None gpu_device_checker = device_checker.DeviceChecker( 0.01, [] ) device_checker = device_checker.DeviceChecker(0.01, [cpu_device_option]) gradient_checkers = [ gradient_checker.GradientChecker( 0.01, 0.05, cpu_device_option, "cpu_checker_ws" ) ] gpu_gradient_checkers = [] class TestLRN(test_util.TestCase): def setUp(self): self.test_configs = [(6, 10), (3, 13), ] def testLRN(self): for input_size, depth in self.test_configs: op = core.CreateOperator("LRN", ["X"], ["Y", "Y_scale"], size=11, alpha=0.001, beta=0.5, bias=2.0, order="NHWC" ) X = np.random.rand(2, input_size, input_size, depth).astype(np.float32) res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestFlatten(test_util.TestCase): def testFlatten(self): op = core.CreateOperator("Flatten", ["X"], ["Y"]) X = np.random.rand(2, 3, 4, 5).astype(np.float32) res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestConcat(test_util.TestCase): def setUp(self): self.test_configs = [ # input_size, depth1, depth2, depth3, depth4 (3, 2, 3, 4, 5), (4, 5, 4, 3, 2), ] def testConcatNHWC(self): for input_size, d1, d2, d3, d4 in self.test_configs: op = core.CreateOperator("Concat", ["X1", "X2", "X3", "X4"], ["Y", "Y_dims"], order="NHWC" ) Xs = [ np.random.rand(2, input_size, input_size, d1).astype(np.float32), np.random.rand(2, input_size, input_size, d2).astype(np.float32), np.random.rand(2, input_size, input_size, d3).astype(np.float32), np.random.rand(2, input_size, input_size, d4).astype(np.float32) ] for i in range(4): res = device_checker.CheckSimple(op, Xs, [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, Xs, i, [0]) self.assertTrue(res) def testConcatNCHW(self): for input_size, d1, d2, d3, d4 in self.test_configs: op = core.CreateOperator("Concat", ["X1", "X2", "X3", "X4"], ["Y", "Y_dims"], order="NCHW" ) Xs = [ np.random.rand(2, d1, input_size, input_size).astype(np.float32), np.random.rand(2, d2, input_size, input_size).astype(np.float32), np.random.rand(2, d3, input_size, input_size).astype(np.float32), np.random.rand(2, d4, input_size, input_size).astype(np.float32) ] for i in range(4): res = device_checker.CheckSimple(op, Xs, [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, Xs, i, [0]) self.assertTrue(res) class TestRelu(test_util.TestCase): def setUp(self): self.test_configs = [ # input size # (0, 1), (1, 1), (2, 1), (1, 3, 3, 1), (2, 3, 3, 1), (1, 5, 5, 3), (2, 5, 5, 3), ] def testRelu(self): for input_size in self.test_configs: op = core.CreateOperator("Relu", ["X"], ["Y"]) X = np.random.rand(*input_size).astype(np.float32) # go away from the origin point to avoid kink problems X += 0.01 * np.sign(X) X[X == 0] = 0.01 res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestTanh(test_util.TestCase): def setUp(self): self.test_configs = [ # (0, 1), (1, 1), (2, 1), (1, 2, 3, 4), ] def testTanh(self): for input_size in self.test_configs: op = core.CreateOperator("Tanh", ["X"], ["Y"]) X = np.random.rand(*input_size).astype(np.float32) - 0.5 res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestExp(test_util.TestCase): def setUp(self): self.test_configs = [ # (0, 1), (1, 1), (2, 1), (1, 2, 3, 4), ] def testExp(self): for input_size in self.test_configs: op = core.CreateOperator("Exp", ["X"], ["Y"]) X = np.random.rand(*input_size).astype(np.float32) - 0.5 res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestSigmoid(test_util.TestCase): def setUp(self): self.test_configs = [ # (0, 1), (1, 1), (2, 1), (1, 2, 3, 4), ] def testSigmoid(self): for input_size in self.test_configs: op = core.CreateOperator("Sigmoid", ["X"], ["Y"]) X = np.random.rand(*input_size).astype(np.float32) - 0.5 res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestSum(test_util.TestCase): def setUp(self): self.test_configs = [ # ((0, 1), False), ((1, 2, 3, 4), True), ((1, 2, 3, 4), False)] def testSum(self): for (input_size, in_place) in self.test_configs: op = core.CreateOperator("Sum", ["X1", "X2"], ["Y" if not in_place else "X1"]) X1 = np.random.rand(*input_size).astype(np.float32) - 0.5 X2 = np.random.rand(*input_size).astype(np.float32) - 0.5 res = device_checker.CheckSimple(op, [X1, X2], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple( op, [X1, X2], 0, [0]) self.assertTrue(res) class TestMakeTwoClass(test_util.TestCase): def setUp(self): self.test_configs = [ # input size # (0, 1), (1,), (7,), (1, 3), (2, 5), ] def testMakeTwoClass(self): for input_size in self.test_configs: op = core.CreateOperator("MakeTwoClass", ["X"], ["Y"]) X = np.random.rand(*input_size).astype(np.float32) # step a little to avoid gradient problems X[X < 0.01] += 0.01 X[X > 0.99] -= 0.01 res = device_checker.CheckSimple(op, [X], [0]) self.assertTrue(res) for checker in gradient_checkers: res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) self.assertTrue(res) class TestNetGradientChecker(test_util.TestCase): def test_net_gradient_checker(self): model = model_helper.ModelHelper(name="test") const = model.net.AddExternalInputs("const1", "const2") fc = brew.fc(model, dim_in=3, dim_out=4, blob_in="X", blob_out="Y", axis=0) dist = [model.net.SquaredL2Distance([fc, c]) for c in const] losses = [model.net.AveragedLoss(d) for d in dist] # using two losses here workspace.RunNetOnce(model.param_init_net) NetGradientChecker.Check( model.net, outputs_with_grad=losses, input_values={"X": np.array([1, 2, 3], dtype="float32"), const[0]: np.array([1, 1, 1, 1], dtype="float32"), const[1]: np.array([2, 2, 2, 2], dtype="float32")}, input_to_check="X", ) def test_net_comparison(self): # (a + b) * (c + d) == a * c + a * d + b * c + b * d net1 = core.Net("net1") a, b, c, d = net1.AddExternalInputs("a", "b", "c", "d") a_b = net1.Sum([a, b], "a+b") c_d = net1.Sum([c, d], "c+d") x = net1.Mul([a_b, c_d], "x") net2 = core.Net("net2") ac = net2.Mul([a, c], "ac") ad = net2.Mul([a, d], "ad") bc = net2.Mul([b, c], "bc") bd = net2.Mul([b, d], "bd") y = net2.Sum([ac, ad, bc, bd], "y") input_values = {blob: np.array([i], dtype=np.float32) for i, blob in enumerate([a, b, c, d])} NetGradientChecker.CompareNets( [net1, net2], [[x], [y]], [0], inputs_with_grads=[a, b, c, d], input_values=input_values, ) if __name__ == '__main__': workspace.GlobalInit(["python"]) unittest.main()