Summary: Many constructors like `torch.zeros` or `torch.randn` didn't support size tracing correctly; this is fixed by this pass. The same issue has been fixed in the legacy tensor constructors. Additionally, the new tensor constructors that do not participate in tracing (most notably `torch.tensor`, `torch.as_tensor` and `torch.from_numpy`) raise a warning when they are used. Finally, entering a traceable operation disables tracing in its body. This is needed because zdevito

Pull Request resolved: https://github.com/pytorch/pytorch/pull/11288
Reviewed By: ezyang
Differential Revision: D9751183
Pulled By: apaszke
fbshipit-source-id: 51444a39d76a3e164adc396c432fd5ee3c8d5f7f
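
"""End-to-end tests that export PyTorch models to ONNX and check the results
against the Caffe2 ONNX backend. TestCaffe2Backend runs with parameters passed
separately to the backend (embed_params=False); at the bottom of the file the
same suite is stamped out again as TestCaffe2BackendEmbed, which re-runs every
test with parameters embedded in the exported graph (embed_params=True)."""
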
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from functools import wraps
import numpy as np
import sys
import unittest
import itertools

import torch.onnx
import torch.onnx.operators
from torch import nn
from torch.autograd import Variable, function
import torch.utils.model_zoo as model_zoo
from torch.nn.utils import rnn as rnn_utils
from debug_embed_params import run_embed_params
import io

# Import various models for testing
from torchvision.models.alexnet import alexnet
from torchvision.models.inception import inception_v3
from torchvision.models.densenet import densenet121
from torchvision.models.resnet import resnet50
from torchvision.models.vgg import vgg16, vgg16_bn, vgg19, vgg19_bn

from model_defs.squeezenet import SqueezeNet
from model_defs.super_resolution import SuperResolutionNet
from model_defs.srresnet import SRResNet
import model_defs.dcgan as dcgan
import model_defs.word_language_model as word_language_model
from model_defs.mnist import MNIST
from model_defs.lstm_flattening_result import LstmFlatteningResult
from model_defs.rnn_model_with_packed_sequence import RnnModelWithPackedSequence

import onnx
import caffe2.python.onnx.backend as c2

from test_pytorch_common import skipIfTravis, skipIfNoLapack, skipIfNoCuda
import verify

skip = unittest.skip


def skipIfEmbed(func):
    # Skip the wrapped test when running the embed_params
    # (TestCaffe2BackendEmbed) variant of the suite.
    @wraps(func)
    def wrapper(self):
        if self.embed_params:
            raise unittest.SkipTest("Skip embed_params verify test")
        return func(self)
    return wrapper


# def import_model(proto, input, workspace=None, use_gpu=True):
#     model_def = onnx.ModelProto.FromString(proto)
#     onnx.checker.check_model(model_def)
#
#     if workspace is None:
#         workspace = {}
#     if isinstance(input, tuple):
#         for i in range(len(input)):
#             workspace[model_def.graph.input[i]] = input[i]
#     else:
#         workspace[model_def.graph.input[0]] = input
#
#     caffe2_out_workspace = c2.run_model(
#         init_graph=None,
#         predict_graph=graph_def,  # NB: graph_def is undefined here; model_def.graph was presumably intended
#         inputs=workspace,
#         use_gpu=use_gpu)
#     caffe2_out = caffe2_out_workspace[0]
#     return caffe2_out


def do_export(model, inputs, *args, **kwargs):
    f = io.BytesIO()
    out = torch.onnx._export(model, inputs, f, *args, **kwargs)
    return f.getvalue(), out
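
# Usage sketch: do_export returns the serialized ONNX ModelProto bytes together
# with the traced PyTorch outputs, e.g.
#   onnx_proto, torch_out = do_export(nn.Linear(2, 2), Variable(torch.randn(1, 2)))

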
try:
    import torch
except ImportError:
    print('Cannot import torch, hence caffe2-torch test will not run.')
    sys.exit(0)

torch.set_default_tensor_type('torch.FloatTensor')


BATCH_SIZE = 2

RNN_BATCH_SIZE = 7
RNN_SEQUENCE_LENGTH = 11
RNN_INPUT_SIZE = 5
RNN_HIDDEN_SIZE = 3
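# The RNN sizes above are small and mutually distinct, presumably so that a
# mixed-up batch/sequence/feature axis fails with a shape error instead of
# passing by coincidence.
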
model_urls = {
    'alexnet': 'https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth',
    'dcgan_b': 'https://s3.amazonaws.com/pytorch/test_data/export/netG_bedroom_epoch_1-0649e76b.pth',
    'dcgan_f': 'https://s3.amazonaws.com/pytorch/test_data/export/netG_faces_epoch_49-d86035a6.pth',
    'densenet121': 'https://download.pytorch.org/models/densenet121-d66d3027.pth',
    'inception_v3_google': 'https://download.pytorch.org/models/inception_v3_google-1a9a5a14.pth',
    'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth',
    'srresNet': 'https://s3.amazonaws.com/pytorch/demos/srresnet-e10b2039.pth',
    'super_resolution': 'https://s3.amazonaws.com/pytorch/test_data/export/superres_epoch100-44c6958e.pth',
    'squeezenet1_0': 'https://download.pytorch.org/models/squeezenet1_0-a815701f.pth',
    'squeezenet1_1': 'https://download.pytorch.org/models/squeezenet1_1-f364aa15.pth',
    'vgg16': 'https://download.pytorch.org/models/vgg16-397923af.pth',
    'vgg19': 'https://download.pytorch.org/models/vgg19-dcbb9e9d.pth',
}
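# The checkpoints above are fetched with model_zoo.load_url(..., progress=False),
# so the first run needs network access; downloads are cached for later runs.

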
class TestCaffe2Backend(unittest.TestCase):
    embed_params = False

    def setUp(self):
        torch.manual_seed(0)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(0)
        np.random.seed(seed=0)

    def convert_cuda(self, model, input):
        cuda_model = model.cuda()
        # input might be nested - we want to move everything to GPU
        cuda_input = function._nested_map(
            lambda o: isinstance(o, Variable) or torch.is_tensor(o),
            lambda o: o.cuda())(input)
        return cuda_model, cuda_input

    def run_debug_test(self, model, train, batch_size, state_dict=None,
                       input=None, use_gpu=True):
        """
        # TODO: remove this from the final release version
        Debug-only path: used when embed_params=False to compare raw
        outputs against the Caffe2 backend.
        """
        model.train(train)
        if state_dict is not None:
            model.load_state_dict(state_dict)

        # Either user-specified input or random (deterministic) input
        if input is None:
            input = Variable(torch.randn(batch_size, 3, 224, 224),
                             requires_grad=True)
        if use_gpu:
            model, input = self.convert_cuda(model, input)

        onnxir, torch_out = do_export(model, input, export_params=self.embed_params, verbose=False)
        if isinstance(torch_out, torch.autograd.Variable):
            torch_out = (torch_out,)

        caffe2_out = run_embed_params(onnxir, model, input, state_dict, use_gpu)
        for x, y in zip(torch_out, caffe2_out):
            np.testing.assert_almost_equal(x.data.cpu().numpy(), y, decimal=3)

    def run_actual_test(self, model, train, batch_size, state_dict=None,
                        input=None, use_gpu=True, rtol=0.001, atol=1e-7):
        """
        This is what the user-facing version will look like.
        """
        # set the training/test mode for the model
        model.train(train)
        # use the pre-trained model params if available
        if state_dict is not None:
            model.load_state_dict(state_dict)

        # Either user-specified input or random (deterministic) input
        if input is None:
            input = Variable(torch.randn(batch_size, 3, 224, 224),
                             requires_grad=True)
        # GPU-ize the model, if requested
        if use_gpu:
            model, input = self.convert_cuda(model, input)

        # Verify the model runs the same in Caffe2
        verify.verify(model, input, c2, rtol=rtol, atol=atol)

    def run_model_test(self, model, train, batch_size, state_dict=None,
                       input=None, use_gpu=True, rtol=0.001, atol=1e-7):
        use_gpu_ = torch.cuda.is_available() and use_gpu
        if self.embed_params:
            self.run_actual_test(model, train, batch_size, state_dict, input,
                                 use_gpu=use_gpu_, rtol=rtol, atol=atol)
        else:
            self.run_debug_test(model, train, batch_size, state_dict, input,
                                use_gpu=use_gpu_)
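
    # NB: run_model_test is the single entry point used by the tests below.
    # With embed_params=True it exercises the user-facing path (verify.verify
    # against the Caffe2 backend); with embed_params=False it exports without
    # embedded parameters and compares raw outputs via run_embed_params.
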
    def test_linear(self):
        model = nn.Linear(1, 1)
        input = Variable(torch.randn(1, 1), requires_grad=True)
        self.run_model_test(model, train=False, batch_size=0, input=input)

    def test_lstm_cell(self):
        model = nn.LSTMCell(RNN_INPUT_SIZE, RNN_HIDDEN_SIZE)
        input = Variable(torch.randn(BATCH_SIZE, RNN_INPUT_SIZE))
        h0 = Variable(torch.randn(BATCH_SIZE, RNN_HIDDEN_SIZE))
        c0 = Variable(torch.randn(BATCH_SIZE, RNN_HIDDEN_SIZE))
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE, input=(input, (h0, c0)), use_gpu=False)

    def test_gru_cell(self):
        model = nn.GRUCell(RNN_INPUT_SIZE, RNN_HIDDEN_SIZE)
        input = Variable(torch.randn(BATCH_SIZE, RNN_INPUT_SIZE))
        h0 = Variable(torch.randn(BATCH_SIZE, RNN_HIDDEN_SIZE))
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE, input=(input, h0), use_gpu=False)

    def _dispatch_rnn_test(self, name, *args, **kwargs):
        if name == 'elman':
            self._elman_rnn_test(*args, **kwargs)
        elif name == 'lstm':
            self._lstm_test(*args, **kwargs)
        elif name == 'gru':
            self._gru_test(*args, **kwargs)

    def _elman_rnn_test(self, layers, nonlinearity, bidirectional,
                        initial_state, packed_sequence, dropout):
        model = nn.RNN(RNN_INPUT_SIZE, RNN_HIDDEN_SIZE,
                       layers,
                       nonlinearity=nonlinearity,
                       bidirectional=bidirectional,
                       dropout=dropout)

        if packed_sequence == 1:
            model = RnnModelWithPackedSequence(model, False)
        if packed_sequence == 2:
            model = RnnModelWithPackedSequence(model, True)

        def make_input(batch_size):
            seq_lengths = np.random.randint(1, RNN_SEQUENCE_LENGTH + 1, size=batch_size)
            seq_lengths = sorted(map(int, seq_lengths), reverse=True)
            inputs = [Variable(torch.randn(l, RNN_INPUT_SIZE)) for l in seq_lengths]
            inputs = rnn_utils.pad_sequence(inputs)
            if packed_sequence == 2:
                inputs = inputs.transpose(0, 1)
            inputs = [inputs]

            directions = 2 if bidirectional else 1

            if initial_state:
                h0 = Variable(torch.randn(directions * layers, batch_size, RNN_HIDDEN_SIZE))
                inputs.append(h0)
            if packed_sequence != 0:
                inputs.append(Variable(torch.IntTensor(seq_lengths)))
            if len(inputs) == 1:
                input = inputs[0]
            else:
                input = tuple(inputs)
            return input

        input = make_input(RNN_BATCH_SIZE)
        self.run_model_test(model, train=False, batch_size=RNN_BATCH_SIZE, input=input, use_gpu=False, atol=1e-7)

        # test that the model still runs with a different batch size
        onnxir, _ = do_export(model, input)
        other_input = make_input(RNN_BATCH_SIZE + 1)
        _ = run_embed_params(onnxir, model, other_input, use_gpu=False)

    def _lstm_test(self, layers, bidirectional, initial_state,
                   packed_sequence, dropout):
        model = LstmFlatteningResult(
            RNN_INPUT_SIZE, RNN_HIDDEN_SIZE, layers,
            bidirectional=bidirectional, dropout=dropout)
        if packed_sequence == 1:
            model = RnnModelWithPackedSequence(model, False)
        if packed_sequence == 2:
            model = RnnModelWithPackedSequence(model, True)

        def make_input(batch_size):
            seq_lengths = np.random.randint(1, RNN_SEQUENCE_LENGTH + 1, size=batch_size)
            seq_lengths = sorted(map(int, seq_lengths), reverse=True)
            inputs = [Variable(torch.randn(l, RNN_INPUT_SIZE)) for l in seq_lengths]
            inputs = rnn_utils.pad_sequence(inputs)
            if packed_sequence == 2:
                inputs = inputs.transpose(0, 1)
            inputs = [inputs]

            directions = 2 if bidirectional else 1

            if initial_state:
                h0 = Variable(torch.randn(directions * layers, batch_size, RNN_HIDDEN_SIZE))
                c0 = Variable(torch.randn(directions * layers, batch_size, RNN_HIDDEN_SIZE))
                inputs.append((h0, c0))
            if packed_sequence != 0:
                inputs.append(Variable(torch.IntTensor(seq_lengths)))
            if len(inputs) == 1:
                input = inputs[0]
            else:
                input = tuple(inputs)
            return input

        input = make_input(RNN_BATCH_SIZE)
        self.run_model_test(model, train=False, batch_size=RNN_BATCH_SIZE, input=input, use_gpu=False)

        # test that the model still runs with a different batch size
        onnxir, _ = do_export(model, input)
        other_input = make_input(RNN_BATCH_SIZE + 1)
        _ = run_embed_params(onnxir, model, other_input, use_gpu=False)

    def _gru_test(self, layers, bidirectional, initial_state,
                  packed_sequence, dropout):
        model = nn.GRU(RNN_INPUT_SIZE, RNN_HIDDEN_SIZE, layers,
                       bidirectional=bidirectional, dropout=dropout)
        if packed_sequence == 1:
            model = RnnModelWithPackedSequence(model, False)
        if packed_sequence == 2:
            model = RnnModelWithPackedSequence(model, True)

        def make_input(batch_size):
            seq_lengths = np.random.randint(1, RNN_SEQUENCE_LENGTH + 1, size=batch_size)
            seq_lengths = sorted(map(int, seq_lengths), reverse=True)
            inputs = [Variable(torch.randn(l, RNN_INPUT_SIZE)) for l in seq_lengths]
            inputs = rnn_utils.pad_sequence(inputs)
            if packed_sequence == 2:
                inputs = inputs.transpose(0, 1)
            inputs = [inputs]

            directions = 2 if bidirectional else 1

            if initial_state:
                h0 = Variable(torch.randn(directions * layers, batch_size, RNN_HIDDEN_SIZE))
                inputs.append(h0)
            if packed_sequence != 0:
                inputs.append(Variable(torch.IntTensor(seq_lengths)))
            if len(inputs) == 1:
                input = inputs[0]
            else:
                input = tuple(inputs)
            return input

        input = make_input(RNN_BATCH_SIZE)
        self.run_model_test(model, train=False, batch_size=RNN_BATCH_SIZE, input=input, use_gpu=False)

        # test that the model still runs with a different batch size
        onnxir, _ = do_export(model, input)
        other_input = make_input(RNN_BATCH_SIZE + 1)
        _ = run_embed_params(onnxir, model, other_input, use_gpu=False)

    def test_rnn_init_predict_split(self):
        model = nn.LSTM(RNN_INPUT_SIZE, RNN_HIDDEN_SIZE, 3, bidirectional=True)
        seq_lengths = np.random.randint(1, RNN_SEQUENCE_LENGTH + 1, size=7)
        seq_lengths = sorted(map(int, seq_lengths), reverse=True)
        input = [Variable(torch.randn(l, RNN_INPUT_SIZE)) for l in seq_lengths]
        input = rnn_utils.pad_sequence(input)

        # Test that we are correctly splitting between init and
        # predict net. When we embed parameters, there should be more
        # ops in the init net.
        mp = onnx.ModelProto.FromString(do_export(model, input, export_params=self.embed_params)[0])
        prepared = c2.prepare(mp, device='CPU')
        # NB: the exact op counts below are pinned to the current exporter and
        # will need updating whenever the emitted graph changes.
        if self.embed_params:
            assert len(prepared.init_net.op) == 1019
            assert len(prepared.predict_net.op) == 142
        else:
            assert len(prepared.init_net.op) == 8
            assert len(prepared.predict_net.op) == 1153

    def test_alexnet(self):
        state_dict = model_zoo.load_url(model_urls['alexnet'], progress=False)
        self.run_model_test(alexnet(), train=False, batch_size=BATCH_SIZE,
                            state_dict=state_dict, atol=1e-3)

    @skipIfNoCuda
    def test_dcgan(self):
        # dcgan is flaky on some seeds, see:
        # https://github.com/ProjectToffee/onnx/pull/70
        torch.manual_seed(1)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(1)

        netD = dcgan._netD(1)
        netD.apply(dcgan.weights_init)
        input = Variable(torch.randn(BATCH_SIZE, 3, dcgan.imgsz, dcgan.imgsz))
        self.run_model_test(netD, train=False, batch_size=BATCH_SIZE,
                            input=input)

        netG = dcgan._netG(1)
        netG.apply(dcgan.weights_init)
        state_dict = model_zoo.load_url(model_urls['dcgan_b'], progress=False)
        # state_dict = model_zoo.load_url(model_urls['dcgan_f'], progress=False)
        noise = Variable(
            torch.randn(BATCH_SIZE, dcgan.nz, 1, 1).normal_(0, 1))
        self.run_model_test(netG, train=False, batch_size=BATCH_SIZE,
                            input=noise, state_dict=state_dict, rtol=1e-2, atol=1e-6)

    @unittest.skipIf(not torch.cuda.is_available(),
                     "model on net has cuda in it, awaiting fix")
    def test_densenet(self):
        state_dict = model_zoo.load_url(model_urls['densenet121'], progress=False)
        self.run_model_test(densenet121(), train=False, batch_size=BATCH_SIZE,
                            state_dict=state_dict, atol=1e-7)

@skip("doesn't match exactly...")
|
|
# TODO: figure out the numerical instabilities
|
|
def test_inception(self):
|
|
x = Variable(
|
|
torch.randn(BATCH_SIZE, 3, 299, 299), requires_grad=True)
|
|
# state_dict = model_zoo.load_url(model_urls['inception_v3_google'], progress=False)
|
|
state_dict = None
|
|
self.run_model_test(inception_v3(), train=False, batch_size=BATCH_SIZE,
|
|
state_dict=state_dict, input=x)
|
|
|
|
def test_resnet(self):
|
|
state_dict = model_zoo.load_url(model_urls['resnet50'], progress=False)
|
|
self.run_model_test(resnet50(), train=False, batch_size=BATCH_SIZE,
|
|
state_dict=state_dict, atol=1e-6)
|
|
|
|
    def test_squeezenet(self):
        sqnet_v1_1 = SqueezeNet(version=1.1)
        state_dict = model_zoo.load_url(model_urls['squeezenet1_1'], progress=False)
        # state_dict = model_zoo.load_url(model_urls['squeezenet1_0'], progress=False)
        self.run_model_test(sqnet_v1_1, train=False, batch_size=BATCH_SIZE,
                            state_dict=state_dict)

    # @skip('takes long to run, LAPACK needed for gpu')
    @skipIfNoLapack
    @unittest.skip("This model takes too much memory")
    def test_srresnet(self):
        super_resolution_net = SRResNet(
            rescale_factor=4, n_filters=64, n_blocks=8)
        state_dict = model_zoo.load_url(model_urls['srresNet'], progress=False)
        x = Variable(torch.randn(1, 3, 224, 224), requires_grad=True)
        self.run_model_test(super_resolution_net, train=False,
                            batch_size=1, state_dict=state_dict,
                            input=x, use_gpu=False)

    @skipIfTravis
    @skipIfNoLapack
    @skipIfNoCuda
    def test_super_resolution(self):
        super_resolution_net = SuperResolutionNet(upscale_factor=3)
        state_dict = model_zoo.load_url(model_urls['super_resolution'], progress=False)
        x = Variable(torch.randn(1, 1, 224, 224), requires_grad=True)
        self.run_model_test(super_resolution_net, train=False,
                            batch_size=BATCH_SIZE, state_dict=state_dict,
                            input=x, use_gpu=False, atol=1e-6)

@unittest.skip("This model takes too much memory")
|
|
def test_vgg16(self):
|
|
state_dict = model_zoo.load_url(model_urls['vgg16'], progress=False)
|
|
self.run_model_test(vgg16(), train=False, batch_size=BATCH_SIZE,
|
|
state_dict=state_dict)
|
|
|
|
@skip("disable to run tests faster...")
|
|
def test_vgg16_bn(self):
|
|
self.run_model_test(vgg16_bn(), train=False,
|
|
batch_size=BATCH_SIZE)
|
|
|
|
@skip("disable to run tests faster...")
|
|
def test_vgg19(self):
|
|
state_dict = model_zoo.load_url(model_urls['vgg19'], progress=False)
|
|
self.run_model_test(vgg19(), train=False, batch_size=BATCH_SIZE,
|
|
state_dict=state_dict)
|
|
|
|
@skip("disable to run tests faster...")
|
|
def test_vgg19_bn(self):
|
|
self.run_model_test(vgg19_bn(), train=False,
|
|
batch_size=BATCH_SIZE)
|
|
|
|
    def run_word_language_model(self, model_name):
        ntokens = 50
        emsize = 5
        nhid = 5
        nlayers = 5
        dropout = 0.2
        tied = False
        batchsize = 5
        model = word_language_model.RNNModel(model_name, ntokens, emsize,
                                             nhid, nlayers, dropout, tied,
                                             batchsize)
        x = Variable(torch.arange(0, ntokens).long().view(-1, batchsize),
                     requires_grad=False)
        # Only the CPU version is supported, since the tracer does not work
        # with GPU RNNs.
        self.run_model_test(model, train=False, input=(x, model.hidden),
                            batch_size=batchsize, use_gpu=False)

    def test_word_language_model_RNN_TANH(self):
        self.run_word_language_model("RNN_TANH")

    def test_word_language_model_RNN_RELU(self):
        self.run_word_language_model("RNN_RELU")

    def test_word_language_model_LSTM(self):
        self.run_word_language_model("LSTM")

    def test_word_language_model_GRU(self):
        self.run_word_language_model("GRU")

    def test_batchnorm1d_special(self):
        c = Variable(torch.randn(BATCH_SIZE, 224))
        model = nn.BatchNorm1d(224)
        self.run_model_test(model, train=True, input=c, batch_size=BATCH_SIZE)

    def test_batchnorm2d_noaffine(self):
        c = Variable(torch.randn(128, 128, 1, 1))
        model = nn.BatchNorm2d(128, affine=False)
        self.run_model_test(model, train=False, input=c, batch_size=BATCH_SIZE)

    def test_constant(self):
        c = Variable(torch.randn(BATCH_SIZE, 3, 224, 224))

        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                return input + c.type_as(input)

        self.run_model_test(MyModel(), train=False, batch_size=BATCH_SIZE)

    def test_consumed_bn(self):
        underlying = nn.BatchNorm2d(3)
        self.run_model_test(underlying, train=True, batch_size=BATCH_SIZE)

    def _test_index_generic(self, fn):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                return fn(input)

        m1 = Variable(torch.randn(3, 4))
        self.run_model_test(MyModel(), input=m1, train=False, batch_size=BATCH_SIZE)

    def test_index_1d(self):
        self._test_index_generic(lambda input: input[0])

    def test_index_2d_1dimslice(self):
        self._test_index_generic(lambda input: input[0:1, :])

    def test_index_2d_sliceint(self):
        self._test_index_generic(lambda input: input[1, :])

    def test_index_2d_neg_slice(self):
        self._test_index_generic(lambda input: input[0:-1, :])

    # TODO: Slicing along two dimensions is currently unsupported by the caffe2
    # backend. Revisit if this becomes supported in the future.
    """
    def test_index_2d_2dimslice(self):
        self._test_index_generic(lambda input: input[0:1, 0:1])

    def test_index_2d_neg_slice2dim(self):
        self._test_index_generic(lambda input: input[0:-1, 0:-1])
    """

    def test_chunk(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                # TODO: Why index? chunk() returns a tuple and the test runner
                # doesn't support tuple comparison.
                return input.chunk(8, dim=2)[-1]
        self.run_model_test(MyModel(), train=False, batch_size=BATCH_SIZE)

    def test_sqrt(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                return input.sqrt()
        input = Variable(torch.empty(BATCH_SIZE, 10, 10).uniform_(4, 9))
        self.run_model_test(MyModel(), train=False, input=input, batch_size=BATCH_SIZE)

    def test_log(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                return input.log()
        input = Variable(torch.empty(BATCH_SIZE, 10, 10).uniform_(4, 9))
        self.run_model_test(MyModel(), train=False, input=input, batch_size=BATCH_SIZE)

    def test_trigonometry(self):
        def test_func(name):
            class MyModel(torch.nn.Module):
                def __init__(self):
                    super(MyModel, self).__init__()

                def forward(self, input):
                    return getattr(input, name)()
            input = Variable(torch.empty(BATCH_SIZE, 10, 10).uniform_())
            self.run_model_test(MyModel(), train=False, input=input, batch_size=BATCH_SIZE)

        test_func('cos')
        test_func('sin')
        test_func('tan')
        test_func('acos')
        test_func('asin')
        test_func('atan')

    def test_addconstant(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                return input + 1
        self.run_model_test(MyModel(), train=False, batch_size=BATCH_SIZE)

    def test_subconstant(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, input):
                return input - 1
        self.run_model_test(MyModel(), train=False, batch_size=BATCH_SIZE)

    def test_embedding(self):
        model = nn.Embedding(10, 3, padding_idx=-1)
        input = Variable(torch.LongTensor(list(range(10))[::-1]))
        self.run_model_test(model, train=False, input=input, batch_size=BATCH_SIZE)

    def test_constantpad2d(self):
        model = nn.ConstantPad2d((1, 2, 3, 4), 3.5)
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    def test_reflectionpad2d(self):
        model = nn.ReflectionPad2d((1, 2, 3, 4))
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    def test_replicationpad2d(self):
        model = nn.ReplicationPad2d((1, 2, 3, 4))
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    def test_maxpool2d(self):
        model = nn.MaxPool2d(5, padding=(1, 2))
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    def test_maxpool2d_single_padding(self):
        model = nn.MaxPool2d(5, padding=2)
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    @unittest.skip("C2 and PyTorch have small difference in padding implementation")
    def test_avgpool2d(self):
        model = nn.AvgPool2d(5, padding=2)
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    def test_avgpool2d_no_padding(self):
        model = nn.AvgPool2d(5)
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE)

    def test_weight_norm(self):
        model = nn.utils.weight_norm(nn.Conv1d(1, 1, 3))
        input = Variable(torch.randn(1, 1, 5), requires_grad=True)
        self.run_model_test(
            model, train=True, batch_size=0, input=input, use_gpu=False
        )

    def test_mnist(self):
        model = MNIST()
        input = Variable(torch.randn(BATCH_SIZE, 1, 28, 28))
        state_dict = None
        # TODO: test with state_dict
        self.run_model_test(model, train=False, input=input, batch_size=BATCH_SIZE,
                            state_dict=state_dict)

    def test_mm(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, m1, m2):
                return torch.mm(m1, m2)
        m1 = Variable(torch.randn(3, 4))
        m2 = Variable(torch.randn(4, 5))
        self.run_model_test(MyModel(), train=False, input=(m1, m2), batch_size=BATCH_SIZE, use_gpu=False)

    def test_addmm(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, ma, m1, m2):
                return torch.addmm(ma, m1, m2)
        ma = Variable(torch.randn(5))
        m1 = Variable(torch.randn(3, 4))
        m2 = Variable(torch.randn(4, 5))
        self.run_model_test(MyModel(), train=False, input=(ma, m1, m2), batch_size=BATCH_SIZE, use_gpu=False)

    # test for a pytorch optimization pass, see https://github.com/pytorch/pytorch/pull/7872
    def test_consecutive_transposes(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, x):
                return x.transpose(1, 2).transpose(2, 3)
        x = Variable(torch.randn(5, 6, 7, 8))
        self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    def test_sum(self):
        shape = (3, 4, 5)
        for params in [{}] + [{'dim': i} for i in range(len(shape))]:
            class MyModel(torch.nn.Module):
                def __init__(self):
                    super(MyModel, self).__init__()

                def forward(self, x):
                    return torch.sum(x, **params)
            x = Variable(torch.randn(*shape))
            self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    def test_cumsum(self):
        shape = (3, 4, 5)
        for params in [{'dim': i} for i in range(len(shape))]:
            class MyModel(torch.nn.Module):
                def __init__(self):
                    super(MyModel, self).__init__()

                def forward(self, x):
                    return torch.cumsum(x, **params)
            x = Variable(torch.randn(*shape))
            self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    def test_layer_norm(self):
        shape = (20, 5, 10, 10)

        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()
                self.ln = torch.nn.LayerNorm([5, 10, 10])

            def forward(self, x):
                return self.ln(x)

        x = torch.randn(*shape)
        self.run_model_test(MyModel(), train=False, input=(x,), batch_size=BATCH_SIZE, use_gpu=False)

    def test_repeat(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, x):
                return x.repeat(1, 2, 3, 4)

        x = Variable(torch.randn(4, 3, 2, 1), requires_grad=True)
        self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    def test_repeat_dim_overflow(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, x):
                return x.repeat(1, 2, 3, 4)

        x = Variable(torch.randn(1, 2), requires_grad=True)
        self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    def test_repeat_dynamic(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, x, y):
                # NB: integer division is required here; with
                # `from __future__ import division`, `/` would produce a
                # float, which repeat() does not accept.
                return x.repeat(y.size()[0] // 2, y.size()[1] * 2)

        x = Variable(torch.randn(1, 2), requires_grad=True)
        y = Variable(torch.randn(2, 4), requires_grad=True)
        self.run_model_test(MyModel(), train=False, input=(x, y), batch_size=BATCH_SIZE, use_gpu=False)

    def test_mean(self):
        shape = (3, 4, 5)
        for params in [{}] + [{'dim': i} for i in range(len(shape))]:
            class MyModel(torch.nn.Module):
                def __init__(self):
                    super(MyModel, self).__init__()

                def forward(self, x):
                    return torch.mean(x, **params)
            x = Variable(torch.randn(*shape))
            self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    # TODO: Add test cases for prod once Caffe2 has support for ReduceProd

    def test_softmax(self):
        for i in range(2, 7):
            model = nn.Softmax(dim=i - 1)
            dims = [2] * (i - 2) + [3, 4]
            input = Variable(torch.randn(*dims).fill_(1),
                             requires_grad=True)
            self.run_model_test(model, train=False, batch_size=BATCH_SIZE, input=input)

    def test_logsoftmax(self):
        for i in range(2, 7):
            model = nn.LogSoftmax(dim=i - 1)
            dims = [2] * (i - 2) + [3, 4]
            input = Variable(torch.randn(*dims).fill_(1),
                             requires_grad=True)
            self.run_model_test(model, train=False, batch_size=BATCH_SIZE, input=input)

    def test_convtranspose(self):
        model = nn.ConvTranspose2d(3, 3, 3, stride=3, bias=False, padding=1, output_padding=2)
        self.run_model_test(model, train=False, batch_size=BATCH_SIZE, atol=1e-7)

    def test_unsqueeze(self):
        shape = (3, 4, 5)
        for dim in range(len(shape) + 1):
            class MyModel(torch.nn.Module):
                def __init__(self):
                    super(MyModel, self).__init__()

                def forward(self, x):
                    return x.unsqueeze(dim)
            x = Variable(torch.randn(*shape))
            self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, atol=1e-7)

    # NB: InstanceNorm model includes unused weights, so skip this in TestCaffe2BackendEmbed
    # TODO: We should have another pass to eliminate the unused initializers in ONNX models.
    @skipIfEmbed
    def test_instance_norm(self):
        underlying = nn.InstanceNorm2d(3)
        self.run_model_test(underlying, train=False, batch_size=BATCH_SIZE)

    def test_dynamic_sizes(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, x):
                shape = torch.onnx.operators.shape_as_tensor(x)
                new_shape = torch.cat((torch.LongTensor([-1]), shape[0].view(1)))
                return torch.onnx.operators.reshape_from_tensor_shape(x, new_shape)
        x = Variable(torch.randn(3, 5, 7))
        self.run_model_test(MyModel(), train=False, input=x, batch_size=BATCH_SIZE, use_gpu=False)

    def test_advanced_broadcast(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()

            def forward(self, x, y):
                return torch.mul(x, y)
        x = Variable(torch.randn(1, 5, 10))
        y = Variable(torch.randn(1, 5, 1))
        self.run_model_test(MyModel(), train=False, input=(x, y), batch_size=BATCH_SIZE, use_gpu=False)

    def test_int8_export(self):
        class MyModel(torch.nn.Module):
            def __init__(self):
                super(MyModel, self).__init__()
                self.param = torch.ByteTensor(3, 4).random_()

            def forward(self, x):
                return x * self.param.float()

        f = io.BytesIO()
        from torch.onnx import ExportTypes
        torch.onnx._export(MyModel(), (torch.rand(3, 4),), f, verbose=True, export_type=ExportTypes.ZIP_ARCHIVE)

        X = np.random.rand(3, 4).astype(np.float32)

        f.seek(0)
        model = c2.prepare_zip_archive(f)
        model.run(X)

    def test_neg_slice(self):
        class NegSlice(torch.nn.Module):
            def forward(self, x):
                return x[-1, :, :]

        x = torch.randn(3, 4, 5)
        self.run_model_test(NegSlice(), train=False, input=(x,), batch_size=BATCH_SIZE, use_gpu=False)

    def test_neg_slice_large(self):
        class NegSlice(torch.nn.Module):
            def forward(self, x):
                return x[:, :, :, :, -3]

        x = torch.randn(3, 4, 5, 6, 7)
        self.run_model_test(NegSlice(), train=False, input=(x,), batch_size=BATCH_SIZE, use_gpu=False)

    @unittest.skip('https://github.com/pytorch/pytorch/issues/10984')
    def test_neg_slice_large_negone(self):
        class NegSlice(torch.nn.Module):
            def forward(self, x):
                return x[:, :, :, :, -1]

        x = torch.randn(3, 4, 5, 6, 7)
        self.run_model_test(NegSlice(), train=False, input=(x,), batch_size=BATCH_SIZE, use_gpu=False)

# a bit of metaprogramming to set up all the RNN tests


def make_test(name, base, layer, bidirectional, initial_state,
              variable_length, dropout,
              **extra_kwargs):
    test_name = str('_'.join([
        'test', name, layer[1],
        bidirectional[1], initial_state[1],
        variable_length[1], dropout[1]
    ]))

    def f(self):
        self._dispatch_rnn_test(
            base,
            layers=layer[0],
            bidirectional=bidirectional[0],
            initial_state=initial_state[0],
            packed_sequence=variable_length[0],
            dropout=dropout[0],
            **extra_kwargs)

    f.__name__ = test_name
    setattr(TestCaffe2Backend, f.__name__, f)
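
# For example, make_test('lstm', 'lstm', (3, 'trilayer'), (True, 'bidirectional'),
# (True, 'with_initial_state'), (1, 'with_variable_length_sequences'),
# (0.2, 'with_dropout')) registers
# test_lstm_trilayer_bidirectional_with_initial_state_with_variable_length_sequences_with_dropout.

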
def setup_rnn_tests():
    layers_opts = [
        (1, 'unilayer'),
        (3, 'trilayer')
    ]
    bidirectional_opts = [
        (False, 'forward'),
        (True, 'bidirectional')
    ]
    initial_state_opts = [
        (True, 'with_initial_state'),
        (False, 'no_initial_state')
    ]
    variable_length_opts = [
        (0, 'without_sequence_lengths'),
        (1, 'with_variable_length_sequences'),
        (2, 'with_batch_first_sequence_lengths')
    ]
    dropout_opts = [
        (0.2, 'with_dropout'),
        (0.0, 'without_dropout')
    ]
    test_count = 0
    for (layer, bidirectional, initial_state, variable_length, dropout) in \
            itertools.product(
                layers_opts,
                bidirectional_opts,
                initial_state_opts,
                variable_length_opts,
                dropout_opts,
            ):

        for base, name, extra_kwargs in (
                ('elman', 'elman_relu', {'nonlinearity': u'relu'}),
                ('elman', 'elman_tanh', {'nonlinearity': u'tanh'}),
                ('lstm', 'lstm', {}),
                ('gru', 'gru', {})
        ):
            make_test(name, base, layer, bidirectional, initial_state,
                      variable_length, dropout,
                      **extra_kwargs)
            test_count += 1

    # sanity check that a representative example does exist (an AttributeError
    # here means name generation went wrong)
    TestCaffe2Backend.test_gru_trilayer_forward_with_initial_state_without_sequence_lengths_with_dropout

    # make sure no one accidentally disables all the tests without noticing:
    # 2 depths * 2 directions * 2 initial-state options * 3 sequence-length
    # modes * 2 dropout settings * 4 cell configurations = 192
    assert test_count == 192, test_count


setup_rnn_tests()

# add the same test suite as above, but switch embed_params=False
# to embed_params=True
TestCaffe2BackendEmbed = type(str("TestCaffe2BackendEmbed"),
                              (unittest.TestCase,),
                              dict(TestCaffe2Backend.__dict__, embed_params=True))
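# NB: stamping the class out with type() over a copy of TestCaffe2Backend's
# __dict__ re-registers every test method on a fresh TestCase, so unittest
# collects and reports the embed_params=True suite independently.

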
if __name__ == '__main__':
    unittest.main()