pytorch/caffe2/python/data_parallel_model_test.py
Orion Reblitz-Richardson 6223bfdb1d Update from Facebook (#6692)
* [GanH][Easy]: Add assertion to adaptive weighting layer

A weight of 0 causes numeric instability and an exploding NE.

* [Easy] Add cast op before computing norm in diagnose options

As LpNorm only accepts floats, we add a manual cast here.
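
As a rough illustration with the Caffe2 Python net-builder API (the blob names `raw_metric`/`raw_metric_float` are made up for this sketch; the actual diagnose-options code is not reproduced here):

```python
from caffe2.python import core

net = core.Net("diagnose_norm")
# LpNorm only accepts float input, so cast first (e.g. from double or int).
casted = net.Cast("raw_metric", "raw_metric_float", to=core.DataType.FLOAT)
# The norm can then be computed safely on the casted blob.
norm = net.LpNorm(casted, "raw_metric_l2", p=2)
```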

* Introduce a new caching device allocator

`cudaMalloc` and `cudaFree` calls are slow, and become slower the
more GPUs there are. Essentially, they grab a host-wide (not device-wide) lock
because GPU memory is transparently shared across all GPUs. Normally, this
isn't much of a concern since workloads allocate memory upfront, and reuse it
during later computation.

However, under some computation models (specifically, memory conserving
approaches like checkpoint-and-recompute, see
https://medium.com/@yaroslavvb/fitting-larger-networks-into-memory-583e3c758ff9)
this assumption is no longer true. In these situations, `cudaMalloc` and
`cudaFree` are common and frequent. Furthermore, in data parallel contexts,
these calls happen at nearly the same time from all GPUs, worsening lock
contention.

A common solution to this problem is to add a custom allocator. In fact,
NVIDIA provides one out of the box: CUB, which Caffe2 already supports.
Unfortunately, the CUB allocator suffers from very high fragmentation. This is
primarily because it is a "buddy" allocator which neither splits nor merges
free cached blocks. Study
https://github.com/NVlabs/cub/blob/1.8.0/cub/util_allocator.cuh#L357 if you
want to convince yourself.

This diff adapts a caching allocator from the Torch codebase
https://github.com/torch/cutorch/blob/master/lib/THC/THCCachingAllocator.cpp
which does splitting and merging and ends up working really well, at least for
workloads like the checkpoint-and-recompute computation models noted above.

I simplified the implementation a little and made it a bit more C++-like. I
also removed a bunch of stream synchronization primitives for this diff; I
plan to add them back in subsequent diffs.
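
To make the split/merge point concrete, here is a toy Python sketch (not the actual THCCachingAllocator, which is C++, stream-aware, and far more careful) of a best-fit cache that splits oversized blocks on allocation and merges adjacent free blocks on free; this bookkeeping is what keeps fragmentation low compared to a buddy scheme that does neither:

```python
# Toy illustration only -- NOT the Caffe2/THC implementation. It models device
# memory as a flat range of offsets and keeps a map of cached free blocks.
class ToyCachingAllocator(object):
    def __init__(self, capacity):
        self.free_blocks = {0: capacity}  # offset -> size of free block
        self.allocated = {}               # offset -> size of live allocation

    def malloc(self, size):
        # Best fit: pick the smallest cached block that is large enough.
        fits = [(blk_size, off) for off, blk_size in self.free_blocks.items()
                if blk_size >= size]
        if not fits:
            raise MemoryError("toy allocator out of memory")
        blk_size, off = min(fits)
        del self.free_blocks[off]
        if blk_size > size:
            # Split: keep the unused tail cached instead of wasting it.
            self.free_blocks[off + size] = blk_size - size
        self.allocated[off] = size
        return off

    def free(self, off):
        size = self.allocated.pop(off)
        # Merge with an adjacent free block that follows this one.
        if off + size in self.free_blocks:
            size += self.free_blocks.pop(off + size)
        # Merge with an adjacent free block that precedes this one.
        for prev_off, prev_size in list(self.free_blocks.items()):
            if prev_off + prev_size == off:
                del self.free_blocks[prev_off]
                off, size = prev_off, prev_size + size
                break
        self.free_blocks[off] = size


alloc = ToyCachingAllocator(1 << 20)
a = alloc.malloc(1000)
b = alloc.malloc(3000)
alloc.free(a)
alloc.free(b)
# After both frees the neighbors merge back into one block: no fragmentation.
assert alloc.free_blocks == {0: 1 << 20}
```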

* Report reader progress in fblearner workflows

Integrate with the fblearner progress reporting API and add support for reporting training progress from reader nodes.
If the reader is constructed with a batch limit, report progress as finished batches vs. total batches. The finished-batch count may exceed the total because we evaluate whether to stop processing every time we dequeue a split.
If the reader has no limit, report progress as finished splits (Hive files) vs. total splits. This is fairly accurate.
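
A hypothetical sketch of the reporting rule above (the function and argument names are made up; the real fblearner reporting API is internal and not shown):

```python
def reader_progress(finished_batches, total_batches,
                    finished_splits, total_splits):
    # Batch-limited reader: finished batches can overshoot the limit,
    # since the stop condition is only checked after a split is dequeued.
    if total_batches is not None:
        return min(1.0, float(finished_batches) / total_batches)
    # Unlimited reader: fall back to finished vs. total splits (Hive files).
    return float(finished_splits) / total_splits
```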

* [GanH][Diagnose]: fix plotting

1. GanH diagnose needs to set plot options.
2. The modifier's blob name is used for the metric field and needs to be fixed
before generating the net.

* Automatic update of fbcode/onnx to 985af3f5a0f7e7d29bc0ee6b13047e7ead9c90c8

* Make CompositeReader stop as soon as one reader finishes

Previously, CompositeReader called all readers before stopping. This caused a flaky test, since the last batch may be read by different threads, resulting in dropped data.

* [dper] make sure loss is not nan

as desc.

* [rosetta2] [mobile-vision] Option to export NHWC order for RoIWarp/RoIAlign

Thanks for finding this @stzpz and @wangyanghan. Looks like NHWC is more
optimized. For OCR it doesn't help yet, since NHWC uses more memory bandwidth,
but it will soon become important.

* Intra-op parallel FC operator

Intra-op parallel FC operator

* [C2 Proto] extra info in device option

passing extra information in device option

design doc: https://fb.quip.com/yAiuAXkRXZGx

* Unregister MKL fallbacks for NCHW conversions

* Tracing for more executors

Modified Tracer to work with other executors and added more tracing.

* Remove ShiftActivationDevices()

* Check for blob entry iff it is present

When processing placeholder ops, ignore the blob if it is not present in blob_to_device.

* Internalize use of Eigen tensor

Move the use of Eigen tensors out of the header file so we don't get template partial specialization errors when building other libraries.

* feature importance for transformed features.

* Fix unused parameter warnings

The changes in this diff comment out unused parameters.
This will allow us to enable -Wunused-parameter as an error.

#accept2ship

* add opencv dependencies to caffe2

The video input op requires additional OpenCV packages. This adds them to
CMake so that the op can build.

* Add clip_by_value option in gradient clipping

Add clip_by_value option in gradient clipping

When a value is bigger than the max or smaller than the min, clip it to that bound.
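
The semantics are the same as a plain element-wise clip; a minimal numpy illustration (the option itself lives in the Caffe2 gradient-clipping builder, which is not reproduced here):

```python
import numpy as np

grad = np.array([-3.0, -0.2, 0.5, 4.0], dtype=np.float32)
clip_min, clip_max = -1.0, 1.0
# Anything below clip_min or above clip_max is pinned to the boundary value.
clipped = np.clip(grad, clip_min, clip_max)
print(clipped)  # [-1.  -0.2  0.5  1. ]
```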

* std::round compat
2018-04-17 23:36:40 -07:00


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import viewkeys
from multiprocessing import Process, Queue
import numpy as np
import os
import shutil
import tempfile
import unittest
from mock import Mock
from hypothesis import assume, given
import hypothesis.strategies as st
from caffe2.proto import caffe2_pb2
from caffe2.python import brew, core, cnn, data_parallel_model, dyndep, \
model_helper, optimizer, rnn_cell, workspace
from caffe2.python.test_util import TestCase
dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")
class TemporaryDirectory:
def __enter__(self):
self.tmpdir = tempfile.mkdtemp()
return self.tmpdir
def __exit__(self, type, value, traceback):
shutil.rmtree(self.tmpdir)
# Note(jiayq): we have yet to find out why Travis gives an error in gloo
# like:
# RuntimeError: [enforce fail at /home/travis/build/caffe2/caffe2/third_party/gloo/gloo/transport/tcp/device.cc:113] ifa != nullptr. Unable to find interface for: [127.0.1.1]
# See for example https://travis-ci.org/caffe2/caffe2/jobs/262433866
# As a result, we will check if this is Travis and, if so, disable it.
@unittest.skipIf(os.environ.get("TRAVIS"), "DPMTest has a known issue with Travis.")
class DataParallelModelTest(TestCase):
def run_model(self, devices, gpu):
'''
Helper function for test_equiv
'''
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
fc = model.FC("data", "fc", 16, 1,
("ConstantFill", {}), ("ConstantFill", {}))
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
# For testing explicit sync
model.param_init_net.UniformFill([], ["sync_num"], shape=[1])
return [loss]
def add_optimizer(model):
return optimizer.build_sgd(
model,
0.1,
policy="fixed",
max_gradient_norm=5.0,
allow_lr_injection=True,
)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="test{}".format(devices),
)
data_parallel_model.Parallelize(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
optimizer_builder_fun=add_optimizer,
devices=devices,
cpu_device=not gpu,
shared_model=not gpu,
combine_spatial_bn=not gpu,
)
data_parallel_model.AddBlobSync(model, ["sync_num"])
# Light test for LR names
lr_names = data_parallel_model.GetLearningRateBlobNames(model)
self.assertGreater(len(lr_names), 0)
np.random.seed(2603)
# Each run has same input, independent of number of gpus
batch_size = 64
for i in range(0, 10):
full_data = np.random.rand(batch_size, 16)
full_labels = np.round(full_data[:, 0])
batch_per_device = batch_size // len(devices)
for (j, g) in enumerate(devices):
st = j * batch_per_device
en = st + batch_per_device
data = full_data[st:en, :].astype(np.float32)
labels = full_labels[st:en].astype(np.float32)
with core.DeviceScope(core.DeviceOption(model._device_type, g)):
workspace.FeedBlob(
"{}_{}/data".format(model._device_prefix, g), data
)
workspace.FeedBlob(
"{}_{}/label".format(model._device_prefix, g), labels
)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.FeedBlob(
model._device_prefix + "_0/sync_num",
np.array([i * 2]).astype(np.float32),
device_option=core.DeviceOption(model._device_type, 0))
workspace.RunNet(model.net.Proto().name)
# Test AddBlobSync
for j in model._devices:
sync = workspace.FetchBlob(
model._device_prefix + "_{}/sync_num".format(j))[0]
self.assertTrue(abs(sync - i * 2) < 0.01)
return workspace.FetchBlob("{}_0/fc_w".format(model._device_prefix))
def run_test_locally(self, fn, device_option=None, **kwargs):
# Queue for assertion errors on subprocesses
queue = Queue()
# Capture any exception thrown by the subprocess
def run_fn(*args, **kwargs):
try:
if device_option is None:
fn(*args, **kwargs)
workspace.ResetWorkspace()
else:
with core.DeviceScope(device_option):
fn(*args, **kwargs)
workspace.ResetWorkspace()
except Exception as ex:
queue.put(ex)
# Start N processes in the background
procs = []
for i in range(kwargs['comm_size']):
kwargs['comm_rank'] = i
proc = Process(
target=run_fn,
kwargs=kwargs)
proc.start()
procs.append(proc)
# Test complete, join background processes
while len(procs) > 0:
proc = procs.pop(0)
while proc.is_alive():
proc.join(1)
# Raise exception if we find any.
# Note that the following is executed ALSO after
# the last process was joined, so if ANY exception
# was raised, it will be re-raised here.
if not queue.empty():
raise queue.get()
def test_equiv(self):
'''
Test that the model produces exactly same results given
total batchsize, independent of number of GPUs.
'''
for gpu in [True, False]:
if gpu and (not workspace.has_gpu_support or
workspace.NumCudaDevices() < 2):
continue
result_2gpus = self.run_model([0, 1], gpu=gpu)
result_1gpus = self.run_model([0], gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_2gpus))
if not gpu or workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(list(range(4)), gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_4gpus))
if not gpu or workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(list(range(8)), gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_8gpus))
if not gpu or workspace.NumCudaDevices() >= 16:
result_16gpus = self.run_model(list(range(16)), gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_16gpus))
def test_checkpoint_params(self):
def add_input_ops(model):
pass
def add_model_ops(model, loss_scale):
model.NHWC2NCHW("data", "data_nchw")
model.Conv("data_nchw", 'conv1', 3, 64,
weight_init=("MSRAFill", {}), kernel=7,
stride=2, pad=3, no_bias=0)
model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=100)
model.Sigmoid('fc', 'fc_sigm')
model.Softmax('fc_sigm', 'softmax')
model.LabelCrossEntropy(['softmax', 'label'], 'xent')
loss = model.AveragedLoss('xent', 'loss')
# Add a duplicate param init to ensure it does not cause issues
model.param_init_net.ConstantFill(
[], ["fc_w"], shape=((64 * 56 * 56), 1000)
)
return [loss]
def add_optimizer(model):
optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)
model = cnn.CNNModelHelper(
order="NHWC",
name="test",
)
data_parallel_model.Parallelize_CPU(
model,
input_builder_fun=add_input_ops,
forward_pass_builder_fun=add_model_ops,
optimizer_builder_fun=add_optimizer,
devices=[1, 2, 3],
)
        # Only cpu_1 params should be returned (cpu_1 is the first device)
checkpoint_params = data_parallel_model.GetCheckpointParams(model)
for p in model.GetParams("cpu_1/"):
self.assertTrue(p in checkpoint_params)
self.assertTrue(p + "_momentum" in checkpoint_params)
for p in model.GetParams("cpu_2/"):
self.assertFalse(p in checkpoint_params)
self.assertTrue(
core.BlobReference("cpu_1/fc_w_momentum") in checkpoint_params)
for c in model.GetComputedParams("cpu_1/"):
self.assertTrue(c in checkpoint_params)
for c in model.GetComputedParams("cpu_2/"):
self.assertFalse(c in checkpoint_params)
self.assertFalse(core.BlobReference("cpu_1/data") in checkpoint_params)
self.assertTrue(core.BlobReference("optimizer_iteration") in checkpoint_params)
def test_net_conversion_and_append_net(self):
other = model_helper.ModelHelper()
fc1 = brew.fc(other, "data", "other_fc1", dim_in=3*227*227, dim_out=10)
fc2 = brew.fc(other, fc1, "other_fc2", dim_in=10, dim_out=10)
brew.fc(other, fc2, "other_fc3", dim_in=10, dim_out=10)
def add_input_ops(model):
model.net.UniformFill([], ["data"], shape=[4, 227, 227, 3])
model.net.UniformFill([], ["label"], shape=[4])
def add_model_ops(model, loss_scale):
model.NHWC2NCHW("data", "data_nchw")
model.Conv("data_nchw", 'conv1', 3, 64,
weight_init=("MSRAFill", {}), kernel=7,
stride=2, pad=3, no_bias=0)
model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=10)
# Append the net and param_init_net of the other model
appendnet = data_parallel_model.ConvertNetForDevice(other.net)
model.net.AppendNet(appendnet)
model.param_init_net.AppendNet(
data_parallel_model.ConvertNetForDevice(other.param_init_net))
model.Sigmoid('fc', 'fc_sigm')
model.Softmax('fc_sigm', 'softmax')
loss = model.AveragedLoss('softmax', 'loss')
return [loss]
def add_optimizer(model):
optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)
model = cnn.CNNModelHelper(
order="NCHW",
name="test",
)
data_parallel_model.Parallelize_CPU(
model,
input_builder_fun=add_input_ops,
forward_pass_builder_fun=add_model_ops,
optimizer_builder_fun=add_optimizer,
devices=range(4)
)
# Just create and run net and confirm no exception is thrown
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.RunNet(model.net)
def test_synchronization_barrier(self):
def run(comm_rank, comm_size, tmpdir):
def add_input_ops(model):
pass
def add_model_ops(model, loss_scale):
return []
def add_optimizer(model):
pass
store_handler = "store_handler"
workspace.RunOperatorOnce(
core.CreateOperator(
"FileStoreHandlerCreate",
[],
[store_handler],
path=tmpdir))
rendezvous = dict(
kv_handler=store_handler,
shard_id=comm_rank,
num_shards=comm_size,
engine='GLOO',
)
model = cnn.CNNModelHelper(
order="NHWC",
name="test",
)
data_parallel_model.Parallelize_CPU(
model,
input_builder_fun=add_input_ops,
forward_pass_builder_fun=add_model_ops,
optimizer_builder_fun=add_optimizer,
devices=[1, 2, 3],
rendezvous=rendezvous
)
data_parallel_model.RunInitNet(model)
for _ in range(2):
data_parallel_model.Synchronize(model)
with TemporaryDirectory() as tmpdir:
self.run_test_locally(
run,
comm_size=2,
device_option=None,
tmpdir=tmpdir)
def test_device_scope_check(self):
with self.assertRaises(AssertionError):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
data_parallel_model.Parallelize_GPU(None, None, None)
def test_net_transformer_function(self):
devices = [1, 2, 3]
def add_input_ops(model):
model.param_init_net.UniformFill([], ["data"], shape=[32, 8])
def add_optimizer(model):
optimizer.build_sgd(model, 0.1)
def add_model_ops(model, loss_scale):
fc1 = brew.fc(model, "data", "fc1", dim_in=8, dim_out=8)
return [fc1]
kwargs = {
'input_builder_fun': add_input_ops,
'forward_pass_builder_fun': add_model_ops,
'devices': devices,
}
# assert that the transformer is called for both train and test cases
transform = Mock()
kwargs['net_transformer_fun'] = transform
model = model_helper.ModelHelper(name="r", init_params=False)
data_parallel_model.Parallelize_CPU(model, **kwargs)
self.assertTrue(transform.called)
self.assertEqual(transform.call_count, 1)
transform = Mock()
kwargs['net_transformer_fun'] = transform
kwargs['optimizer_builder_fun'] = add_optimizer
model = model_helper.ModelHelper(name="r", init_params=True)
data_parallel_model.Parallelize_CPU(model, **kwargs)
self.assertTrue(transform.called)
self.assertEqual(transform.call_count, 1)
class RecurrentNetworkParallelTest(TestCase):
def run_model(self, devices, gpu):
'''
Helper function for test_equiv
'''
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
workspace.FeedBlob(
core.ScopedBlobReference("seq_lengths"),
np.array([self.T] * self.batch_per_device, dtype=np.int32)
)
model.param_init_net.ConstantFill(
[],
"hidden_init",
value=0.0,
shape=[1, self.batch_per_device, self.hidden_dim]
)
model.param_init_net.ConstantFill(
[],
"cell_init",
value=0.0,
shape=[1, self.batch_per_device, self.hidden_dim]
)
output, _last_hidden, _, _last_state, = rnn_cell.LSTM(
model=model,
input_blob="data",
seq_lengths="seq_lengths",
initial_states=("hidden_init", "cell_init"),
dim_in=self.input_dim,
dim_out=self.hidden_dim,
scope="partest",
)
# A silly loss function
loss = model.AveragedLoss(
model.Sub([output, "target"], "dist"),
"loss",
)
loss = model.Scale(loss, "loss_scaled", scale=loss_scale)
return [loss]
def param_update_fun(model):
ITER = model.Iter("ITER")
LR = model.net.LearningRate(
[ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
for param in model.GetParams():
param_grad = model.param_to_grad[param]
model.WeightedSum([param, ONE, param_grad, LR], param)
assert len(model.GetParams()) == len(model.params) // len(model._devices)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
name="recurrent_test{}".format(devices),
)
self.T = 8
self.batch_size = 64
self.input_dim = 8
self.hidden_dim = 31
self.batch_per_device = self.batch_size // len(devices)
data_parallel_model.Parallelize(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=devices,
optimize_gradient_memory=True,
cpu_device=not gpu,
)
# Change all initialization to be ConstantFills so that
        # everything is deterministic
for op in model.param_init_net.Proto().op:
if op.type.endswith('Fill'):
op.type = 'ConstantFill'
# Each run has same input, independent of number of gpus
np.random.seed(20150210)
for i in range(0, 10):
full_data = np.random.rand(self.T, self.batch_size, self.input_dim)
full_target = np.random.rand(
self.T, self.batch_size, self.hidden_dim
)
for (j, g) in enumerate(devices):
st = j * self.batch_per_device
en = st + self.batch_per_device
data = full_data[:, st:en, :].astype(np.float32)
targets = full_target[:, st:en, :].astype(np.float32)
with core.DeviceScope(core.DeviceOption(model._device_type, g)):
workspace.FeedBlob(
"{}_{}/data".format(model._device_prefix, g), data
)
workspace.FeedBlob(
"{}_{}/target".format(model._device_prefix, g), targets
)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
return workspace.FetchBlob("{}_0/partest/i2h_w".format(model._device_prefix))
def test_equiv_recurrent(self):
'''
Test that the model produces exactly same results given
total batchsize, independent of number of GPUs/CPUs.
'''
for gpu in [True, False]:
if gpu and not workspace.has_gpu_support:
continue
result_2gpus = self.run_model([0, 1], gpu)
result_1gpus = self.run_model([0], gpu)
self.assertTrue(np.allclose(result_1gpus, result_2gpus))
if not gpu or workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(list(range(4)), gpu)
self.assertTrue(np.allclose(result_1gpus, result_4gpus))
if not gpu or workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(list(range(8)), gpu)
self.assertTrue(np.allclose(result_1gpus, result_8gpus))
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTest(TestCase):
'''
Create and run the model. We try with both storing indices for gather
on CPU and on GPU
'''
def run_model(self, V, gpu_devices, cpu_indices):
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
if cpu_indices:
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
gathered_cpu = model.net.Gather(
[self.vecs, 'indices'], 'gathered_cpu')
gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
else:
gpu_vecs = model.param_init_net.CopyCPUToGPU(
self.vecs, "gpuvecs",
)
model.params.append(gpu_vecs)
gathered = model.net.Gather([gpu_vecs, 'indices'], 'gathered')
flattened = model.Flatten(gathered, "flattened")
fc = model.FC(flattened, "fc", 16 * 16, 1,
("ConstantFill", {}), ("ConstantFill", {}))
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
return [loss]
def param_update_fun(model):
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
LR = model.CopyCPUToGPU(self.LR, "LR")
for param in model.GetParams():
param_grad = model.param_to_grad[param]
if not isinstance(param_grad, core.GradientSlice):
model.WeightedSum([param, ONE, param_grad, LR], param)
else:
param_momentum = model.param_init_net.ConstantFill(
[param],
param + '_momentum',
value=0.0,
)
model.net.SparseMomentumSGDUpdate(
[
param_grad.values,
param_momentum,
LR,
param,
param_grad.indices,
],
[
param_grad.values, param_momentum, param
],
momentum=0.1,
nesterov=0,
)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="sparse_test{}".format(gpu_devices),
)
with core.NameScope("cpu"):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
self.ITER = model.Iter("ITER")
self.LR = model.net.LearningRate(
[self.ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
self.vecs = model.param_init_net.UniformFill(
[], "vecs", shape=[V, 16])
if cpu_indices:
model.params.append(self.vecs)
self.ONE_CPU = model.param_init_net.ConstantFill(
[], "ONE_CPU", shape=[1], value=1.0,
)
data_parallel_model.Parallelize_GPU(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=gpu_devices,
)
# Update the vecs
if cpu_indices:
with core.NameScope("cpu"):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
for param in model.GetParams():
param_grad = model.param_to_grad[param]
model.ScatterWeightedSum([param, self.ONE_CPU,
param_grad.indices,
param_grad.values,
self.LR],
self.vecs)
else:
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
model.CopyGPUToCPU("gpu_0/gpuvecs", self.vecs)
np.random.seed(2603)
# Each run has same input, independent of number of gpus
batch_size = 64
for i in range(0, 10):
full_indices = np.random.permutation(V)[:batch_size * 16].reshape(
batch_size, 16
)
full_labels = full_indices[:, 0] % 2
batch_per_device = batch_size // len(gpu_devices)
for (j, g) in enumerate(gpu_devices):
st = j * batch_per_device
en = st + batch_per_device
indices = full_indices[st:en, :].astype(np.int32)
labels = full_labels[st:en].astype(np.float32)
device_for_indices = core.DeviceOption(caffe2_pb2.CPU)
if not cpu_indices:
device_for_indices = core.DeviceOption(caffe2_pb2.CUDA, g)
with core.DeviceScope(device_for_indices):
workspace.FeedBlob("gpu_{}/indices".format(g), indices)
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
workspace.FeedBlob("gpu_{}/label".format(g), labels)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
# Force vecs to be same on all runs
orig_vecs = np.random.rand(V, 16).astype(np.float32)
workspace.FeedBlob(
self.vecs,
orig_vecs
)
if not cpu_indices:
for g in gpu_devices:
workspace.FeedBlob(
"gpu_{}/gpuvecs".format(g),
orig_vecs,
device_option=core.DeviceOption(caffe2_pb2.CUDA, g),
)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
if len(gpu_devices) == 2:
if not cpu_indices:
idx = workspace.FetchBlob("gpu_0/indices")
idx = list(idx.flatten())
n = len(idx)
nu = len(set(idx))
assert n == nu, "We cannot have duplicate indices"
# Sanity check to see the vecs were updated
self.assertFalse(
np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
return [workspace.FetchBlob(self.vecs if cpu_indices else "gpu_0/gpuvecs"),
workspace.FetchBlob("gpu_0/fc_w")]
def _test_equiv_sparse(self, cpu_indices):
'''
Test that the model produces exactly same results given
total batchsize, independent of number of GPUs.
'''
V = 10000
result_2gpus = self.run_model(V, [0, 1], cpu_indices)
result_1gpus = self.run_model(V, [0], cpu_indices)
self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))
if workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(V, list(range(4)), cpu_indices)
self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))
if workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(V, list(range(8)), cpu_indices)
self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))
def test_equiv_sparse(self):
self._test_equiv_sparse(True)
self._test_equiv_sparse(False)
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class ParallelizeBMUFTest(TestCase):
def _run_model(self, gpu_devices):
'''
Helper function for test_equiv
'''
def input_builder_fun(model):
return None
def _model_build_fun(self, model, loss_scale):
fc = model.FC(
"data", "fc", 16, 1, ("ConstantFill", {}), ("ConstantFill", {})
)
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
return [loss]
def _param_update_fun(self, model):
ITER = model.Iter("ITER")
LR = model.net.LearningRate(
[ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
for param in model.GetParams():
grad = model.param_to_grad[param]
model.WeightedSum([param, ONE, grad, LR], param)
def _generate_data(self, devices, device_type, device_prefix):
np.random.seed(26)
# Each run has same input, independent of number of gpus
batch_size = 64
for _ in range(0, 10):
full_data = np.random.rand(batch_size, 16)
full_labels = np.round(full_data[:, 0])
batch_per_device = batch_size // len(devices)
for (j, g) in enumerate(devices):
st = j * batch_per_device
en = st + batch_per_device
data = full_data[st:en, :].astype(np.float32)
labels = full_labels[st:en].astype(np.float32)
with core.DeviceScope(core.DeviceOption(device_type, g)):
workspace.FeedBlob("{}_{}/data".format(device_prefix, g), data)
workspace.FeedBlob("{}_{}/label".format(device_prefix, g), labels)
@given(
cpu_device=st.booleans()
)
def test_parallelize_bmuf(self, cpu_device):
assume(cpu_device or workspace.has_gpu_support)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="test"
)
devices = [0, 1]
def input_builder_fun(model):
return None
if not cpu_device:
device_type = caffe2_pb2.CUDA
device_prefix = "gpu"
else:
device_type = caffe2_pb2.CPU
device_prefix = "cpu"
self._generate_data(devices, device_type, device_prefix)
data_parallel_model.Parallelize_BMUF(
model,
input_builder_fun,
self._model_build_fun,
self._param_update_fun,
devices=devices,
cpu_device=cpu_device
)
data_parallel_model.RunInitNet(model)
# Check initial momentum params are zeros
self.assertEqual(
list(viewkeys(model._device_grouped_blobs)), ['fc_w', 'fc_b']
)
self.assertEqual(workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix)), 0)
np.testing.assert_equal(
workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix)),
np.zeros(16).astype(np.float32).reshape(1, 16)
)
# Run the algorithm for one iteration to have non-zero params.
data_parallel_model.RunNet(model, 1)
# Save iteration momentum and post local update params
v_b_ = workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix))
v_w_ = workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix))
workspace.RunNetOnce(model.net)
b_0_ = workspace.FetchBlob('{}_0/fc_b'.format(device_prefix))
w_0_ = workspace.FetchBlob('{}_0/fc_w'.format(device_prefix))
b_1_ = workspace.FetchBlob('{}_1/fc_b'.format(device_prefix))
w_1_ = workspace.FetchBlob('{}_1/fc_w'.format(device_prefix))
# Compute block gradients.
b_g_ = workspace.FetchBlob('{}_0/fc_b_g'.format(device_prefix))
w_g_ = workspace.FetchBlob('{}_0/fc_w_g'.format(device_prefix))
workspace.RunNetOnce(model._global_model_param_updates_net)
g_b = (b_0_ + b_1_) / 2 - b_g_
g_w = (w_0_ + w_1_) / 2 - w_g_
v_b = workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix))
v_w = workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix))
w_g = workspace.FetchBlob('{}_0/fc_w_g'.format(device_prefix))
b_g = workspace.FetchBlob('{}_0/fc_b_g'.format(device_prefix))
w_0 = workspace.FetchBlob('{}_0/fc_w'.format(device_prefix))
b_0 = workspace.FetchBlob('{}_0/fc_b'.format(device_prefix))
w_1 = workspace.FetchBlob('{}_1/fc_w'.format(device_prefix))
b_1 = workspace.FetchBlob('{}_1/fc_b'.format(device_prefix))
# Check momentum update step
np.testing.assert_equal(v_b, 0.5 * v_b_ + g_b)
np.testing.assert_equal(v_w, 0.5 * v_w_ + g_w)
np.testing.assert_equal(w_g, w_0)
np.testing.assert_equal(w_g, w_1)
np.testing.assert_equal(b_g, b_0)
np.testing.assert_equal(b_g, b_1)
# Check params update step
np.testing.assert_equal(w_0, w_g_ + v_w)
np.testing.assert_equal(b_0, b_g_ + v_b)
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTestWithSharedIndices(TestCase):
'''
    Create and run a model where three blobs are gathered on the GPU with the
    same shared indices.
'''
def run_model(self, V, gpu_devices):
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
gpu_vecs_gathered = []
gpu_vecs = []
for num, vec in enumerate(self.vecs):
gpu_vec = model.param_init_net.CopyCPUToGPU(
vec, 'gpuvec_{}'.format(num),
)
if num != 2:
model.params.append(gpu_vec)
gpu_vecs.append(gpu_vec)
for num, gpu_vec in enumerate(gpu_vecs):
gpu_vec_gathered = model.net.Gather(
[gpu_vec, 'indices'],
['gpu_vec_gathered_{}'.format(num)]
)
gpu_vecs_gathered.append(gpu_vec_gathered)
assert len(gpu_vecs_gathered) == 3
fc = model.net.FC(
[
gpu_vecs_gathered[2],
gpu_vecs_gathered[0],
gpu_vecs_gathered[1],
],
['fc'],
)
_, loss = model.net.SoftmaxWithLoss(
[fc, 'label'],
['ce_loss', 'avg_loss'],
only_loss=True,
)
loss = model.Scale(loss, scale=loss_scale)
model.net.Print(loss, [], limit=10)
return [loss]
def param_update_fun(model):
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
LR = model.CopyCPUToGPU(self.LR, "LR")
for param in model.GetParams():
param_grad = model.param_to_grad[param]
if not isinstance(param_grad, core.GradientSlice):
model.WeightedSum([param, ONE, param_grad, LR], param)
else:
model.net.ScatterWeightedSum(
[
param,
ONE,
param_grad.indices,
param_grad.values,
ONE,
],
param,
)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="sparse_test{}".format(gpu_devices),
)
batch_size = 32
batch_per_device = batch_size // len(gpu_devices)
with core.NameScope("cpu"):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
self.ITER = model.Iter("ITER")
self.LR = model.net.LearningRate(
[self.ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
'''
self.vecs consists of 3 big blobs on which we call Gather:
1) FC weights, shape=(V, 16)
2) FC bias, shape=(V)
3) FC input, shape=(batch_per_device, 16)
'''
self.vecs = [
model.param_init_net.UniformFill(
[], "vec_{}".format(num), shape=[V, 16])
for num in range(2)
]
self.vecs.append(
model.param_init_net.UniformFill(
[],
"vec_2", shape=[batch_per_device, 16]
)
)
self.ONE_CPU = model.param_init_net.ConstantFill(
[], "ONE_CPU", shape=[1], value=1.0,
)
data_parallel_model.Parallelize_GPU(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=gpu_devices,
)
# Update the vecs
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
for num, vec in enumerate(self.vecs[:-1]):
model.CopyGPUToCPU("gpu_0/gpuvec_{}".format(num), vec)
# Each run has same input, independent of number of gpus
for i in range(0, 10):
np.random.seed(2603)
full_indices = np.random.permutation(V)[:batch_size].reshape(
batch_size
)
full_labels = full_indices[:] % batch_per_device
for (j, g) in enumerate(gpu_devices):
st = j * batch_per_device
en = st + batch_per_device
indices = full_indices[st:en].astype(np.int32)
labels = full_labels[st:en].astype(np.int32)
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
workspace.FeedBlob("gpu_{}/indices".format(g), indices)
workspace.FeedBlob("gpu_{}/label".format(g), labels)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
# Force vecs to be same on all runs
orig_vecs = [
np.random.rand(V, 16).astype(np.float32),
np.random.rand(V).astype(np.float32),
np.random.rand(V, 16).astype(np.float32),
]
for vec, orig_vec in zip(self.vecs, orig_vecs):
workspace.FeedBlob(
vec,
orig_vec
)
for g in gpu_devices:
for num, orig_vec in enumerate(orig_vecs):
workspace.FeedBlob(
"gpu_{}/gpuvec_{}".format(g, num),
orig_vec,
device_option=core.DeviceOption(
caffe2_pb2.CUDA, g),
)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
idx = workspace.FetchBlob('gpu_0/indices')
grad_slices = [
workspace.FetchBlob(
'gpu_{}/gpu_vec_gathered_{}_grad'.format(g, num))
for g in gpu_devices for num in range(2)
]
for grad_slice in grad_slices:
# print (len(idx), len(grad_slice))
assert len(idx) == len(grad_slice), (
'Number of indices {} is not same as number of gradient '
'slices {}. This might lead to illegal memory access'.format(
len(idx), len(grad_slice)
)
)
def test_sparse_shared_indices_gpu(self):
'''
Test that the model has same number of indices and gradient rows
given total batchsize, independent of number of GPUs.
'''
V = 10000
self.run_model(V, [0, 1])
self.run_model(V, [0])
if workspace.NumCudaDevices() >= 4:
self.run_model(V, list(range(4)))
if workspace.NumCudaDevices() >= 8:
self.run_model(V, list(range(8)))
if __name__ == "__main__":
import unittest
unittest.main()