pytorch/caffe2/python/layers/feature_sparse_to_dense.py
Kevin Wilfong 88b1f6619e Return list of AccessedFeatures from get_accessed_features (#23983)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/23983

While testing, I realized that model layers can extract different types of features from the same column.  For example, MultifeedFeaturesTransform uses both float and ID list features from the "features" column.

get_accessed_features returns a map from column to AccessedFeatures, and AccessedFeatures only holds the feature IDs for one feature type.  This is incompatible with having multiple types of features per column: one type ends up overwriting another in the map.

To fix this, I've modified get_accessed_features to return a map from column to a list of AccessedFeatures objects.
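
As a sketch of the new shape (the column name and feature IDs below are made up), a column that is read both as float and as ID list features now maps to one AccessedFeatures entry per feature type:

    {
        "features": [
            AccessedFeatures("FLOAT", {101, 102}),
            AccessedFeatures("ID_LIST", {201}),
        ]
    }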

Reviewed By: itomatik

Differential Revision: D16693845

fbshipit-source-id: 2099aac8dc3920dd61de6b6ad5cf343c864803bc
2019-08-14 10:50:27 -07:00

# @package sparse_to_dense
# Module caffe2.python.layers.sparse_to_dense
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import defaultdict
import numpy as np
from caffe2.python import schema
from caffe2.python.layers.layers import ModelLayer, AccessedFeatures
class FeatureSparseToDense(ModelLayer):
def __init__(
self, model, input_record, input_specs, name="feature_sparse_to_dense", **kwargs
):
"""
`input_specs` follows the format of FeatureSpec from schema. To be more
precise it's a namedtuple that should have:
'feature_type', 'feature_names', 'feature_ids'
"""
super(FeatureSparseToDense, self).__init__(model, name, input_record, **kwargs)
self.input_specs = input_specs
outputs = []
for field, feature_specs in self.input_specs:
assert len(feature_specs.feature_names) == len(feature_specs.feature_ids)
if feature_specs.feature_type == "FLOAT":
outputs.append(
(
field,
schema.Scalar(
(np.float32, (len(feature_specs.feature_ids),)),
self.get_next_blob_reference(field + "_output"),
),
)
)
elif feature_specs.feature_type == "ID_LIST":
outputs.append(
(
field,
schema.Struct(
(
"ranges",
schema.Scalar(
(np.int32, (len(feature_specs.feature_ids), 2)),
self.get_next_blob_reference(field + "_ranges"),
),
),
(
"values",
schema.Scalar(
np.int64,
self.get_next_blob_reference(field + "_values"),
),
),
),
)
)
elif feature_specs.feature_type == "ID_SCORE_LIST":
outputs.append(
(
field,
schema.Struct(
(
"ranges",
schema.Scalar(
(np.int32, (len(feature_specs.feature_ids), 2)),
self.get_next_blob_reference(field + "_ranges"),
),
),
(
"ids",
schema.Scalar(
np.int64,
self.get_next_blob_reference(field + "_ids"),
),
),
(
"scores",
schema.Scalar(
np.float32,
self.get_next_blob_reference(field + "_scores"),
),
),
),
)
)
elif feature_specs.feature_type == "EMBEDDING":
# We don't know the dimensions of the embeddings in the input data.
# Even though they should match the dimensions from the feature config,
# we keep the ranges blob so the input data can be checked later.
outputs.append(
(
field,
schema.Struct(
(
"ranges",
schema.Scalar(
(np.int32, (len(feature_specs.feature_ids), 2)),
self.get_next_blob_reference(field + "_ranges"),
),
),
(
"values",
schema.Scalar(
np.float32,
self.get_next_blob_reference(field + "_values"),
),
),
),
)
)
elif feature_specs.feature_type == "GENERIC_FEATURE":
# We don't know the dimensions of the embeddings in the input data.
# Even though they should match the dimensions from the feature config,
# we keep the ranges blob so the input data can be checked later.
# Currently this schema with ranges and values is only for
# generic type enum 1. If new types are implemented, we need to
# modify the ParseGeneric operator and this part accordingly.
outputs.append(
(
field,
schema.Struct(
(
"ranges",
schema.Scalar(
(np.int32, (len(feature_specs.feature_ids), 2)),
self.get_next_blob_reference(field + "_ranges"),
),
),
(
"values",
schema.Scalar(
np.float32,
self.get_next_blob_reference(field + "_values"),
),
),
),
)
)
else:
raise TypeError(
"Unsupported input type: {0}".format(feature_specs.feature_type)
)
# TODO(amalevich): This schema produces ranges, so anything that consumes
# it has to support ranges as well. It might be confusing if we don't add
# better support for ranges / have it as a first layer.
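# For reference: for a field with N feature_ids, the "ranges" output has
# shape (N, 2); each row is an (offset, length) pair pointing into the
# corresponding flat values/ids blob, as produced by LengthsToRanges in
# add_ops below.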
self.output_schema = schema.Struct(*outputs)
# TODO(amalevich): Consider moving this data into the schema instead.
# Structs don't support attaching metadata to them, and cloning will
# break things badly, but this is the most elegant way to pass this info
# around. Should we change it, or would that be too much work and not
# worth it?
for field, feature_specs in input_specs:
schema.attach_metadata_to_scalars(
self.output_schema[field], schema.Metadata(feature_specs=feature_specs)
)
self.zero = model.global_constants["ZERO"]
self.zero_range = model.global_constants["ZERO_RANGE"]
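# ZERO and ZERO_RANGE are global constant blobs provided by the layer
# model helper (typically a scalar zero and an all-zero (offset, length)
# range); SparseToDenseMask in add_ops uses them as the default values
# filled in for features missing from a given input.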
# Add operators for all feature types that need to be densified
def add_ops(self, net):
record = self.input_record
for field, feature_specs in self.input_specs:
if feature_specs.feature_type == "FLOAT":
net.SparseToDenseMask(
[
record[field].keys(),
record[field].values(),
self.zero,
record[field].lengths(),
],
[self.output_schema[field]()],
mask=feature_specs.feature_ids,
)
elif feature_specs.feature_type == "ID_LIST":
id_list_ranges = net.LengthsToRanges(
record[field].values.lengths(), net.NextScopedBlob("id_list_ranges")
)
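# LengthsToRanges turns the per-key lengths of the nested value lists into
# (offset, length) pairs; SparseToDenseMask below then scatters those
# pairs into the dense (num_features, 2) "ranges" blob, using ZERO_RANGE
# for features that do not appear in the input.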
net.SparseToDenseMask(
[
record[field].keys(),
id_list_ranges,
self.zero_range,
record[field].lengths(),
],
self.output_schema[field].ranges(),
mask=feature_specs.feature_ids,
)
# Alias helps to enforce the fact that all SparseToDense calls
# produce new blobs.
# Reusing blob names might have weird consequences at delivery time,
# when the content of the blobs is generated based on the input_specs.
net.Alias(
record[field].values.items(), self.output_schema[field].values()
)
elif feature_specs.feature_type == "ID_SCORE_LIST":
# TODO: merge this with the case above?
id_list_ranges = net.LengthsToRanges(
record[field].values.lengths(),
net.NextScopedBlob("id_score_list_ranges"),
)
net.SparseToDenseMask(
[
record[field].keys(),
id_list_ranges,
self.zero_range,
record[field].lengths(),
],
self.output_schema[field].ranges(),
mask=feature_specs.feature_ids,
)
# Alias helps to enforce the fact that all SparseToDense calls
# produce new blobs.
# Reusing blob names might have weird consequences at delivery time,
# when the content of the blobs is generated based on the input_specs.
net.Alias(record[field].values.keys(), self.output_schema[field].ids())
net.Alias(
record[field].values.values(), self.output_schema[field].scores()
)
elif feature_specs.feature_type == "EMBEDDING":
ranges = net.LengthsToRanges(
record[field].values.lengths(),
net.NextScopedBlob("embeddings_ranges"),
)
net.SparseToDenseMask(
[
record[field].keys(),
ranges,
self.zero_range,
record[field].lengths(),
],
self.output_schema[field].ranges(),
mask=feature_specs.feature_ids,
)
# Alias helps to enforce the fact that all SparseToDense calls
# produce new blobs.
# Reusing blob names might have weird consequences at delivery time,
# when the content of the blobs is generated based on the input_specs.
net.Alias(
record[field].values.items(), self.output_schema[field].values()
)
elif feature_specs.feature_type == "GENERIC_FEATURE":
(
feature_lengths_blob,
feature_ids_blob,
value_lengths_blob,
value_values_blob,
) = net.ParseGeneric(
[record[field]()],
["feature_lengths", "feature_ids", "value_lengths", "value_values"],
feature_type_enum=1,
)
# Currently our implementation only supports
# generic type enum 1. If new types are implemented, we need to
# modify the ParseGeneric operator, the schema above,
# and this part accordingly to parse the generic feature strings
# into input_record.
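# Judging from the blob names above, ParseGeneric splits the raw generic
# feature column into per-feature lengths and ids plus flat per-value
# lengths and values; these are densified below with the same
# LengthsToRanges + SparseToDenseMask pattern used for the list types.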
ranges = net.LengthsToRanges(
value_lengths_blob, net.NextScopedBlob("generics_ranges")
)
net.SparseToDenseMask(
[feature_ids_blob, ranges, self.zero_range, feature_lengths_blob],
self.output_schema[field].ranges(),
mask=feature_specs.feature_ids,
)
# Alias helps to enforce the fact that all SparseToDense calls
# produce new blobs.
# Reusing blob names might have weird consequences at delivery time,
# when the content of the blobs is generated based on the input_specs.
net.Alias(value_values_blob, self.output_schema[field].values())
def get_metadata(self):
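# For each input field this returns a tuple of (feature spec summary dict,
# output field blobs, output field types), so downstream consumers can map
# the densified outputs back to feature names and ids.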
metadata = []
for field, feature_specs in self.input_specs:
metadata.append(
(
{
"type": feature_specs.feature_type,
"names": feature_specs.feature_names,
"ids": feature_specs.feature_ids,
},
self.output_schema[field].field_blobs(),
self.output_schema[field].field_types(),
)
)
if feature_specs.feature_type == "FLOAT":
metadata[-1][0]["cardinality"] = 1
return metadata
def get_accessed_features(self):
accessed_features = defaultdict(list)
# The features that are accessed are just those features that appear in
# the input specs
for field, feature_specs in self.input_specs:
accessed_features[field].append(
AccessedFeatures(
feature_specs.feature_type,
set(feature_specs.feature_ids)
)
)
return accessed_features