Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/10197

Support generic features in DPER2. For now, since we only have the one generic type (enum 1), we add the parsed feature record directly to the embedding feature. New feature types with their own structure will require corresponding code changes.

Reviewed By: itomatik
Differential Revision: D8788177
fbshipit-source-id: 9aaa6f35ece382acb4072ec5e57061bb0727f184
# @package sparse_to_dense
# Module caffe2.python.layers.sparse_to_dense
from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np

from caffe2.python import schema
from caffe2.python.layers.layers import ModelLayer


class FeatureSparseToDense(ModelLayer):
    def __init__(
        self, model, input_record, input_specs, name="feature_sparse_to_dense", **kwargs
    ):
        """
        `input_specs` follows the format of FeatureSpec from schema. To be
        more precise, it's a namedtuple that should have:
            'feature_type', 'feature_names', 'feature_ids'
        """
        super(FeatureSparseToDense, self).__init__(model, name, input_record, **kwargs)

        self.input_specs = input_specs

        outputs = []
        for field, feature_specs in self.input_specs:
            assert len(feature_specs.feature_names) == len(feature_specs.feature_ids)
            if feature_specs.feature_type == "FLOAT":
                outputs.append(
                    (
                        field,
                        schema.Scalar(
                            (np.float32, (len(feature_specs.feature_ids),)),
                            self.get_next_blob_reference(field + "_output"),
                        ),
                    )
                )
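                # The densified FLOAT output has per-example shape
                # (len(feature_ids),): one float per requested feature id.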
            elif feature_specs.feature_type == "ID_LIST":
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.int64,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
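                # Illustrative note: each row of "ranges" is an
                # (offset, length) pair into the flat "values" blob, one row
                # per feature id. E.g. values [5, 6, 7] with ranges
                # [[0, 2], [2, 1]] mean the first feature owns [5, 6] and the
                # second owns [7].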
            elif feature_specs.feature_type == "ID_SCORE_LIST":
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "ids",
                                schema.Scalar(
                                    np.int64,
                                    self.get_next_blob_reference(field + "_ids"),
                                ),
                            ),
                            (
                                "scores",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_scores"),
                                ),
                            ),
                        ),
                    )
                )
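                # "ids" and "scores" are parallel flat blobs: the same
                # (offset, length) row of "ranges" indexes both, pairing each
                # id with its score.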
            elif feature_specs.feature_type == "EMBEDDING":
                # We don't know the dimensions of the embeddings in the input
                # data. Even though they should match the dimensions from the
                # feature config, we keep the ranges blob to check the input
                # data later.
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "GENERIC_FEATURE":
                # We don't know the dimensions of the feature values in the
                # input data. Even though they should match the dimensions
                # from the feature config, we keep the ranges blob to check
                # the input data later.
                # Currently this schema with ranges and values is only for
                # generic type enum 1. If new types are implemented, we need
                # to modify the ParseGeneric operator and this part
                # accordingly.
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            else:
                raise TypeError(
                    "Unsupported input type: {0}".format(feature_specs.feature_type)
                )
        # TODO(amalevich): This schema is producing ranges, so anything
        # consuming it should support ranges as well. It might be confusing
        # if we don't add better support for ranges / have it as a
        # first-class layer.
        self.output_schema = schema.Struct(*outputs)

        # TODO(amalevich): Consider moving this data to the schema instead.
        # Structs don't support attaching metadata to them, and cloning will
        # break things badly, but this is the most elegant way to pass this
        # info around. Should we change it, or would that be too much work
        # and not worth it?
        for field, feature_specs in input_specs:
            schema.attach_metadata_to_scalars(
                self.output_schema[field], schema.Metadata(feature_specs=feature_specs)
            )
        self.zero = model.global_constants["ZERO"]
        self.zero_range = model.global_constants["ZERO_RANGE"]

    # Add operators to all types that need to be densified
    def add_ops(self, net):
        record = self.input_record
        for field, feature_specs in self.input_specs:
            if feature_specs.feature_type == "FLOAT":
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        record[field].values(),
                        self.zero,
                        record[field].lengths(),
                    ],
                    [self.output_schema[field]()],
                    mask=feature_specs.feature_ids,
                )
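                # SparseToDenseMask scatters each example's sparse
                # (key, value) pairs into a dense row ordered by `mask` (the
                # feature_ids); ids missing from an example are filled from
                # the default-value blob (self.zero here).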
            elif feature_specs.feature_type == "ID_LIST":
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(), net.NextScopedBlob("id_list_ranges")
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        id_list_ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names might have weird
                # consequences at delivery time, when the content of the blobs
                # is generated based on the inputSpecs.
                net.Alias(
                    record[field].values.items(), self.output_schema[field].values()
                )
            elif feature_specs.feature_type == "ID_SCORE_LIST":
                # TODO: merge this with the ID_LIST case above?
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob("id_score_list_ranges"),
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        id_list_ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names might have weird
                # consequences at delivery time, when the content of the blobs
                # is generated based on the inputSpecs.
                net.Alias(record[field].values.keys(), self.output_schema[field].ids())
                net.Alias(
                    record[field].values.values(), self.output_schema[field].scores()
                )
            elif feature_specs.feature_type == "EMBEDDING":
                ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob("embeddings_ranges"),
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names might have weird
                # consequences at delivery time, when the content of the blobs
                # is generated based on the inputSpecs.
                net.Alias(
                    record[field].values.items(), self.output_schema[field].values()
                )
            elif feature_specs.feature_type == "GENERIC_FEATURE":
                (
                    feature_lengths_blob,
                    feature_ids_blob,
                    value_lengths_blob,
                    value_values_blob,
                ) = net.ParseGeneric(
                    [record[field]()],
                    ["feature_lengths", "feature_ids", "value_lengths", "value_values"],
                    feature_type_enum=1,
                )
                # Currently our implementation only supports generic type
                # enum 1. If new types are implemented, we need to modify the
                # ParseGeneric operator, the schema above, and this part
                # accordingly to parse the generic feature strings into
                # input_record.

                ranges = net.LengthsToRanges(
                    value_lengths_blob, net.NextScopedBlob("generics_ranges")
                )
                net.SparseToDenseMask(
                    [feature_ids_blob, ranges, self.zero_range, feature_lengths_blob],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names might have weird
                # consequences at delivery time, when the content of the blobs
                # is generated based on the inputSpecs.
                net.Alias(value_values_blob, self.output_schema[field].values())

    def get_metadata(self):
        metadata = []
        for field, feature_specs in self.input_specs:
            metadata.append(
                (
                    {
                        "type": feature_specs.feature_type,
                        "names": feature_specs.feature_names,
                        "ids": feature_specs.feature_ids,
                    },
                    self.output_schema[field].field_blobs(),
                    self.output_schema[field].field_types(),
                )
            )
            if feature_specs.feature_type == "FLOAT":
                metadata[-1][0]["cardinality"] = 1
        return metadata
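
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; the model setup below is an
# assumption, not part of this module):
#
#   from caffe2.python.layer_model_helper import LayerModelHelper
#
#   model = LayerModelHelper("sparse_nn", input_feature_schema, trainer_extra_schema)
#   dense = model.FeatureSparseToDense(model.input_feature_schema, input_specs)
#
# `dense` mirrors self.output_schema: e.g. an ID_LIST field exposes
# dense[field].ranges and dense[field].values for downstream layers.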