Summary: a parameter can be initialized multiple times in init_net if parameter sharing is enabled. With the original implementation, only the first parameter init is replaced by pre-trained parameters; the subsequent inits are left unchanged and overwrite the pre-trained initialization. This diff fixes the issue and also supports model init for the ads-intent project.
Reviewed By: dragonxlwang
Differential Revision: D5991291
fbshipit-source-id: 36173f6239c56bd0d604a77bd94e36072f32faa7
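A minimal sketch of the failure mode described above, with hypothetical blob names and a hypothetical pre-trained-loading step (nothing below is taken from this file): when parameter sharing makes two layers reuse the same parameter, init_net ends up with two initializer ops for the same blob, and replacing only the first one leaves the second to overwrite the pre-trained values.

    from caffe2.python import core

    init_net = core.Net("init")
    # layer A creates the shared parameter
    init_net.XavierFill([], "shared/fc_w", shape=[16, 8])
    # layer B shares the same parameter, so a second initializer is appended
    init_net.XavierFill([], "shared/fc_w", shape=[16, 8])
    # A loader that rewrites only the first matching op leaves the second
    # XavierFill in place; running init_net then re-randomizes "shared/fc_w"
    # after the pre-trained values have been written.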
383 lines
14 KiB
Python
# Copyright (c) 2016-present, Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

## @package layers
# Module caffe2.python.layers.layers
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import logging
from caffe2.python import core, schema, scope, workspace
from caffe2.python.layers.tags import TagContext
from caffe2.proto import caffe2_pb2

from collections import namedtuple
import numpy as np

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Some types to simplify descriptions of things traveling between ops
IdList = schema.List(np.int64)
IdScoreList = schema.Map(np.int64, np.float32)


def get_key(record):
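    """Return the id field of an IdList/IdScoreList record.

    For IdList records this is the 'values' scalar; for IdScoreList records
    it is the 'values:keys' scalar. The field is required to carry metadata.
    """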
    if schema.equal_schemas(record, IdList):
        key = 'values'
    elif schema.equal_schemas(record, IdScoreList, check_field_types=False):
        key = 'values:keys'
    else:
        raise NotImplementedError()
    assert record[key].metadata is not None, (
        "Blob {} doesn't have metadata".format(str(record[key]())))
    return record[key]


def get_categorical_limit(record):
    key = get_key(record)
    return key.metadata.categorical_limit


def set_request_only(field):
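    """Mark every scalar in the given field as request-only.

    Each scalar's metadata is rewritten so that its FeatureSpec has
    feature_is_request_only=True, while preserving the other metadata fields
    that are already present.
    """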
    for f in field.all_scalars():
        categorical_limit, expected_value = None, None
        if not f.metadata:
            feature_specs = schema.FeatureSpec(
                feature_is_request_only=True,
            )
        elif not f.metadata.feature_specs:
            categorical_limit = f.metadata.categorical_limit
            expected_value = f.metadata.expected_value
            feature_specs = schema.FeatureSpec(
                feature_is_request_only=True,
            )
        else:
            categorical_limit = f.metadata.categorical_limit
            expected_value = f.metadata.expected_value
            feature_specs = schema.FeatureSpec(
                feature_type=f.metadata.feature_specs.feature_type,
                feature_names=f.metadata.feature_specs.feature_names,
                feature_ids=f.metadata.feature_specs.feature_ids,
                feature_is_request_only=True,
                desired_hash_size=f.metadata.feature_specs.desired_hash_size,
            )

        # make sure not to set categorical_limit for a non-integer field
        if not np.issubdtype(f.field_type(), np.integer):
            assert categorical_limit is None, \
                "categorical_limit shouldn't be set for a non-integer field"

        f.set_metadata(
            schema.Metadata(
                categorical_limit=categorical_limit,
                expected_value=expected_value,
                feature_specs=feature_specs,
            )
        )


class InstantiationContext(object):
    """
    List of contexts where a layer could be instantiated
    """
    # Layers that support this context will accumulate predictions, labels
    # and weights. The accumulated data can later be used to compute
    # calibration or for other purposes.
    ACCUMULATE_PRED = 'accumulate_pred'
    EVAL = 'eval'
    PREDICTION = 'prediction'
    TRAINING = 'training'


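# Global registry of available layer classes, keyed by layer name.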
_LAYER_REGISTRY = {}


def register_layer(name, layer):
    assert name not in _LAYER_REGISTRY, "{0} already exists".format(name)
    _LAYER_REGISTRY[name] = layer


def layer_exists(name):
    return name in _LAYER_REGISTRY


def get_layer_class(name):
    return _LAYER_REGISTRY[name]


def create_layer(layer_name, *args, **kwargs):
    return _LAYER_REGISTRY[layer_name](*args, **kwargs)


LayerPsParam = namedtuple('LayerPsParam', ['sparse_key', 'average_length'])


class LayerParameter(object):
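    """Describes a single learnable parameter of a layer.

    Bundles the parameter blob reference with its optimizer, its initializer
    operator (from which the parameter shape is inferred) and an optional
    parameter-server configuration (LayerPsParam).
    """
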
    def __init__(self, parameter=None, optimizer=None, initializer=None,
                 ps_param=None):
        assert isinstance(parameter, core.BlobReference), \
            "expect {0} to be a blob reference".format(str(parameter))
        # need to put the following line (shape) before initializer
        # shape will be updated once initializer is (re)set
        self._shape = None
        self.parameter = parameter
        self.optimizer = optimizer
        self.initializer = initializer
        self.ps_param = ps_param

    @property
    def initializer(self):
        return self._initializer

    @initializer.setter
    def initializer(self, op):
        assert op is None or core.IsOperator(getattr(op, 'type', None)), \
            "initializer expects an operator, got type: {}".format(type(op))
        self._initializer = op
        if op is not None:
            shape = self._infer_shape_from_initializer()
            assert self.shape is None or self.shape == shape, \
                "inconsistent shape for layer parameter:"\
                " {}, expect: {}, but got {}".format(self, self.shape, shape)
            self._shape = shape

    @property
    def shape(self):
        return self._shape

    def _infer_shape_from_initializer(self):
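        """Infer the parameter shape from the initializer operator.

        If the initializer carries an explicit 'shape' argument, that shape
        is returned directly. Otherwise the initializer is run in a scratch
        workspace and the shape of the resulting blob is read back; None is
        returned if this fails.
        """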
        for arg in self.initializer.arg:
            if arg.name == 'shape':
                return list(arg.ints)
        with workspace.WorkspaceGuard("model_init_by_loading_params"):
            try:
                net = core.Net("shape_checker")
                net._net.op.extend([self.initializer])
                shape_blob = net.NextScopedBlob(self.parameter + "_shape")
                net.Shape([self.parameter], shape_blob)
                workspace.RunNetOnce(net)
                return workspace.FetchBlob(shape_blob).tolist()
            except RuntimeError:
                logger.warning(
                    "Cannot infer the shape of blob {} from operator {}".format(
                        self.parameter, self.initializer.type)
                )
                workspace.ResetWorkspace()
                return None

    def __str__(self):
        return str(self.parameter)


def is_request_only_scalar(scalar):
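    """Return True only if the scalar has metadata and every metadata entry
    has a FeatureSpec with feature_is_request_only set.
    """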
    if len(scalar.field_metadata()) == 0:
        return False
    for metadata in scalar.field_metadata():
        if not (metadata and metadata.feature_specs and getattr(
                metadata.feature_specs, 'feature_is_request_only', False)):
            return False
    return True


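# A minimal sketch of a concrete layer, for illustration only (the layer and
# parameter names below are hypothetical): a subclass typically forwards
# model, a name prefix and the input record to ModelLayer.__init__, sets
# self.output_schema, implements add_ops (and, if needed, add_train_ops /
# add_eval_ops), and is made available through register_layer.
#
#     class MyCopyLayer(ModelLayer):
#         def __init__(self, model, input_record, name='my_copy', **kwargs):
#             super(MyCopyLayer, self).__init__(
#                 model, name, input_record, **kwargs)
#             self.output_schema = schema.NewRecord(model.net, input_record)
#
#         def add_ops(self, net):
#             for in_blob, out_blob in zip(
#                     self.input_record.field_blobs(),
#                     self.output_schema.field_blobs()):
#                 net.Copy(in_blob, out_blob)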
class ModelLayer(object):

    def __init__(self, model, prefix, input_record,
                 predict_input_record_fields=None, tags=None, **kwargs):
        """
        Base class for model layers. A layer is an abstraction that allows a
        model to be described in terms of meta-operators, where each
        meta-operator can have different implementations for training,
        evaluation and prediction, which are instantiated later. For example,
        SampledSoftmax can perform supervision-dependent sampling during
        training and just apply softmax when used for prediction/evaluation.

        All inputs/outputs of a layer are represented as a record (an
        instance of a schema bound to blobs) and are accessible through
        input_record and output_schema. If a layer needs only a subset of its
        inputs, or provides only a subset of its outputs during inference, it
        should provide predict_input_record and predict_output_schema
        correspondingly (those records are expected to be subsets of
        input_record/output_schema).

        Each layer has a set of Tags associated with it, which depends on the
        current context and arguments. Those tags can be used at
        instantiation time.
        """
        self.name = model.next_layer_name(prefix)
        self.model = model
        self.kwargs = kwargs
        self._input_record = input_record
        if predict_input_record_fields:
            if not isinstance(predict_input_record_fields, list):
                predict_input_record_fields = [predict_input_record_fields]
            self._predict_input_record = self._input_record[
                predict_input_record_fields]
        else:
            self._predict_input_record = None

        self.request_only = True
        if len(input_record.all_scalars()) == 0:
            self.request_only = False
        for scalar in input_record.all_scalars():
            if not is_request_only_scalar(scalar):
                self.request_only = False
                break

        self._output_schema = None
        self._predict_output_schema = None
        self.eval_output_schema = None
        self.tags = set(tags or [])
        self.tags.update(TagContext.current().tags)
        self.params = []

    def get_type(self):
        return self.__class__.__name__

    def _check_output_schema(self):
        assert self._output_schema is not None, "Schema is not initialized"
        assert (self._predict_output_schema is None or
                schema.is_schema_subset(self._predict_output_schema,
                                        self._output_schema)), (
            "predict_output_schema is not a subset of the output_schema")

    @property
    def predict_input_record(self):
        return self._predict_input_record or self._input_record

    @property
    def input_record(self):
        return self._input_record

    @property
    def predict_output_schema(self):
        self._check_output_schema()
        return self._predict_output_schema or self._output_schema

    @predict_output_schema.setter
    def predict_output_schema(self, output_schema):
        assert self._predict_output_schema is None
        self._predict_output_schema = output_schema

    @property
    def output_schema(self):
        if self.request_only:
            set_request_only(self._output_schema)
        self._check_output_schema()
        return self._output_schema

    @output_schema.setter
    def output_schema(self, output_schema):
        assert self._output_schema is None
        self._output_schema = output_schema

    def get_parameters(self):
        return self.params

    def get_fp16_compatible_parameters(self):
        """Return a subset of parameters which can be converted to fp16"""
        return []

    def get_memory_usage(self):
        return 0

    def add_init_params(self, init_net):
        '''
        Adds layer initialization operators to the passed net.
        '''
        for param in self.params:
            # TODO(amalevich): Either return back to lambdas, that add
            # all params (looks a bit safer and breaking less
            # abstractions) or extend the Net interface to support this
            # type of operation better
            # TODO(xlwang) init_net._net.op has type google.protobuf.\
            # internal.containers.RepeatedCompositeFieldContainer, but
            # the version of protobuf in fbcode does not support append
            # so extend is used
            init_op = param.initializer
            current_device_scope = scope.CurrentDeviceScope()
            if init_op:
                if not init_op.HasField('device_option') and\
                        current_device_scope:
                    init_op = caffe2_pb2.OperatorDef()
                    init_op.CopyFrom(param.initializer)
                    init_op.device_option.CopyFrom(current_device_scope)
                init_net._net.op.extend([init_op])

    def create_param(self, param_name, shape, initializer, optimizer,
                     ps_param=None):
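        """Create a parameter scoped under this layer's name.

        Delegates to the model's create_param, records the resulting
        LayerParameter in self.params (so that add_init_params can later emit
        its initializer), and returns the parameter blob reference.
        """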
        with scope.NameScope(self.name, reset=True):
            param = self.model.create_param(param_name=param_name,
                                            shape=shape,
                                            initializer=initializer,
                                            optimizer=optimizer,
                                            ps_param=ps_param)
            self.params.append(param)
            return param.parameter

    def get_next_blob_reference(self, name):
        with scope.NameScope(self.name, reset=True):
            return self.model.net.NextScopedBlob(name)

    def add_operators(self, net, init_net=None,
                      context=InstantiationContext.TRAINING):
        '''
        Adds layer training or initialization operators to the passed in net.
        init_net can be None; add_operators can be called independently from
        add_init_params.
        '''
        # The name scope below should guarantee that all intermediate blobs
        # are associated with the layer that produces them
        with scope.NameScope(self.name):
            if context not in {InstantiationContext.PREDICTION,
                               InstantiationContext.EVAL,
                               InstantiationContext.ACCUMULATE_PRED}:
                assert init_net, (
                    "Only prediction and eval context don't need init_net")
            if init_net:
                self.add_init_params(init_net)
            if context == InstantiationContext.TRAINING:
                self.add_train_ops(net)
            elif context == InstantiationContext.EVAL:
                self.add_eval_ops(net)
            elif context == InstantiationContext.ACCUMULATE_PRED:
                self.add_ops_to_accumulate_pred(net)
            else:
                self.add_ops(net)

    def add_ops(self, net):
        # Predict layer implementation.
        raise NotImplementedError

    def add_eval_ops(self, net):
        # The default eval layer implementation completely matches the
        # predict layer implementation.
        self.add_ops(net)

    def add_train_ops(self, net):
        # The default train layer implementation completely matches the
        # eval layer implementation.
        self.add_eval_ops(net)

    def add_ops_to_accumulate_pred(self, net):
        # This adds operators to accumulate predictions/labels/weights. The
        # accumulated data can later be used to compute calibration or for
        # other purposes. The default implementation completely matches the
        # eval layer implementation.
        self.add_eval_ops(net)