from multiprocessing import Process, Queue import numpy as np import os import shutil import tempfile import unittest import time from unittest.mock import Mock from hypothesis import assume, given, settings import hypothesis.strategies as st from caffe2.proto import caffe2_pb2 from caffe2.python import brew, core, cnn, data_parallel_model, dyndep, \ model_helper, optimizer, rnn_cell, workspace from caffe2.python.test_util import TestCase dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops") class TemporaryDirectory: def __enter__(self): self.tmpdir = tempfile.mkdtemp() return self.tmpdir def __exit__(self, type, value, traceback): shutil.rmtree(self.tmpdir) # Note(jiayq): we are yet to find out why Travis gives out an error in gloo # like: # RuntimeError: [enforce fail at /home/travis/build/caffe2/caffe2/third_party/gloo/gloo/transport/tcp/device.cc:113] ifa != nullptr. Unable to find interface for: [127.0.1.1] # See for example https://travis-ci.org/caffe2/caffe2/jobs/262433866 # As a result, we will check if this is travis, and if yes, disable it. @unittest.skipIf(os.environ.get("TRAVIS"), "DPMTest has a known issue with Travis.") class DataParallelModelTest(TestCase): def run_model(self, devices, gpu): ''' Helper function for test_equiv ''' def input_builder_fun(model): return None def model_build_fun(model, loss_scale): fc = model.FC("data", "fc", 16, 1, ("ConstantFill", {}), ("ConstantFill", {})) fc_fl = model.FlattenToVec(fc, "fc_fl") sigm = model.Sigmoid(fc_fl, "sigm") sq = model.SquaredL2Distance([sigm, "label"], "sq") loss = model.AveragedLoss(sq, "loss") loss = model.Scale(loss, scale=loss_scale) # For testing explicit sync model.param_init_net.UniformFill([], ["sync_num"], shape=[1]) return [loss] def add_optimizer(model): return optimizer.build_sgd( model, 0.1, policy="fixed", max_gradient_norm=5.0, allow_lr_injection=True, ) workspace.ResetWorkspace() model = cnn.CNNModelHelper( order="NHWC", name="test{}".format(devices), ) data_parallel_model.Parallelize( model, input_builder_fun=input_builder_fun, forward_pass_builder_fun=model_build_fun, optimizer_builder_fun=add_optimizer, devices=devices, cpu_device=not gpu, shared_model=not gpu, combine_spatial_bn=not gpu, ) data_parallel_model.AddBlobSync(model, ["sync_num"]) # Light test for LR names lr_names = data_parallel_model.GetLearningRateBlobNames(model) self.assertGreater(len(lr_names), 0) np.random.seed(2603) # Each run has same input, independent of number of gpus batch_size = 64 for i in range(0, 10): full_data = np.random.rand(batch_size, 16) full_labels = np.round(full_data[:, 0]) batch_per_device = batch_size // len(devices) for (j, g) in enumerate(devices): st = j * batch_per_device en = st + batch_per_device data = full_data[st:en, :].astype(np.float32) labels = full_labels[st:en].astype(np.float32) with core.DeviceScope(core.DeviceOption(model._device_type, g)): workspace.FeedBlob( "{}_{}/data".format(model._device_prefix, g), data ) workspace.FeedBlob( "{}_{}/label".format(model._device_prefix, g), labels ) if i == 0: workspace.RunNetOnce(model.param_init_net) workspace.CreateNet(model.net) workspace.FeedBlob( model._device_prefix + "_0/sync_num", np.array([i * 2]).astype(np.float32), device_option=core.DeviceOption(model._device_type, 0)) workspace.RunNet(model.net.Proto().name) # Test AddBlobSync for j in model._devices: sync = workspace.FetchBlob( model._device_prefix + "_{}/sync_num".format(j))[0] self.assertTrue(abs(sync - i * 2) < 0.01) return workspace.FetchBlob("{}_0/fc_w".format(model._device_prefix)) def run_test_locally(self, fn, device_option=None, **kwargs): # Queue for assertion errors on subprocesses queue = Queue() # Capture any exception thrown by the subprocess def run_fn(*args, **kwargs): try: if device_option is None: fn(*args, **kwargs) workspace.ResetWorkspace() else: with core.DeviceScope(device_option): fn(*args, **kwargs) workspace.ResetWorkspace() except Exception as ex: queue.put(ex) # Start N processes in the background procs = [] for i in range(kwargs['comm_size']): kwargs['comm_rank'] = i proc = Process( target=run_fn, kwargs=kwargs) proc.start() procs.append(proc) # Test complete, join background processes while len(procs) > 0: proc = procs.pop(0) while proc.is_alive(): proc.join(1) # Raise exception if we find any. # Note that the following is executed ALSO after # the last process was joined, so if ANY exception # was raised, it will be re-raised here. if not queue.empty(): raise queue.get() def test_equiv(self): ''' Test that the model produces exactly same results given total batchsize, independent of number of GPUs. ''' for gpu in [True, False]: if gpu and (not workspace.has_gpu_support or workspace.NumCudaDevices() < 2): continue result_2gpus = self.run_model([0, 1], gpu=gpu) result_1gpus = self.run_model([0], gpu=gpu) self.assertTrue(np.allclose(result_1gpus, result_2gpus)) if not gpu or workspace.NumCudaDevices() >= 4: result_4gpus = self.run_model(list(range(4)), gpu=gpu) self.assertTrue(np.allclose(result_1gpus, result_4gpus)) if not gpu or workspace.NumCudaDevices() >= 8: result_8gpus = self.run_model(list(range(8)), gpu=gpu) self.assertTrue(np.allclose(result_1gpus, result_8gpus)) if not gpu or workspace.NumCudaDevices() >= 16: result_16gpus = self.run_model(list(range(16)), gpu=gpu) self.assertTrue(np.allclose(result_1gpus, result_16gpus)) def test_checkpoint_params(self): def add_input_ops(model): pass def add_model_ops(model, loss_scale): model.NHWC2NCHW("data", "data_nchw") model.Conv("data_nchw", 'conv1', 3, 64, weight_init=("MSRAFill", {}), kernel=7, stride=2, pad=3, no_bias=0) model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False) model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu') model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2) model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=100) model.Sigmoid('fc', 'fc_sigm') model.Softmax('fc_sigm', 'softmax') model.LabelCrossEntropy(['softmax', 'label'], 'xent') loss = model.AveragedLoss('xent', 'loss') # Add a duplicate param init to ensure it does not cause issues model.param_init_net.ConstantFill( [], ["fc_w"], shape=((64 * 56 * 56), 1000) ) return [loss] def add_optimizer(model): optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9) model = cnn.CNNModelHelper( order="NHWC", name="test", ) data_parallel_model.Parallelize_CPU( model, input_builder_fun=add_input_ops, forward_pass_builder_fun=add_model_ops, optimizer_builder_fun=add_optimizer, devices=[1, 2, 3], ) # Only gpu_1 params should be returned (gpu_1 is the first gpu) checkpoint_params = data_parallel_model.GetCheckpointParams(model) for p in model.GetParams("cpu_1/"): self.assertTrue(p in checkpoint_params) self.assertTrue(p + "_momentum" in checkpoint_params) for p in model.GetParams("cpu_2/"): self.assertFalse(p in checkpoint_params) self.assertTrue( core.BlobReference("cpu_1/fc_w_momentum") in checkpoint_params) for c in model.GetComputedParams("cpu_1/"): self.assertTrue(c in checkpoint_params) for c in model.GetComputedParams("cpu_2/"): self.assertFalse(c in checkpoint_params) self.assertFalse(core.BlobReference("cpu_1/data") in checkpoint_params) self.assertTrue(core.BlobReference("optimizer_iteration") in checkpoint_params) def test_net_conversion_and_append_net(self): other = model_helper.ModelHelper() fc1 = brew.fc(other, "data", "other_fc1", dim_in=3*227*227, dim_out=10) fc2 = brew.fc(other, fc1, "other_fc2", dim_in=10, dim_out=10) brew.fc(other, fc2, "other_fc3", dim_in=10, dim_out=10) def add_input_ops(model): model.net.UniformFill([], ["data"], shape=[4, 227, 227, 3]) model.net.UniformFill([], ["label"], shape=[4]) def add_model_ops(model, loss_scale): model.NHWC2NCHW("data", "data_nchw") model.Conv("data_nchw", 'conv1', 3, 64, weight_init=("MSRAFill", {}), kernel=7, stride=2, pad=3, no_bias=0) model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False) model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu') model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2) model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=10) # Append the net and param_init_net of the other model appendnet = data_parallel_model.ConvertNetForDevice(other.net) model.net.AppendNet(appendnet) model.param_init_net.AppendNet( data_parallel_model.ConvertNetForDevice(other.param_init_net)) model.Sigmoid('fc', 'fc_sigm') model.Softmax('fc_sigm', 'softmax') loss = model.AveragedLoss('softmax', 'loss') return [loss] def add_optimizer(model): optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9) model = cnn.CNNModelHelper( order="NCHW", name="test", ) data_parallel_model.Parallelize_CPU( model, input_builder_fun=add_input_ops, forward_pass_builder_fun=add_model_ops, optimizer_builder_fun=add_optimizer, devices=range(4) ) # Just create and run net and confirm no exception is thrown workspace.RunNetOnce(model.param_init_net) workspace.CreateNet(model.net) workspace.RunNet(model.net) @unittest.skip("Test fails on GPU/RE") def test_synchronization_barrier(self): def run(comm_rank, comm_size, tmpdir): def add_input_ops(model): pass def add_model_ops(model, loss_scale): return [] def add_optimizer(model): pass store_handler = "store_handler" workspace.RunOperatorOnce( core.CreateOperator( "FileStoreHandlerCreate", [], [store_handler], path=tmpdir)) rendezvous = dict( kv_handler=store_handler, shard_id=comm_rank, num_shards=comm_size, engine='GLOO', ) model = cnn.CNNModelHelper( order="NHWC", name="test", ) data_parallel_model.Parallelize_CPU( model, input_builder_fun=add_input_ops, forward_pass_builder_fun=add_model_ops, optimizer_builder_fun=add_optimizer, devices=[1, 2, 3], rendezvous=rendezvous ) data_parallel_model.RunInitNet(model) for _ in range(2): data_parallel_model.Synchronize(model) with TemporaryDirectory() as tmpdir: self.run_test_locally( run, comm_size=2, device_option=None, tmpdir=tmpdir) @unittest.skip("Test fails on GPU/RE") def test_pre_train_synchronization_barrier(self): def run(comm_rank, comm_size, tmpdir): def add_input_ops(model): pass def add_model_ops(model, loss_scale): return [] def add_optimizer(model): pass workspace.ResetWorkspace() store_handler = "store_handler" workspace.RunOperatorOnce( core.CreateOperator( "FileStoreHandlerCreate", [], [store_handler], path=tmpdir)) rendezvous = dict( kv_handler=store_handler, shard_id=comm_rank, num_shards=comm_size, engine='GLOO', ) model = cnn.CNNModelHelper( order="NHWC", name="test", ) # Set network timeout to 2 seconds, and add a 3 seconds # sleep for 1 host. Make sure there is no timeout on the # second RunNet. data_parallel_model._DEFAULT_TIMEOUT_SEC = 2 data_parallel_model.Parallelize_CPU( model, input_builder_fun=add_input_ops, forward_pass_builder_fun=add_model_ops, optimizer_builder_fun=add_optimizer, devices=[1, 2, 3], rendezvous=rendezvous, barrier_net_timeout_sec=5 ) data_parallel_model.RunInitNet(model) data_parallel_model.RunNet(model, 2) if comm_rank == 0: time.sleep(data_parallel_model._DEFAULT_TIMEOUT_SEC) data_parallel_model.RunNet(model, 2) with TemporaryDirectory() as tmpdir: self.run_test_locally( run, comm_size=2, device_option=None, tmpdir=tmpdir) def test_device_scope_check(self): with self.assertRaises(AssertionError): with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, 0)): data_parallel_model.Parallelize_GPU(None, None, None) def test_net_transformer_function(self): devices = [1, 2, 3] def add_input_ops(model): model.param_init_net.UniformFill([], ["data"], shape=[32, 8]) def add_optimizer(model): optimizer.build_sgd(model, 0.1) def add_model_ops(model, loss_scale): fc1 = brew.fc(model, "data", "fc1", dim_in=8, dim_out=8) return [fc1] kwargs = { 'input_builder_fun': add_input_ops, 'forward_pass_builder_fun': add_model_ops, 'devices': devices, } # assert that the transformer is called for both train and test cases transform = Mock() kwargs['net_transformer_fun'] = transform model = model_helper.ModelHelper(name="r", init_params=False) data_parallel_model.Parallelize_CPU(model, **kwargs) self.assertTrue(transform.called) self.assertEqual(transform.call_count, 1) transform = Mock() kwargs['net_transformer_fun'] = transform kwargs['optimizer_builder_fun'] = add_optimizer model = model_helper.ModelHelper(name="r", init_params=True) data_parallel_model.Parallelize_CPU(model, **kwargs) self.assertTrue(transform.called) self.assertEqual(transform.call_count, 1) @given(seed=st.integers(0, 65535), batch_size=st.integers(1, 20)) @settings(deadline=2000) def test_multi_device_bn_op_level_cpu(self, seed, batch_size): self._bn_check_op_level("cpu", seed, batch_size) @unittest.skipIf(not workspace.has_gpu_support, "No gpu support.") @unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.") @given(seed=st.integers(0, 65535), batch_size=st.integers(1, 20)) @settings(deadline=2000) def test_multi_device_bn_op_level_gpu(self, seed, batch_size): self._bn_check_op_level("gpu", seed, batch_size) def _bn_check_op_level(self, device_type, seed, batch_size): ''' Test multi device batch normalization at the operation level. This is done by checking the outputs of batch normalization and its gradient operator. We compare values produced with our manually calculated batch normalization values and gradients. ''' devices = [0, 1] epsilon = 1e-3 tolerance = 1e-3 def _test_forward_pass(x, devices, device_type, scale, bias, epsilon): x_concat = np.concatenate(x) mean = np.mean(x_concat, axis=0) var = np.var(x_concat, axis=0) for device in devices: x_i = x[device] x_hat = (x_i - mean) / (np.sqrt(var + epsilon)) expected_out = scale * x_hat + bias spatial_out = workspace.FetchBlob( "{}_{}/bn_out".format(device_type, device)) rel_error = np.linalg.norm(spatial_out - expected_out) \ / np.linalg.norm(expected_out) self.assertTrue(rel_error < 0.005) def _test_backward_pass(x, devices, device_type, scale, tolerance): dBias_arr = [] dY_arr = [] dGamma_arr = [] num_devices = len(devices) mean = np.array(workspace.FetchBlob( "{}_0/bn_out_sm".format(device_type)), dtype=np.float32) inv_var = np.array(workspace.FetchBlob( "{}_0/bn_out_siv".format(device_type)), dtype=np.float32) # dBias # Sum dBias values over all devices to find the average gradient for device in devices: dY_blob = workspace.FetchBlob( "{}_{}/bn_out_grad".format(device_type, device)) dY = np.array(dY_blob, dtype=np.float32) dY_arr.append(dY) dBias_arr.append(np.array(np.sum(dY, axis=0), dtype=np.float32)) dBias = np.sum(dBias_arr, dtype=np.float32) dBias_avg = dBias / num_devices for device in devices: dBiasActual = np.sum(workspace.FetchBlob("{}_{}/bn_out_b_grad" .format(device_type, device)), dtype=np.float32) self.assertTrue(np.isclose([dBiasActual], [dBias], atol=tolerance)) # dGamma # Sum dGamma values over all devices to find the average gradient for device in devices: dGamma = np.sum((x[device] - mean) * inv_var * dY_arr[device], axis=0, dtype=np.float32) dGamma_arr.append(dGamma) dGamma = np.sum(dGamma_arr, axis=0, dtype=np.float32) dGamma_avg = dGamma / num_devices for device in devices: dGammaActual = workspace.FetchBlob( "{}_{}/bn_out_s_grad".format(device_type, device)) self.assertTrue(np.isclose([dGamma], [dGammaActual], atol=tolerance)) # dX scale_inv_var = scale * inv_var / batch_size for device in devices: dX = scale_inv_var * (dY_arr[device] * batch_size - dBias_avg - (x[device] - mean) * dGamma_avg * inv_var) dX_actual = workspace.FetchBlob( "{}_{}/tanh_grad".format(device_type, device)) self.assertTrue(np.isclose([dX], [dX_actual], atol=tolerance).all()) def add_input_ops(model): for device in devices: data = np.random.rand(batch_size, 1, 1, 1).astype(np.float32) workspace.FeedBlob("{}_{}/data".format(device_type, device), data) def add_model_ops(model, loss_scale): if device_type == "gpu": model.CopyCPUToGPU("data", "device_data") model.Tanh("device_data", "tanh") else: model.Tanh("data", "tanh") model.SpatialBN("tanh", "bn_out", 1, epsilon=epsilon, is_test=False) model.Sqr("bn_out", "sqr") loss = model.SumElements("sqr", "loss") return [loss] def add_optimizer(model): return optimizer.build_sgd(model, 0.1) np.random.seed(seed) workspace.ResetWorkspace() model = cnn.CNNModelHelper( order="NCHW", name="test" ) data_parallel_model.Parallelize( model, input_builder_fun=add_input_ops, forward_pass_builder_fun=add_model_ops, optimizer_builder_fun=add_optimizer, devices=devices, cpu_device=device_type == "cpu", shared_model=False, combine_spatial_bn=True, ) workspace.RunNetOnce(model.param_init_net) scale = workspace.FetchBlob("{}_0/bn_out_s".format(device_type)) bias = workspace.FetchBlob("{}_0/bn_out_b".format(device_type)) workspace.RunNetOnce(model.net) x = [] for device in devices: x_blob = workspace.FetchBlob("{}_{}/tanh".format(device_type, device)) x_i = np.array(x_blob, dtype=np.float32) x.append(x_i) _test_forward_pass(x, devices, device_type, scale, bias, epsilon) _test_backward_pass(x, devices, device_type, scale, tolerance) @given(seed=st.integers(0, 65535), batch_size=st.integers(1, 20)) @settings(deadline=2000) def test_multi_device_bn_net_lvl_cpu(self, seed, batch_size): if batch_size % 2 == 1: batch_size += 1 self._test_multi_device_bn_net_lvl("cpu", seed, batch_size) @unittest.skipIf(not workspace.has_gpu_support, "No gpu support.") @unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.") @given(seed=st.integers(0, 65535), batch_size=st.integers(1, 20)) @settings(deadline=2000) def test_multi_device_bn_net_lvl_gpu(self, seed, batch_size): if batch_size % 2 == 1: batch_size += 1 self._test_multi_device_bn_net_lvl("gpu", seed, batch_size) def _test_multi_device_bn_net_lvl(self, device_type, seed, batch_size): ''' Test multi device batch normalization at the net level. This is done by verifying that the final batch normalization outputs and the gradient outputs from multiple devices are the same as those produced from a single device ''' # Verify that the gradients calculated over multiple devices are the # same as the gradients calculated over one device. These values should # be equivalent because combine_spatial_bn sums values over all devices def _verify_bn_outputs( devices, device_type, tolerance, single_device_bn_out, two_device_bn_out_vals, single_device_grads, two_device_grads, ): two_device_bn_out = np.concatenate(two_device_bn_out_vals) self.assertTrue(np.isclose( [single_device_bn_out], [two_device_bn_out], atol=tolerance).all()) # Scalar and Bias gradients should be the same across devices gradient_names = ["bn_out_s_grad", "bn_out_b_grad"] for name in gradient_names: expected_grad = single_device_grads[name] for device in devices: actual_grad = two_device_grads[device][name] self.assertTrue( np.isclose([actual_grad], [expected_grad], atol=tolerance)) # Expected tanh_grad should be the combined tanh_grad vectors # across the devices first_grad = two_device_grads[0]["tanh_grad"] second_grad = two_device_grads[1]["tanh_grad"] actual_grad = np.concatenate([first_grad, second_grad]) expected_grad = single_device_grads["tanh_grad"] rel_error = np.linalg.norm(actual_grad - expected_grad) \ / np.linalg.norm(expected_grad) self.assertTrue(rel_error < 1e-3) def _create_model(multiple_devices): def add_input_ops_no_combine(model): workspace.FeedBlob("{}_0/data".format(device_type), data) def add_input_ops_combine(model): half = int(batch_size / 2) workspace.FeedBlob("{}_0/data".format(device_type), data[:half]) workspace.FeedBlob("{}_1/data".format(device_type), data[half:]) def add_model_ops(model, loss_scale): if device_type == "gpu": model.CopyCPUToGPU("data", "device_data") model.Tanh("device_data", "tanh") else: model.Tanh("data", "tanh") model.SpatialBN("tanh", "bn_out", 1, epsilon=epsilon, is_test=False) model.Sqr("bn_out", "sqr") loss = model.SumElements("sqr", "loss") return [loss] def add_optimizer(model): return optimizer.build_sgd(model, 0.1) if multiple_devices: input_fun = add_input_ops_combine devices = [0, 1] combine_spatial_bn = True else: input_fun = add_input_ops_no_combine devices = [0] combine_spatial_bn = False model = cnn.CNNModelHelper( order="NCHW", name="test" ) data_parallel_model.Parallelize( model, input_builder_fun=input_fun, forward_pass_builder_fun=add_model_ops, optimizer_builder_fun=add_optimizer, devices=devices, cpu_device=device_type == "cpu", shared_model=False, combine_spatial_bn=combine_spatial_bn, ) return model devices = [0, 1] epsilon = 1e-3 tolerance = 1e-3 # We are generating random data np.random.seed(seed) data = np.random.rand(batch_size, 1, 1, 1).astype(np.float32) data = np.reshape(data, (batch_size, 1, 1, 1)) # Get values calculated without combine_spatial_bn workspace.ResetWorkspace() model_no_combine = _create_model(multiple_devices=False) workspace.RunNetOnce(model_no_combine.param_init_net) workspace.RunNetOnce(model_no_combine.net) single_device_bn_out = workspace.FetchBlob("{}_0/bn_out".format(device_type)) single_device_grads = {} single_device_grads["bn_out_s_grad"] = workspace.FetchBlob( "{}_0/bn_out_s_grad".format(device_type)) single_device_grads["bn_out_b_grad"] = workspace.FetchBlob( "{}_0/bn_out_b_grad".format(device_type)) single_device_grads["tanh_grad"] = workspace.FetchBlob( "{}_0/tanh_grad".format(device_type)) # Get values calculated over multiple devices with combine_spatial_bn true workspace.ResetWorkspace() model_combine = _create_model(multiple_devices=True) workspace.RunNetOnce(model_combine.param_init_net) workspace.RunNetOnce(model_combine.net) two_device_bn_out_vals = [] two_device_grads = {} for device in devices: bn_out_blob = "{}_{}/bn_out".format(device_type, device) two_device_bn_out_vals.append(workspace.FetchBlob(bn_out_blob)) two_device_grads[device] = {} two_device_grads[device]["bn_out_s_grad"] = workspace.FetchBlob( "{}_{}/bn_out_s_grad".format(device_type, device)) two_device_grads[device]["bn_out_b_grad"] = workspace.FetchBlob( "{}_{}/bn_out_b_grad".format(device_type, device)) two_device_grads[device]["tanh_grad"] = workspace.FetchBlob( "{}_{}/tanh_grad".format(device_type, device)) # Check to see if the combined values are equivalent _verify_bn_outputs( devices, device_type, tolerance, single_device_bn_out, two_device_bn_out_vals, single_device_grads, two_device_grads ) class RecurrentNetworkParallelTest(TestCase): def run_model(self, devices, gpu): ''' Helper function for test_equiv ''' def input_builder_fun(model): return None def model_build_fun(model, loss_scale): workspace.FeedBlob( core.ScopedBlobReference("seq_lengths"), np.array([self.T] * self.batch_per_device, dtype=np.int32) ) model.param_init_net.ConstantFill( [], "hidden_init", value=0.0, shape=[1, self.batch_per_device, self.hidden_dim] ) model.param_init_net.ConstantFill( [], "cell_init", value=0.0, shape=[1, self.batch_per_device, self.hidden_dim] ) output, _last_hidden, _, _last_state, = rnn_cell.LSTM( model=model, input_blob="data", seq_lengths="seq_lengths", initial_states=("hidden_init", "cell_init"), dim_in=self.input_dim, dim_out=self.hidden_dim, scope="partest", ) # A silly loss function loss = model.AveragedLoss( model.Sub([output, "target"], "dist"), "loss", ) loss = model.Scale(loss, "loss_scaled", scale=loss_scale) return [loss] def param_update_fun(model): ITER = model.Iter("ITER") LR = model.net.LearningRate( [ITER], "LR", base_lr=(-0.1), policy="fixed", ) ONE = model.param_init_net.ConstantFill( [], "ONE", shape=[1], value=1.0, ) for param in model.GetParams(): param_grad = model.param_to_grad[param] model.WeightedSum([param, ONE, param_grad, LR], param) assert len(model.GetParams()) == len(model.params) // len(model._devices) workspace.ResetWorkspace() model = cnn.CNNModelHelper( name="recurrent_test{}".format(devices), ) self.T = 8 self.batch_size = 64 self.input_dim = 8 self.hidden_dim = 31 self.batch_per_device = self.batch_size // len(devices) data_parallel_model.Parallelize( model, input_builder_fun=input_builder_fun, forward_pass_builder_fun=model_build_fun, param_update_builder_fun=param_update_fun, devices=devices, optimize_gradient_memory=True, cpu_device=not gpu, ) # Change all initialization to be ConstantFills so that # the everything is deterministic for op in model.param_init_net.Proto().op: if op.type.endswith('Fill'): op.type = 'ConstantFill' # Each run has same input, independent of number of gpus np.random.seed(20150210) for i in range(0, 10): full_data = np.random.rand(self.T, self.batch_size, self.input_dim) full_target = np.random.rand( self.T, self.batch_size, self.hidden_dim ) for (j, g) in enumerate(devices): st = j * self.batch_per_device en = st + self.batch_per_device data = full_data[:, st:en, :].astype(np.float32) targets = full_target[:, st:en, :].astype(np.float32) with core.DeviceScope(core.DeviceOption(model._device_type, g)): workspace.FeedBlob( "{}_{}/data".format(model._device_prefix, g), data ) workspace.FeedBlob( "{}_{}/target".format(model._device_prefix, g), targets ) if i == 0: workspace.RunNetOnce(model.param_init_net) workspace.CreateNet(model.net) workspace.RunNet(model.net.Proto().name) return workspace.FetchBlob("{}_0/partest/i2h_w".format(model._device_prefix)) @unittest.skip("Test is flaky: https://github.com/pytorch/pytorch/issues/10322") def test_equiv_recurrent(self): ''' Test that the model produces exactly same results given total batchsize, independent of number of GPUs/CPUs. ''' for gpu in [True, False]: if gpu and not workspace.has_gpu_support: continue result_2gpus = self.run_model([0, 1], gpu) result_1gpus = self.run_model([0], gpu) self.assertTrue(np.allclose(result_1gpus, result_2gpus)) if not gpu or workspace.NumCudaDevices() >= 4: result_4gpus = self.run_model(list(range(4)), gpu) self.assertTrue(np.allclose(result_1gpus, result_4gpus)) if not gpu or workspace.NumCudaDevices() >= 8: result_8gpus = self.run_model(list(range(8)), gpu) self.assertTrue(np.allclose(result_1gpus, result_8gpus)) @unittest.skipIf(not workspace.has_gpu_support, "No gpu support.") @unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.") class SparseDataParallelModelTest(TestCase): ''' Create and run the model. We try with both storing indices for gather on CPU and on GPU ''' def run_model(self, V, gpu_devices, cpu_indices): def input_builder_fun(model): return None def model_build_fun(model, loss_scale): if cpu_indices: with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)): gathered_cpu = model.net.Gather( [self.vecs, 'indices'], 'gathered_cpu') gathered = model.CopyCPUToGPU(gathered_cpu, "gathered") else: gpu_vecs = model.param_init_net.CopyCPUToGPU( self.vecs, "gpuvecs", ) model.params.append(gpu_vecs) gathered = model.net.Gather([gpu_vecs, 'indices'], 'gathered') flattened = model.Flatten(gathered, "flattened") fc = model.FC(flattened, "fc", 16 * 16, 1, ("ConstantFill", {}), ("ConstantFill", {})) fc_fl = model.FlattenToVec(fc, "fc_fl") sigm = model.Sigmoid(fc_fl, "sigm") sq = model.SquaredL2Distance([sigm, "label"], "sq") loss = model.AveragedLoss(sq, "loss") loss = model.Scale(loss, scale=loss_scale) return [loss] def param_update_fun(model): ONE = model.param_init_net.ConstantFill( [], "ONE", shape=[1], value=1.0, ) LR = model.CopyCPUToGPU(self.LR, "LR") for param in model.GetParams(): param_grad = model.param_to_grad[param] if not isinstance(param_grad, core.GradientSlice): model.WeightedSum([param, ONE, param_grad, LR], param) else: param_momentum = model.param_init_net.ConstantFill( [param], param + '_momentum', value=0.0, ) model.net.SparseMomentumSGDUpdate( [ param_grad.values, param_momentum, LR, param, param_grad.indices, ], [ param_grad.values, param_momentum, param ], momentum=0.1, nesterov=0, ) workspace.ResetWorkspace() model = cnn.CNNModelHelper( order="NHWC", name="sparse_test{}".format(gpu_devices), ) with core.NameScope("cpu"): with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)): self.ITER = model.Iter("ITER") self.LR = model.net.LearningRate( [self.ITER], "LR", base_lr=(-0.1), policy="fixed", ) self.vecs = model.param_init_net.UniformFill( [], "vecs", shape=[V, 16]) if cpu_indices: model.params.append(self.vecs) self.ONE_CPU = model.param_init_net.ConstantFill( [], "ONE_CPU", shape=[1], value=1.0, ) data_parallel_model.Parallelize_GPU( model, input_builder_fun=input_builder_fun, forward_pass_builder_fun=model_build_fun, param_update_builder_fun=param_update_fun, devices=gpu_devices, ) # Update the vecs if cpu_indices: with core.NameScope("cpu"): with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)): for param in model.GetParams(): param_grad = model.param_to_grad[param] model.ScatterWeightedSum([param, self.ONE_CPU, param_grad.indices, param_grad.values, self.LR], self.vecs) else: with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, 0)): model.CopyGPUToCPU("gpu_0/gpuvecs", self.vecs) np.random.seed(2603) # Each run has same input, independent of number of gpus batch_size = 64 for i in range(0, 10): full_indices = np.random.permutation(V)[:batch_size * 16].reshape( batch_size, 16 ) full_labels = full_indices[:, 0] % 2 batch_per_device = batch_size // len(gpu_devices) for (j, g) in enumerate(gpu_devices): st = j * batch_per_device en = st + batch_per_device indices = full_indices[st:en, :].astype(np.int32) labels = full_labels[st:en].astype(np.float32) device_for_indices = core.DeviceOption(caffe2_pb2.CPU) if not cpu_indices: device_for_indices = core.DeviceOption(workspace.GpuDeviceType, g) with core.DeviceScope(device_for_indices): workspace.FeedBlob("gpu_{}/indices".format(g), indices) with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, g)): workspace.FeedBlob("gpu_{}/label".format(g), labels) if i == 0: workspace.RunNetOnce(model.param_init_net) # Force vecs to be same on all runs orig_vecs = np.random.rand(V, 16).astype(np.float32) workspace.FeedBlob( self.vecs, orig_vecs ) if not cpu_indices: for g in gpu_devices: workspace.FeedBlob( "gpu_{}/gpuvecs".format(g), orig_vecs, device_option=core.DeviceOption(workspace.GpuDeviceType, g), ) workspace.CreateNet(model.net) workspace.RunNet(model.net.Proto().name) if len(gpu_devices) == 2: if not cpu_indices: idx = workspace.FetchBlob("gpu_0/indices") idx = list(idx.flatten()) n = len(idx) nu = len(set(idx)) assert n == nu, "We cannot have duplicate indices" # Sanity check to see the vecs were updated self.assertFalse( np.allclose(workspace.FetchBlob(self.vecs), orig_vecs)) return [workspace.FetchBlob(self.vecs if cpu_indices else "gpu_0/gpuvecs"), workspace.FetchBlob("gpu_0/fc_w")] def _test_equiv_sparse(self, cpu_indices): ''' Test that the model produces exactly same results given total batchsize, independent of number of GPUs. ''' V = 10000 result_2gpus = self.run_model(V, [0, 1], cpu_indices) result_1gpus = self.run_model(V, [0], cpu_indices) self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0])) self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1])) if workspace.NumCudaDevices() >= 4: result_4gpus = self.run_model(V, list(range(4)), cpu_indices) self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0])) self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1])) if workspace.NumCudaDevices() >= 8: result_8gpus = self.run_model(V, list(range(8)), cpu_indices) self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0])) self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1])) def test_equiv_sparse(self): self._test_equiv_sparse(True) self._test_equiv_sparse(False) @unittest.skipIf(not workspace.has_gpu_support, "No gpu support.") @unittest.skipIf(workspace.NumGpuDevices() < 2, "Need at least 2 GPUs.") class ParallelizeBMUFTest(TestCase): def _run_model(self, gpu_devices): ''' Helper function for test_equiv ''' def input_builder_fun(model): return None def _model_build_fun(self, model, loss_scale): fc = model.FC( "data", "fc", 16, 1, ("ConstantFill", {}), ("ConstantFill", {}) ) fc_fl = model.FlattenToVec(fc, "fc_fl") sigm = model.Sigmoid(fc_fl, "sigm") sq = model.SquaredL2Distance([sigm, "label"], "sq") loss = model.AveragedLoss(sq, "loss") loss = model.Scale(loss, scale=loss_scale) return [loss] def _param_update_fun(self, model): ITER = model.Iter("ITER") LR = model.net.LearningRate( [ITER], "LR", base_lr=(-0.1), policy="fixed", ) ONE = model.param_init_net.ConstantFill( [], "ONE", shape=[1], value=1.0, ) for param in model.GetParams(): grad = model.param_to_grad[param] model.WeightedSum([param, ONE, grad, LR], param) def _generate_data(self, devices, device_type, device_prefix): np.random.seed(26) # Each run has same input, independent of number of gpus batch_size = 64 for _ in range(0, 10): full_data = np.random.rand(batch_size, 16) full_labels = np.round(full_data[:, 0]) batch_per_device = batch_size // len(devices) for (j, g) in enumerate(devices): st = j * batch_per_device en = st + batch_per_device data = full_data[st:en, :].astype(np.float32) labels = full_labels[st:en].astype(np.float32) with core.DeviceScope(core.DeviceOption(device_type, g)): workspace.FeedBlob("{}_{}/data".format(device_prefix, g), data) workspace.FeedBlob("{}_{}/label".format(device_prefix, g), labels) @given( cpu_device=st.booleans() ) @settings(deadline=2000) def test_parallelize_bmuf(self, cpu_device): assume(cpu_device or workspace.has_gpu_support or workspace.has_hip_support) workspace.ResetWorkspace() model = cnn.CNNModelHelper( order="NHWC", name="test" ) devices = [0, 1] def input_builder_fun(model): return None if not cpu_device: device_type = workspace.GpuDeviceType device_prefix = "gpu" else: device_type = caffe2_pb2.CPU device_prefix = "cpu" self._generate_data(devices, device_type, device_prefix) data_parallel_model.Parallelize_BMUF( model, input_builder_fun, self._model_build_fun, self._param_update_fun, devices=devices, cpu_device=cpu_device ) data_parallel_model.RunInitNet(model) # Check initial momentum params are zeros self.assertEqual( list(model._device_grouped_blobs.keys()), ['fc_w', 'fc_b'] ) self.assertEqual(workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix)), 0) np.testing.assert_equal( workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix)), np.zeros(16).astype(np.float32).reshape(1, 16) ) # Run the algorithm for one iteration to have non-zero params. data_parallel_model.RunNet(model, 1) # Save iteration momentum and post local update params v_b_ = workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix)) v_w_ = workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix)) workspace.RunNetOnce(model.net) b_0_ = workspace.FetchBlob('{}_0/fc_b'.format(device_prefix)) w_0_ = workspace.FetchBlob('{}_0/fc_w'.format(device_prefix)) b_1_ = workspace.FetchBlob('{}_1/fc_b'.format(device_prefix)) w_1_ = workspace.FetchBlob('{}_1/fc_w'.format(device_prefix)) # Compute block gradients. b_g_ = workspace.FetchBlob('{}_0/fc_b_g'.format(device_prefix)) w_g_ = workspace.FetchBlob('{}_0/fc_w_g'.format(device_prefix)) workspace.RunNetOnce(model._global_model_param_updates_net) g_b = (b_0_ + b_1_) / 2 - b_g_ g_w = (w_0_ + w_1_) / 2 - w_g_ v_b = workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix)) v_w = workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix)) w_g = workspace.FetchBlob('{}_0/fc_w_g'.format(device_prefix)) b_g = workspace.FetchBlob('{}_0/fc_b_g'.format(device_prefix)) w_0 = workspace.FetchBlob('{}_0/fc_w'.format(device_prefix)) b_0 = workspace.FetchBlob('{}_0/fc_b'.format(device_prefix)) w_1 = workspace.FetchBlob('{}_1/fc_w'.format(device_prefix)) b_1 = workspace.FetchBlob('{}_1/fc_b'.format(device_prefix)) # Check momentum update step np.testing.assert_equal(v_b, 0.5 * v_b_ + g_b) np.testing.assert_equal(v_w, 0.5 * v_w_ + g_w) np.testing.assert_equal(w_g, w_0) np.testing.assert_equal(w_g, w_1) np.testing.assert_equal(b_g, b_0) np.testing.assert_equal(b_g, b_1) # Check params update step np.testing.assert_equal(w_0, w_g_ + v_w) np.testing.assert_equal(b_0, b_g_ + v_b) @unittest.skipIf(not workspace.has_gpu_support, "No gpu support.") @unittest.skipIf(workspace.NumGpuDevices() < 2, "Need at least 2 GPUs.") class SparseDataParallelModelTestWithSharedIndices(TestCase): ''' Create and run the model. We try with both storing indices for gather on CPU and on GPU ''' def run_model(self, V, gpu_devices): def input_builder_fun(model): return None def model_build_fun(model, loss_scale): gpu_vecs_gathered = [] gpu_vecs = [] for num, vec in enumerate(self.vecs): gpu_vec = model.param_init_net.CopyCPUToGPU( vec, 'gpuvec_{}'.format(num), ) if num != 2: model.params.append(gpu_vec) gpu_vecs.append(gpu_vec) for num, gpu_vec in enumerate(gpu_vecs): gpu_vec_gathered = model.net.Gather( [gpu_vec, 'indices'], ['gpu_vec_gathered_{}'.format(num)] ) gpu_vecs_gathered.append(gpu_vec_gathered) assert len(gpu_vecs_gathered) == 3 fc = model.net.FC( [ gpu_vecs_gathered[2], gpu_vecs_gathered[0], gpu_vecs_gathered[1], ], ['fc'], ) _, loss = model.net.SoftmaxWithLoss( [fc, 'label'], ['ce_loss', 'avg_loss'], only_loss=True, ) loss = model.Scale(loss, scale=loss_scale) model.net.Print(loss, [], limit=10) return [loss] def param_update_fun(model): ONE = model.param_init_net.ConstantFill( [], "ONE", shape=[1], value=1.0, ) LR = model.CopyCPUToGPU(self.LR, "LR") for param in model.GetParams(): param_grad = model.param_to_grad[param] if not isinstance(param_grad, core.GradientSlice): model.WeightedSum([param, ONE, param_grad, LR], param) else: model.net.ScatterWeightedSum( [ param, ONE, param_grad.indices, param_grad.values, ONE, ], param, ) workspace.ResetWorkspace() model = cnn.CNNModelHelper( order="NHWC", name="sparse_test{}".format(gpu_devices), ) batch_size = 32 batch_per_device = batch_size // len(gpu_devices) with core.NameScope("cpu"): with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)): self.ITER = model.Iter("ITER") self.LR = model.net.LearningRate( [self.ITER], "LR", base_lr=(-0.1), policy="fixed", ) ''' self.vecs consists of 3 big blobs on which we call Gather: 1) FC weights, shape=(V, 16) 2) FC bias, shape=(V) 3) FC input, shape=(batch_per_device, 16) ''' self.vecs = [ model.param_init_net.UniformFill( [], "vec_{}".format(num), shape=[V, 16]) for num in range(2) ] self.vecs.append( model.param_init_net.UniformFill( [], "vec_2", shape=[batch_per_device, 16] ) ) self.ONE_CPU = model.param_init_net.ConstantFill( [], "ONE_CPU", shape=[1], value=1.0, ) data_parallel_model.Parallelize_GPU( model, input_builder_fun=input_builder_fun, forward_pass_builder_fun=model_build_fun, param_update_builder_fun=param_update_fun, devices=gpu_devices, ) # Update the vecs with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, 0)): for num, vec in enumerate(self.vecs[:-1]): model.CopyGPUToCPU("gpu_0/gpuvec_{}".format(num), vec) # Each run has same input, independent of number of gpus for i in range(0, 10): np.random.seed(2603) full_indices = np.random.permutation(V)[:batch_size].reshape( batch_size ) full_labels = full_indices[:] % batch_per_device for (j, g) in enumerate(gpu_devices): st = j * batch_per_device en = st + batch_per_device indices = full_indices[st:en].astype(np.int32) labels = full_labels[st:en].astype(np.int32) with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, g)): workspace.FeedBlob("gpu_{}/indices".format(g), indices) workspace.FeedBlob("gpu_{}/label".format(g), labels) if i == 0: workspace.RunNetOnce(model.param_init_net) # Force vecs to be same on all runs orig_vecs = [ np.random.rand(V, 16).astype(np.float32), np.random.rand(V).astype(np.float32), np.random.rand(V, 16).astype(np.float32), ] for vec, orig_vec in zip(self.vecs, orig_vecs): workspace.FeedBlob( vec, orig_vec ) for g in gpu_devices: for num, orig_vec in enumerate(orig_vecs): workspace.FeedBlob( "gpu_{}/gpuvec_{}".format(g, num), orig_vec, device_option=core.DeviceOption( workspace.GpuDeviceType, g), ) workspace.CreateNet(model.net) workspace.RunNet(model.net.Proto().name) idx = workspace.FetchBlob('gpu_0/indices') grad_slices = [ workspace.FetchBlob( 'gpu_{}/gpu_vec_gathered_{}_grad'.format(g, num)) for g in gpu_devices for num in range(2) ] for grad_slice in grad_slices: # print (len(idx), len(grad_slice)) assert len(idx) == len(grad_slice), ( 'Number of indices {} is not same as number of gradient ' 'slices {}. This might lead to illegal memory access'.format( len(idx), len(grad_slice) ) ) def test_sparse_shared_indices_gpu(self): ''' Test that the model has same number of indices and gradient rows given total batchsize, independent of number of GPUs. ''' V = 10000 self.run_model(V, [0, 1]) self.run_model(V, [0]) if workspace.NumGpuDevices() >= 4: self.run_model(V, list(range(4))) if workspace.NumGpuDevices() >= 8: self.run_model(V, list(range(8))) if __name__ == "__main__": import unittest unittest.main()