Extract updates:

- Default CPU detector to MTCNN
  - add basic Aligner false positive filters
  - Typing: align + plugins
  - Use specific AlignerBatch class for alignment
  -
This commit is contained in:
torzdf 2022-09-18 19:44:41 +01:00
parent 8a803e24c4
commit a8f22cc019
8 changed files with 688 additions and 199 deletions

View File

@ -371,7 +371,8 @@ class ExtractArgs(ExtractConvertArgs):
The list of optional command line options for the Extract command
"""
if get_backend() == "cpu":
default_detector = default_aligner = "cv2-dnn"
default_detector = "mtcnn"
default_aligner = "cv2-dnn"
else:
default_detector = "s3fd"
default_aligner = "fan"

View File

@ -3,7 +3,7 @@
:mod:`~plugins.extract.mask` Plugins
"""
import logging
from typing import Dict
from tensorflow.python.framework import errors_impl as tf_errors # pylint:disable=no-name-in-module # noqa
from lib.multithreading import MultiThread
@ -144,7 +144,7 @@ class Extractor():
self._threads = []
""" list: Internal threads for this plugin """
self._extract_media = {}
self._extract_media: Dict[str, ExtractMedia] = {}
""" dict: The :class:`plugins.extract.pipeline.ExtractMedia` objects currently being
processed. Stored at input for pairing back up on output of extractor process """

View File

@ -26,8 +26,51 @@ class Config(FaceswapConfig):
section = "global"
self.add_section(title=section, info="Options that apply to all extraction plugins")
self.add_item(
section=section, title="allow_growth", datatype=bool, default=False, group="settings",
section=section,
title="allow_growth",
datatype=bool,
default=False,
group="settings",
info="[Nvidia Only]. Enable the Tensorflow GPU `allow_growth` configuration option. "
"This option prevents Tensorflow from allocating all of the GPU VRAM at launch "
"but can lead to higher VRAM fragmentation and slower performance. Should only "
"be enabled if you are having problems running extraction.")
# Aligner false-positive filters. The two scale filters are expressed as a multiplier of the
# frame's minimum dimension; the distance filter is measured against an 'average' face.
self.add_item(
    section=section,
    title="aligner_min_scale",
    datatype=float,
    min_max=(0.0, 1.0),
    rounding=2,
    default=0.05,
    group="filters",
    info="Filters out faces below this size. This is a multiplier of the minimum "
         "dimension of the frame (e.g. 1280x720 = 720). If the original face extract "
         "box is smaller than the minimum dimension times this multiplier, it is "
         "considered a false positive and discarded. Faces which are found to be "
         "unusually smaller than the frame tend to be misaligned images, except in "
         "extreme long-shots. These can usually be safely discarded.")
self.add_item(
    section=section,
    title="aligner_max_scale",
    datatype=float,
    min_max=(0.0, 10.0),
    rounding=2,
    default=2.00,
    group="filters",
    info="Filters out faces above this size. This is a multiplier of the minimum "
         "dimension of the frame (e.g. 1280x720 = 720). If the original face extract "
         "box is larger than the minimum dimension times this multiplier, it is "
         "considered a false positive and discarded. Faces which are found to be "
         "unusually larger than the frame tend to be misaligned images except in extreme "
         "close-ups. These can usually be safely discarded.")
self.add_item(
    section=section,
    title="aligner_distance",
    datatype=float,
    min_max=(0.0, 25.0),
    rounding=1,
    default=16,
    group="filters",
    info="Filters out faces whose landmarks are above this distance from an 'average' "
         "face. Values above 16 tend to be fairly safe. Values above 10 will remove more "
         "false positives, but may also filter out some faces at extreme angles.")

View File

@ -12,16 +12,59 @@ For each source item, the plugin must pass a dict to finalize containing:
>>> "landmarks": [list of 68 point face landmarks]
>>> "detected_faces": [<list of DetectedFace objects>]}
"""
import sys
from dataclasses import dataclass, field
from typing import Any, cast, Dict, Generator, List, Optional, Tuple, TYPE_CHECKING, Union
import cv2
import numpy as np
from tensorflow.python.framework import errors_impl as tf_errors # pylint:disable=no-name-in-module # noqa
from lib.align import AlignedFace, DetectedFace
from lib.utils import get_backend, FaceswapError
from plugins.extract._base import Extractor, logger, ExtractMedia
if sys.version_info < (3, 8):
from typing_extensions import Literal
else:
from typing import Literal
if TYPE_CHECKING:
from queue import Queue
@dataclass
class AlignerBatch:
    """ Dataclass for holding items flowing through the aligner.

    Every field is created through a ``default_factory`` so that no two batches ever share a
    default container or array. A bare :class:`numpy.ndarray` default would be shared by all
    instances and is rejected as an unhashable (mutable) default on Python 3.11+.

    Parameters
    ----------
    image: list
        List of :class:`numpy.ndarray` containing the original frame
    detected_faces: list
        List of :class:`~lib.align.DetectedFace` objects
    filename: list
        List of original frame filenames for the batch
    feed: list
        List of feed images to feed the aligner net for each re-feed increment
    prediction: :class:`numpy.ndarray`
        The predictions. Direct output from the aligner net
    landmarks: :class:`numpy.ndarray`
        The 68 point landmark points returned from the aligner
    data: list
        Any aligner specific data required during the processing phase. List of dictionaries for
        holding data on each sub-batch if re-feed > 1
    """
    image: List[np.ndarray] = field(default_factory=list)
    detected_faces: List["DetectedFace"] = field(default_factory=list)
    filename: List[str] = field(default_factory=list)
    feed: List[np.ndarray] = field(default_factory=list)
    # 0-dimensional placeholder arrays until the plugin populates them
    prediction: np.ndarray = field(default_factory=lambda: np.empty([]))
    landmarks: np.ndarray = field(default_factory=lambda: np.empty([]))
    data: List[Dict[str, Any]] = field(default_factory=list)
class Aligner(Extractor): # pylint:disable=abstract-method
""" Aligner plugin _base Object
@ -55,8 +98,13 @@ class Aligner(Extractor): # pylint:disable=abstract-method
plugins.extract.mask._base : Masker parent class for extraction plugins.
"""
def __init__(self, git_model_id=None, model_filename=None,
configfile=None, instance=0, normalize_method=None, re_feed=0, **kwargs):
def __init__(self,
git_model_id: Optional[int] = None,
model_filename: Optional[str] = None,
configfile: Optional[str] = None,
instance: int = 0,
normalize_method: Optional[Literal["none", "clahe", "hist", "mean"]] = None,
re_feed: int = 0, **kwargs) -> None:
logger.debug("Initializing %s: (normalize_method: %s, re_feed: %s)",
self.__class__.__name__, normalize_method, re_feed)
super().__init__(git_model_id,
@ -64,18 +112,21 @@ class Aligner(Extractor): # pylint:disable=abstract-method
configfile=configfile,
instance=instance,
**kwargs)
self._normalize_method = None
self._normalize_method: Optional[Literal["clahe", "hist", "mean"]] = None
self._re_feed = re_feed
self.set_normalize_method(normalize_method)
self._plugin_type = "align"
self._faces_per_filename = {} # Tracking for recompiling face batches
self._rollover = None # Items that are rolled over from the previous batch in get_batch
self._output_faces = []
self._additional_keys = []
self._faces_per_filename: Dict[str, int] = {} # Tracking for recompiling batches
self._rollover: Optional[ExtractMedia] = None # batch rollover items
self._output_faces: List[DetectedFace] = []
self._filter = AlignedFilter(min_scale=self.config["aligner_min_scale"],
max_scale=self.config["aligner_max_scale"],
distance=self.config["aligner_distance"])
logger.debug("Initialized %s", self.__class__.__name__)
def set_normalize_method(self, method):
def set_normalize_method(self,
method: Optional[Literal["none", "clahe", "hist", "mean"]]) -> None:
""" Set the normalization method for feeding faces into the aligner.
Parameters
@ -84,10 +135,10 @@ class Aligner(Extractor): # pylint:disable=abstract-method
The normalization method to apply to faces prior to feeding into the model
"""
method = None if method is None or method.lower() == "none" else method
self._normalize_method = method
self._normalize_method = cast(Optional[Literal["clahe", "hist", "mean"]], method)
# << QUEUE METHODS >>> #
def get_batch(self, queue):
def get_batch(self, queue: "Queue") -> Tuple[bool, AlignerBatch]:
""" Get items for inputting into the aligner from the queue in batches
Items are returned from the ``queue`` in batches of
@ -122,12 +173,13 @@ class Aligner(Extractor): # pylint:disable=abstract-method
A dictionary of lists of :attr:`~plugins.extract._base.Extractor.batchsize`:
"""
exhausted = False
batch = {}
batch = AlignerBatch()
idx = 0
while idx < self.batchsize:
item = self._collect_item(queue)
if item == "EOF":
logger.trace("EOF received")
logger.trace("EOF received") # type:ignore
self._filter.output_counts()
exhausted = True
break
# Put frames with no faces into the out queue to keep TQDM consistent
@ -137,9 +189,9 @@ class Aligner(Extractor): # pylint:disable=abstract-method
converted_image = item.get_image_copy(self.color_format)
for f_idx, face in enumerate(item.detected_faces):
batch.setdefault("image", []).append(converted_image)
batch.setdefault("detected_faces", []).append(face)
batch.setdefault("filename", []).append(item.filename)
batch.image.append(converted_image)
batch.detected_faces.append(face)
batch.filename.append(item.filename)
idx += 1
if idx == self.batchsize:
frame_faces = len(item.detected_faces)
@ -148,36 +200,48 @@ class Aligner(Extractor): # pylint:disable=abstract-method
item.filename,
item.image,
detected_faces=item.detected_faces[f_idx + 1:])
logger.trace("Rolled over %s faces of %s to next batch for '%s'",
len(self._rollover.detected_faces), frame_faces,
logger.trace("Rolled over %s faces of %s to next batch " # type:ignore
"for '%s'", len(self._rollover.detected_faces), frame_faces,
item.filename)
break
if batch:
logger.trace("Returning batch: %s", {k: v.shape if isinstance(v, np.ndarray) else v
for k, v in batch.items()})
logger.trace("Returning batch: %s", {k: v.shape # type:ignore
if isinstance(v, np.ndarray) else v
for k, v in batch.__dict__.items()})
else:
logger.trace(item)
logger.trace(item) # type:ignore
return exhausted, batch
def _collect_item(self, queue: "Queue") -> Union[Literal["EOF"], ExtractMedia]:
    """ Obtain the next item to process, preferring any item held in :attr:`_rollover` over
    the queue.

    Items taken from the queue (other than ``"EOF"``) have their face count recorded in
    :attr:`_faces_per_filename`, keyed on filename.

    Parameters
    ----------
    queue: :class:`queue.Queue`
        The input queue to the aligner. Should contain
        :class:`~plugins.extract.pipeline.ExtractMedia` objects

    Returns
    -------
    :class:`~plugins.extract.pipeline.ExtractMedia` or EOF
        The next extract media object, or EOF if pipe has ended
    """
    if self._rollover is None:
        retval = self._get_item(queue)
        if retval != "EOF":
            face_count = len(retval.detected_faces)
            logger.trace("Getting from queue: (filename: %s, faces: %s)",  # type:ignore
                         retval.filename, face_count)
            self._faces_per_filename[retval.filename] = face_count
        return retval

    # A rolled over item is handed out once, then the slot is cleared
    retval = self._rollover
    logger.trace("Getting from _rollover: (filename: `%s`, faces: %s)",  # type:ignore
                 retval.filename, len(retval.detected_faces))
    self._rollover = None
    return retval
# <<< FINALIZE METHODS >>> #
def finalize(self, batch):
def finalize(self, batch: AlignerBatch) -> Generator[ExtractMedia, None, None]:
""" Finalize the output from Aligner
This should be called as the final task of each `plugin`.
@ -186,9 +250,8 @@ class Aligner(Extractor): # pylint:disable=abstract-method
Parameters
----------
batch : dict
The final ``dict`` from the `plugin` process. It must contain the `keys`:
``detected_faces``, ``landmarks``, ``filename``
batch : :class:`AlignerBatch`
The final batch item from the `plugin` process.
Yields
------
@ -197,32 +260,35 @@ class Aligner(Extractor): # pylint:disable=abstract-method
and landmarks for the detected faces found in the frame.
"""
for face, landmarks in zip(batch["detected_faces"], batch["landmarks"]):
for face, landmarks in zip(batch.detected_faces, batch.landmarks):
if not isinstance(landmarks, np.ndarray):
landmarks = np.array(landmarks)
face._landmarks_xy = landmarks
logger.trace("Item out: %s", {key: val.shape if isinstance(val, np.ndarray) else val
for key, val in batch.items()})
logger.trace("Item out: %s", {key: val.shape # type:ignore
if isinstance(val, np.ndarray) else val
for key, val in batch.__dict__.items()})
for filename, face in zip(batch["filename"], batch["detected_faces"]):
for frame, filename, face in zip(batch.image, batch.filename, batch.detected_faces):
self._output_faces.append(face)
if len(self._output_faces) != self._faces_per_filename[filename]:
continue
self._output_faces = self._filter(self._output_faces, min(frame.shape[:2]))
output = self._extract_media.pop(filename)
output.add_detected_faces(self._output_faces)
self._output_faces = []
logger.trace("Final Output: (filename: '%s', image shape: %s, detected_faces: %s, "
"item: %s)",
logger.trace("Final Output: (filename: '%s', image shape: %s, " # type:ignore
"detected_faces: %s, item: %s)",
output.filename, output.image_shape, output.detected_faces, output)
yield output
# <<< PROTECTED METHODS >>> #
# << PROCESS_INPUT WRAPPER >>
def _process_input(self, batch):
def _process_input(self, batch: AlignerBatch) -> AlignerBatch:
""" Process the input to the aligner model multiple times based on the user selected
`re-feed` command line option. This adjusts the bounding box for the face to be fed
into the model by a random amount within 0.05 pixels of the detected face's shortest axis.
@ -233,40 +299,32 @@ class Aligner(Extractor): # pylint:disable=abstract-method
Parameters
----------
batch: dict
batch: :class:`AlignerBatch`
Contains the batch that is currently being passed through the plugin process
Returns
-------
dict
:class:`AlignerBatch`
The batch with input processed
"""
if not self._additional_keys:
existing_keys = list(batch.keys())
original_boxes = np.array([(face.left, face.top, face.width, face.height)
for face in batch["detected_faces"]])
for face in batch.detected_faces])
adjusted_boxes = self._get_adjusted_boxes(original_boxes)
retval = {}
# Put in random re-feed data to the bounding boxes
for bounding_boxes in adjusted_boxes:
for face, box in zip(batch["detected_faces"], bounding_boxes):
for face, box in zip(batch.detected_faces, bounding_boxes):
face.left, face.top, face.width, face.height = box
result = self.process_input(batch)
if not self._additional_keys:
self._additional_keys = [key for key in result if key not in existing_keys]
for key in self._additional_keys:
retval.setdefault(key, []).append(batch[key])
del batch[key]
self.process_input(batch)
# Place the original bounding box back to detected face objects
for face, box in zip(batch["detected_faces"], original_boxes):
for face, box in zip(batch.detected_faces, original_boxes):
face.left, face.top, face.width, face.height = box
batch.update(retval)
return batch
def _get_adjusted_boxes(self, original_boxes):
def _get_adjusted_boxes(self, original_boxes: np.ndarray) -> np.ndarray:
""" Obtain an array of adjusted bounding boxes based on the number of re-feed iterations
that have been selected and the minimum dimension of the original bounding box.
@ -288,14 +346,30 @@ class Aligner(Extractor): # pylint:disable=abstract-method
rands = np.random.rand(self._re_feed, *original_boxes.shape) * 2 - 1
new_boxes = np.rint(original_boxes + (rands * max_shift[None, :, None])).astype("int32")
retval = np.concatenate((original_boxes[None, ...], new_boxes))
logger.trace(retval)
logger.trace(retval) # type:ignore
return retval
# <<< PREDICT WRAPPER >>> #
def _predict(self, batch):
""" Just return the aligner's predict function """
def _predict(self, batch: AlignerBatch) -> AlignerBatch:
""" Just return the aligner's predict function
Parameters
----------
batch: :class:`AlignerBatch`
The current batch to find alignments for
Returns
-------
:class:`AlignerBatch`
The batch item with the :attr:`prediction` populated
Raises
------
FaceswapError
If GPU resources are exhausted
"""
try:
batch["prediction"] = [self.predict(feed) for feed in batch["feed"]]
batch.prediction = np.array([self.predict(feed) for feed in batch.feed])
return batch
except tf_errors.ResourceExhaustedError as err:
msg = ("You do not have enough GPU memory available to run detection at the "
@ -325,45 +399,72 @@ class Aligner(Extractor): # pylint:disable=abstract-method
raise FaceswapError(msg) from err
raise
def _process_output(self, batch: AlignerBatch) -> AlignerBatch:
    """ Run the plugin's output processing once for each re-feed iteration and store the
    average of the resulting landmarks on the batch.

    Parameters
    ----------
    batch : :class:`AlignerBatch`
        Contains the batch that is currently being passed through the plugin process

    Returns
    -------
    :class:`AlignerBatch`
        The batch item with :attr:`landmarks` populated
    """
    def _landmarks_for(iteration: int) -> np.ndarray:
        """ Build a temporary batch holding only this iteration's feed, prediction and data,
        process it and return its landmarks """
        single = AlignerBatch(image=batch.image,
                              detected_faces=batch.detected_faces,
                              filename=batch.filename,
                              feed=[batch.feed[iteration]],
                              prediction=batch.prediction[iteration],
                              data=[batch.data[iteration]])
        self.process_output(single)
        return single.landmarks

    iterations = self._re_feed + 1  # the original pass plus each re-feed
    per_iteration = [_landmarks_for(idx) for idx in range(iterations)]
    batch.landmarks = np.average(per_iteration, axis=0)
    return batch
# <<< FACE NORMALIZATION METHODS >>> #
def _normalize_faces(self, faces: List[np.ndarray]) -> List[np.ndarray]:
    """ Apply the selected normalization method to each face prior to feeding the model

    The method is looked up by name on this object as ``_normalize_<method>``. When no
    normalization method is set the faces are returned untouched.

    Parameters
    ----------
    faces: list
        List of :class:`numpy.ndarray` faces to normalize

    Returns
    -------
    list
        List of :class:`numpy.ndarray` normalized faces
    """
    method = self._normalize_method
    if method is None:
        return faces

    logger.trace("Normalizing faces")  # type:ignore
    normalizer = getattr(self, f"_normalize_{method.lower()}")
    normalized = [normalizer(image) for image in faces]
    logger.trace("Normalized faces")  # type:ignore
    return normalized
@staticmethod
def _normalize_mean(face):
""" Normalize Face to the Mean """
@classmethod
def _normalize_mean(cls, face: np.ndarray) -> np.ndarray:
""" Normalize Face to the Mean
Parameters
----------
faces: :class:`numpy.ndarray`
The faces to normalize
Returns
-------
:class:`numpy.ndarray`
The normalized faces
"""
face = face / 255.0
for chan in range(3):
layer = face[:, :, chan]
@ -371,17 +472,114 @@ class Aligner(Extractor): # pylint:disable=abstract-method
face[:, :, chan] = layer
return face * 255.0
@classmethod
def _normalize_hist(cls, face: np.ndarray) -> np.ndarray:
    """ Equalize the histogram of each of the first 3 channels of a face

    Parameters
    ----------
    face: :class:`numpy.ndarray`
        The face to normalize. The channels are overwritten in place

    Returns
    -------
    :class:`numpy.ndarray`
        The normalized face (the same array that was passed in)
    """
    for idx in (0, 1, 2):
        channel = face[:, :, idx]
        face[:, :, idx] = cv2.equalizeHist(channel)
    return face
@classmethod
def _normalize_clahe(cls, face: np.ndarray) -> np.ndarray:
    """ Perform Contrast Limited Adaptive Histogram Equalization on each of the first 3
    channels of a face

    Parameters
    ----------
    face: :class:`numpy.ndarray`
        The face to normalize. The channels are overwritten in place

    Returns
    -------
    :class:`numpy.ndarray`
        The normalized face (the same array that was passed in)
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4, 4))
    for idx in (0, 1, 2):
        channel = face[:, :, idx]
        face[:, :, idx] = equalizer.apply(channel)
    return face
class AlignedFilter():
    """ Applies filters on the output of the aligner

    Parameters
    ----------
    min_scale: float
        Filters out faces that have been aligned at below this value as a multiplier of the
        minimum frame dimension. Set to ``0`` for off.
    max_scale: float
        Filters out faces that have been aligned at above this value as a multiplier of the
        minimum frame dimension. Set to ``0`` for off.
    distance: float
        Filters out faces that are further than this distance from an "average" face. Set to
        ``0`` for off.
    """
    def __init__(self, min_scale: float, max_scale: float, distance: float) -> None:
        logger.debug("Initializing %s: (min_scale: %s, max_scale: %s, distance: %s)",
                     self.__class__.__name__, min_scale, max_scale, distance)
        self._min_scale = min_scale
        self._max_scale = max_scale
        # The incoming value is divided by 100 before being compared against
        # AlignedFace.average_distance
        self._distance = distance / 100.
        self._active = max_scale > 0.0 or min_scale > 0.0 or distance > 0.0
        self._counts: Dict[str, int] = dict(min_scale=0, max_scale=0, distance=0)
        logger.debug("Initialized %s: ", self.__class__.__name__)

    def __call__(self, faces: List[DetectedFace], minimum_dimension: int) -> List[DetectedFace]:
        """ Apply the filter to the incoming list of faces

        A face is discarded by the first test that it fails (min scale, then max scale, then
        distance) and the counter for that test is incremented.

        Parameters
        ----------
        faces: list
            List of detected face objects to filter out on size and distance
        minimum_dimension: int
            The minimum (height, width) of the original frame

        Returns
        -------
        list
            The filtered list of detected face objects
        """
        if not self._active:
            return faces
        max_size = minimum_dimension * self._max_scale
        min_size = minimum_dimension * self._min_scale
        retval: List[DetectedFace] = []
        for face in faces:
            test = AlignedFace(landmarks=face.landmarks_xy, centering="face")
            if self._min_scale > 0.0 or self._max_scale > 0.0:
                roi = test.original_roi
                # Euclidean distance between the first two points of the original ROI
                size = ((roi[1][0] - roi[0][0]) ** 2 + (roi[1][1] - roi[0][1]) ** 2) ** 0.5
                if self._min_scale > 0.0 and size < min_size:
                    self._counts["min_scale"] += 1
                    continue
                if self._max_scale > 0.0 and size > max_size:
                    self._counts["max_scale"] += 1
                    continue
            if 0.0 < self._distance < test.average_distance:
                self._counts["distance"] += 1
                continue
            retval.append(face)
        return retval

    def output_counts(self) -> None:
        """ Output the counts of filtered items

        Only filters that removed at least one face are reported. The value shown in brackets
        for each filter is the internally stored threshold, so ``distance`` is shown after its
        division by 100.
        """
        if not self._active:
            return
        counts = [f"{key} ({getattr(self, f'_{key}'):.2f}): {count}"
                  for key, count in self._counts.items()
                  if count > 0]
        if counts:
            # Closing bracket now matches the opening one (was "[%s)")
            logger.info("Aligner filtered: [%s]", ", ".join(counts))

View File

@ -23,16 +23,20 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from typing import cast, List, Tuple, TYPE_CHECKING
import cv2
import numpy as np
from ._base import Aligner, logger
from ._base import Aligner, AlignerBatch, logger
if TYPE_CHECKING:
from lib.align.detected_face import DetectedFace
class Align(Aligner):
""" Perform transformation to align and get landmarks """
def __init__(self, **kwargs):
def __init__(self, **kwargs) -> None:
git_model_id = 1
model_filename = "cnn-facial-landmark_v1.pb"
super().__init__(git_model_id=git_model_id, model_filename=model_filename, **kwargs)
@ -44,33 +48,81 @@ class Align(Aligner):
self.vram_per_batch = 0
self.batchsize = 1
def init_model(self) -> None:
    """ Load the Tensorflow graph at :attr:`model_path` into a cv2 DNN network and set it to
    run on the CPU target """
    # pylint: disable=no-member
    self.model = cv2.dnn.readNetFromTensorflow(self.model_path)
    cpu_target = cv2.dnn.DNN_TARGET_CPU
    self.model.setPreferableTarget(cpu_target)
def process_input(self, batch: AlignerBatch) -> None:
    """ Compile the detected faces for prediction

    The batch is updated in place: the roi and offsets for this pass are appended to
    :attr:`AlignerBatch.data` and the channels-first float32 feed is appended to
    :attr:`AlignerBatch.feed`. Nothing is returned.

    Parameters
    ----------
    batch: :class:`AlignerBatch`
        The current batch to process input for
    """
    aligned, roi, offsets = self.align_image(batch)
    normalized = self._normalize_faces(aligned)
    batch.data.append({"roi": roi, "offsets": offsets})
    # Keep the first 3 channels and move the channel axis ahead of height and width
    feed = np.array(normalized, dtype="float32")[..., :3].transpose((0, 3, 1, 2))
    batch.feed.append(feed)
def _get_box_and_offset(self, face: "DetectedFace") -> Tuple[List[int], int]:
"""Obtain the bounding box and offset from a detected face.
Parameters
----------
face: :class:`~lib.align.DetectedFace`
The detected face object to obtain the bounding box and offset from
Returns
-------
box: list
The [left, top, right, bottom] bounding box
offset: int
The offset of the box (difference between half width vs height)
"""
box = cast(List[int], [face.left,
face.top,
face.right,
face.bottom])
diff_height_width = cast(int, face.height) - cast(int, face.width)
offset = int(abs(diff_height_width / 2))
return box, offset
def align_image(self, batch: AlignerBatch) -> Tuple[List[np.ndarray],
List[List[int]],
List[Tuple[int, int]]]:
""" Align the incoming image for prediction
Parameters
----------
batch: :class:`AlignerBatch`
The current batch to align the input for
Returns
-------
faces: list
List of feed faces for the aligner
rois: list
List of roi's for the faces
offsets: list
List of offsets for the faces
"""
logger.trace("Aligning image around center") # type:ignore
sizes = (self.input_size, self.input_size)
rois = []
faces = []
offsets = []
for det_face, image in zip(batch["detected_faces"], batch["image"]):
box = (det_face.left,
det_face.top,
det_face.right,
det_face.bottom)
diff_height_width = det_face.height - det_face.width
offset_y = int(abs(diff_height_width / 2))
box_moved = self.move_box(box, [0, offset_y])
for det_face, image in zip(batch.detected_faces, batch.image):
box, offset_y = self._get_box_and_offset(det_face)
box_moved = self.move_box(box, (0, offset_y))
# Make box square.
roi = self.get_square_box(box_moved)
@ -85,9 +137,24 @@ class Align(Aligner):
offsets.append(offset)
return faces, rois, offsets
@staticmethod
def move_box(box, offset):
"""Move the box to direction specified by vector offset"""
@classmethod
def move_box(cls,
box: List[int],
offset: Tuple[int, int]) -> List[int]:
"""Move the box to direction specified by vector offset
Parameters
----------
box: list
The (`left`, `top`, `right`, `bottom`) box positions
offset: tuple
(x, y) offset to move the box
Returns
-------
list
The original box shifted by the offset
"""
left = box[0] + offset[0]
top = box[1] + offset[1]
right = box[2] + offset[0]
@ -95,8 +162,19 @@ class Align(Aligner):
return [left, top, right, bottom]
@staticmethod
def get_square_box(box):
"""Get a square box out of the given box, by expanding it."""
def get_square_box(box: List[int]) -> List[int]:
"""Get a square box out of the given box, by expanding it.
Parameters
----------
box: list
The (`left`, `top`, `right`, `bottom`) box positions
Returns
-------
list
The original box but made square
"""
left = box[0]
top = box[1]
right = box[2]
@ -127,15 +205,29 @@ class Align(Aligner):
return [left, top, right, bottom]
@staticmethod
def pad_image(box, image):
"""Pad image if face-box falls outside of boundaries """
@classmethod
def pad_image(cls, box: List[int], image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
"""Pad image if face-box falls outside of boundaries
Parameters
----------
box: list
The (`left`, `top`, `right`, `bottom`) roi box positions
image: :class:`numpy.ndarray`
The image to be padded
Returns
-------
:class:`numpy.ndarray`
The padded image
"""
height, width = image.shape[:2]
pad_l = 1 - box[0] if box[0] < 0 else 0
pad_t = 1 - box[1] if box[1] < 0 else 0
pad_r = box[2] - width if box[2] > width else 0
pad_b = box[3] - height if box[3] > height else 0
logger.trace("Padding: (l: %s, t: %s, r: %s, b: %s)", pad_l, pad_t, pad_r, pad_b)
logger.trace("Padding: (l: %s, t: %s, r: %s, b: %s)", # type:ignore
pad_l, pad_t, pad_r, pad_b)
padded_image = cv2.copyMakeBorder(image.copy(),
pad_t,
pad_b,
@ -144,29 +236,61 @@ class Align(Aligner):
cv2.BORDER_CONSTANT,
value=(0, 0, 0))
offsets = (pad_l - pad_r, pad_t - pad_b)
logger.trace("image_shape: %s, Padded shape: %s, box: %s, offsets: %s",
logger.trace("image_shape: %s, Padded shape: %s, box: %s, offsets: %s", # type:ignore
image.shape, padded_image.shape, box, offsets)
return padded_image, offsets
def predict(self, batch: np.ndarray) -> np.ndarray:
    """ Predict the 68 point landmarks

    Parameters
    ----------
    batch: :class:`numpy.ndarray`
        The feed array to pass into the aligner model. This is a single entry from
        :attr:`AlignerBatch.feed`, not the batch object itself

    Returns
    -------
    :class:`numpy.ndarray`
        The predictions from the aligner
    """
    logger.trace("Predicting Landmarks")  # type:ignore
    self.model.setInput(batch)
    return self.model.forward()
def process_output(self, batch: AlignerBatch) -> AlignerBatch:
    """ Convert the model's predictions into landmarks

    The work is delegated to :func:`get_pts_from_predict`, which writes the landmarks onto the
    batch; the same batch object is then handed back.

    Parameters
    ----------
    batch: :class:`AlignerBatch`
        The current batch from the model with :attr:`prediction` populated

    Returns
    -------
    :class:`AlignerBatch`
        The current batch with the :attr:`landmarks` populated
    """
    self.get_pts_from_predict(batch)
    return batch
@classmethod
def get_pts_from_predict(cls, batch: AlignerBatch):
    """ Convert the predictions into landmark points and store them in the batch's
    :attr:`landmarks` attribute

    Each prediction is reshaped to (x, y) pairs, scaled by the width of its roi and then
    shifted by the roi's left/top position less the stored offset. The arithmetic is performed
    in place on the reshaped prediction.

    Parameters
    ----------
    batch: :class:`AlignerBatch`
        The current batch from the model with :attr:`prediction` populated
    """
    extras = batch.data[0]
    collected = []
    for raw, box, shift in zip(batch.prediction, extras["roi"], extras["offsets"]):
        pts = np.reshape(raw, (-1, 2))
        pts *= (box[2] - box[0])
        pts[:, 0] += (box[0] - shift[0])
        pts[:, 1] += (box[1] - shift[1])
        collected.append(pts)
    batch.landmarks = np.array(collected)
    logger.trace("Predicted Landmarks: %s", batch.landmarks)  # type:ignore

View File

@ -3,16 +3,21 @@
Code adapted and modified from:
https://github.com/1adrianb/face-alignment
"""
from typing import cast, List, TYPE_CHECKING
import cv2
import numpy as np
from lib.model.session import KSession
from ._base import Aligner, logger
from ._base import Aligner, AlignerBatch, logger
if TYPE_CHECKING:
from lib.align import DetectedFace
class Align(Aligner):
""" Perform transformation to align and get landmarks """
def __init__(self, **kwargs):
def __init__(self, **kwargs) -> None:
git_model_id = 13
model_filename = "face-alignment-network_2d4_keras_v2.h5"
super().__init__(git_model_id=git_model_id, model_filename=model_filename, **kwargs)
@ -22,10 +27,10 @@ class Align(Aligner):
self.vram = 2240
self.vram_warnings = 512 # Will run at this with warnings
self.vram_per_batch = 64
self.batchsize = self.config["batch-size"]
self.batchsize: int = self.config["batch-size"]
self.reference_scale = 200. / 195.
def init_model(self):
def init_model(self) -> None:
""" Initialize FAN model """
self.model = KSession(self.name,
self.model_path,
@ -37,69 +42,133 @@ class Align(Aligner):
placeholder = np.zeros(placeholder_shape, dtype="float32")
self.model.predict(placeholder)
def process_input(self, batch: AlignerBatch) -> None:
    """ Compile the detected faces for prediction

    The batch is updated in place: the center/scale array for this pass is appended to
    :attr:`AlignerBatch.data` and the float32 feed, divided by 255, is appended to
    :attr:`AlignerBatch.feed`.

    Parameters
    ----------
    batch: :class:`AlignerBatch`
        The current batch to process input for
    """
    logger.debug("Aligning faces around center")
    center_scale = self.get_center_scale(batch.detected_faces)
    cropped = self.crop(batch, center_scale)
    logger.trace("Aligned image around center")  # type:ignore
    normalized = self._normalize_faces(cropped)
    batch.data.append({"center_scale": center_scale})
    # Keep the first 3 channels only and divide by 255
    feed = np.array(normalized, dtype="float32")[..., :3] / 255.0
    batch.feed.append(feed)
def get_center_scale(self, detected_faces: List["DetectedFace"]) -> np.ndarray:
""" Get the center and set scale of bounding box
Parameters
----------
detected_faces: list
List of :class:`~lib.align.DetectedFace` objects for the batch
Returns
-------
:class:`numpy.ndarray`
The center and scale of the bounding box
"""
logger.debug("Calculating center and scale")
center_scale = np.empty((len(detected_faces), 68, 3), dtype='float32')
for index, face in enumerate(detected_faces):
x_center = (face.left + face.right) / 2.0
y_center = (face.top + face.bottom) / 2.0 - face.height * 0.12
scale = (face.width + face.height) * self.reference_scale
x_center = (cast(int, face.left) + face.right) / 2.0
y_center = (cast(int, face.top) + face.bottom) / 2.0 - cast(int, face.height) * 0.12
scale = (cast(int, face.width) + cast(int, face.height)) * self.reference_scale
center_scale[index, :, 0] = np.full(68, x_center, dtype='float32')
center_scale[index, :, 1] = np.full(68, y_center, dtype='float32')
center_scale[index, :, 2] = np.full(68, scale, dtype='float32')
logger.trace("Calculated center and scale: %s", center_scale)
logger.trace("Calculated center and scale: %s", center_scale) # type:ignore
return center_scale
def crop(self, batch): # pylint:disable=too-many-locals
""" Crop image around the center point """
def _crop_image(self,
                image: np.ndarray,
                top_left: np.ndarray,
                bottom_right: np.ndarray) -> np.ndarray:
    """ Crop a single image to a bounding box and resize it to the model input size.

    The crop box may extend beyond the borders of the source image. Any part of the box
    that falls outside of the image is filled with zeros (black).

    Parameters
    ----------
    image: :class:`numpy.ndarray`
        The image to crop
    top_left: :class:`numpy.ndarray`
        The top left (x, y) point to crop from. Only the first row is read
    bottom_right: :class:`numpy.ndarray`
        The bottom right (x, y) point to crop to. Only the first row is read

    Returns
    -------
    :class:`numpy.ndarray`
        The cropped image, resized to (:attr:`input_size`, :attr:`input_size`)
    """
    bottom_right_width, bottom_right_height = bottom_right[0].astype('int32')
    top_left_width, top_left_height = top_left[0].astype('int32')
    new_dim = (bottom_right_height - top_left_height,
               bottom_right_width - top_left_width,
               3 if image.ndim > 2 else 1)
    # Zero-initialize the canvas. Only the region overlapping the source image is written
    # below, so an uninitialized buffer would leak arbitrary memory into the padded border
    # and make the model input non-deterministic.
    new_img = np.zeros(new_dim, dtype=np.uint8)

    # Destination slices: where the visible part of the image lands inside the canvas
    new_x = slice(max(0, -top_left_width),
                  min(bottom_right_width, image.shape[1]) - top_left_width)
    new_y = slice(max(0, -top_left_height),
                  min(bottom_right_height, image.shape[0]) - top_left_height)
    # Source slices: the crop box clamped to the borders of the source image
    old_x = slice(max(0, top_left_width), min(bottom_right_width, image.shape[1]))
    old_y = slice(max(0, top_left_height), min(bottom_right_height, image.shape[0]))
    new_img[new_y, new_x] = image[old_y, old_x]

    # Cubic interpolation when enlarging the crop, area interpolation when shrinking it
    interp = cv2.INTER_CUBIC if new_dim[0] < self.input_size else cv2.INTER_AREA
    return cv2.resize(new_img,
                      dsize=(self.input_size, self.input_size),
                      interpolation=interp)
def crop(self, batch: AlignerBatch, center_scale: np.ndarray) -> List[np.ndarray]:
""" Crop image around the center point
Parameters
----------
batch: :class:`AlignerBatch`
The current batch to crop the image for
center_scale: :class:`numpy.ndarray`
The center and scale for the bounding box
Returns
-------
list
List of cropped images for the batch
"""
logger.debug("Cropping images")
sizes = (self.input_size, self.input_size)
batch_shape = batch["center_scale"].shape[:2]
batch_shape = center_scale.shape[:2]
resolutions = np.full(batch_shape, self.input_size, dtype='float32')
matrix_ones = np.ones(batch_shape + (3,), dtype='float32')
matrix_size = np.full(batch_shape + (3,), self.input_size, dtype='float32')
matrix_size[..., 2] = 1.0
upper_left = self.transform(matrix_ones, batch["center_scale"], resolutions)
bot_right = self.transform(matrix_size, batch["center_scale"], resolutions)
upper_left = self.transform(matrix_ones, center_scale, resolutions)
bot_right = self.transform(matrix_size, center_scale, resolutions)
# TODO second pass .. convert to matrix
new_images = []
for image, top_left, bottom_right in zip(batch["image"], upper_left, bot_right):
height, width = image.shape[:2]
channels = 3 if image.ndim > 2 else 1
bottom_right_width, bottom_right_height = bottom_right[0].astype('int32')
top_left_width, top_left_height = top_left[0].astype('int32')
new_dim = (bottom_right_height - top_left_height,
bottom_right_width - top_left_width,
channels)
new_img = np.empty(new_dim, dtype=np.uint8)
new_x = slice(max(0, -top_left_width), min(bottom_right_width, width) - top_left_width)
new_y = slice(max(0, -top_left_height),
min(bottom_right_height, height) - top_left_height)
old_x = slice(max(0, top_left_width), min(bottom_right_width, width))
old_y = slice(max(0, top_left_height), min(bottom_right_height, height))
new_img[new_y, new_x] = image[old_y, old_x]
interp = cv2.INTER_CUBIC if new_dim[0] < self.input_size else cv2.INTER_AREA
new_images.append(cv2.resize(new_img, dsize=sizes, interpolation=interp))
logger.trace("Cropped images")
new_images = [self._crop_image(image, top_left, bottom_right)
for image, top_left, bottom_right in zip(batch.image, upper_left, bot_right)]
logger.trace("Cropped images") # type:ignore
return new_images
@staticmethod
def transform(points, center_scales, resolutions):
""" Transform Image """
@classmethod
def transform(cls,
points: np.ndarray,
center_scales: np.ndarray,
resolutions: np.ndarray) -> np.ndarray:
""" Transform Image
Parameters
----------
points: :class:`numpy.ndarray`
The points to transform
center_scales: :class:`numpy.ndarray`
The calculated centers and scales for the batch
resolutions: :class:`numpy.ndarray`
The resolutions
"""
logger.debug("Transforming Points")
num_images, num_landmarks = points.shape[:2]
transform_matrix = np.eye(3, dtype='float32')
@ -113,45 +182,79 @@ class Align(Aligner):
transform_matrix[:, :, 1, 2] = translations[:, :, 1] # y translation
new_points = np.einsum('abij, abj -> abi', transform_matrix, points, optimize='greedy')
retval = new_points[:, :, :2].astype('float32')
logger.trace("Transformed Points: %s", retval)
logger.trace("Transformed Points: %s", retval) # type:ignore
return retval
def predict(self, batch):
""" Predict the 68 point landmarks """
def predict(self, batch: np.ndarray) -> np.ndarray:
""" Predict the 68 point landmarks
Parameters
----------
batch: :class:`numpy.ndarray`
The batch to feed into the aligner
Returns
-------
:class:`numpy.ndarray`
The predictions from the aligner
"""
logger.debug("Predicting Landmarks")
# TODO Remove lazy transpose and change points from predict to use the correct
# order
retval = self.model.predict(batch)[-1].transpose(0, 3, 1, 2)
logger.trace(retval.shape)
logger.trace(retval.shape) # type:ignore
return retval
def process_output(self, batch):
""" Process the output from the model """
def process_output(self, batch: AlignerBatch) -> AlignerBatch:
""" Process the output from the model
Converts the model output held in the batch into landmark points by delegating to
:func:`get_pts_from_predict`, which updates the batch in place.
Parameters
----------
batch: :class:`AlignerBatch`
The current batch from the model with :attr:`prediction` populated
Returns
-------
:class:`AlignerBatch`
The same batch object that was passed in, with :attr:`landmarks` populated
"""
# The batch is mutated in place; it is returned for the caller's convenience
self.get_pts_from_predict(batch)
return batch
def get_pts_from_predict(self, batch):
""" Get points from predictor """
def get_pts_from_predict(self, batch: AlignerBatch):
""" Get points from predictor and populate the :attr:`landmarks` property of the
:class:`AlignerBatch`
Parameters
----------
batch: :class:`AlignerBatch`
The current batch from the model with :attr:`prediction` populated
"""
logger.debug("Obtain points from prediction")
num_images, num_landmarks, height, width = batch["prediction"].shape
num_images, num_landmarks = batch.prediction.shape[:2]
image_slice = np.repeat(np.arange(num_images)[:, None], num_landmarks, axis=1)
landmark_slice = np.repeat(np.arange(num_landmarks)[None, :], num_images, axis=0)
resolution = np.full((num_images, num_landmarks), 64, dtype='int32')
subpixel_landmarks = np.ones((num_images, num_landmarks, 3), dtype='float32')
flat_indices = batch["prediction"].reshape(num_images, num_landmarks, -1).argmax(-1)
indices = np.array(np.unravel_index(flat_indices, (height, width)))
min_clipped = np.minimum(indices + 1, height - 1)
indices = np.array(np.unravel_index(batch.prediction.reshape(num_images,
num_landmarks,
-1).argmax(-1),
(batch.prediction.shape[2], # height
batch.prediction.shape[3]))) # width
min_clipped = np.minimum(indices + 1, batch.prediction.shape[2] - 1)
max_clipped = np.maximum(indices - 1, 0)
offsets = [(image_slice, landmark_slice, indices[0], min_clipped[1]),
(image_slice, landmark_slice, indices[0], max_clipped[1]),
(image_slice, landmark_slice, min_clipped[0], indices[1]),
(image_slice, landmark_slice, max_clipped[0], indices[1])]
x_subpixel_shift = batch["prediction"][offsets[0]] - batch["prediction"][offsets[1]]
y_subpixel_shift = batch["prediction"][offsets[2]] - batch["prediction"][offsets[3]]
x_subpixel_shift = batch.prediction[offsets[0]] - batch.prediction[offsets[1]]
y_subpixel_shift = batch.prediction[offsets[2]] - batch.prediction[offsets[3]]
# TODO improve rudimentary sub-pixel logic to centroid of 3x3 window algorithm
subpixel_landmarks[:, :, 0] = indices[1] + np.sign(x_subpixel_shift) * 0.25 + 0.5
subpixel_landmarks[:, :, 1] = indices[0] + np.sign(y_subpixel_shift) * 0.25 + 0.5
batch["landmarks"] = self.transform(subpixel_landmarks, batch["center_scale"], resolution)
logger.trace("Obtained points from prediction: %s", batch["landmarks"])
batch.landmarks = self.transform(subpixel_landmarks,
batch.data[0]["center_scale"],
resolution)
logger.trace("Obtained points from prediction: %s", batch.landmarks) # type:ignore

View File

@ -76,7 +76,7 @@ class Extractor():
exactly what angles to check. Can also pass in ``'on'`` to increment at 90 degree
intervals. Default: ``None``
min_size: int, optional
Used to set the :attr:`plugins.extract.detect.min_size` attribute Filters out faces
Used to set the :attr:`plugins.extract.detect.min_size` attribute. Filters out faces
detected below this size. Length, in pixels across the diagonal of the bounding box. Set
to ``0`` for off. Default: ``0``
normalize_method: {`None`, 'clahe', 'hist', 'mean'}, optional
@ -103,8 +103,8 @@ class Extractor():
multiprocess: bool = False,
exclude_gpus: Optional[List[int]] = None,
rotate_images: Optional[List[int]] = None,
min_size: int = 20,
normalize_method: Optional[str] = None,
min_size: int = 0,
normalize_method: Optional[Literal["none", "clahe", "hist", "mean"]] = None,
re_feed: int = 0,
image_is_aligned: bool = False) -> None:
logger.debug("Initializing %s: (detector: %s, aligner: %s, masker: %s, configfile: %s, "
@ -541,9 +541,25 @@ class Extractor():
def _load_align(self,
aligner: Optional[str],
configfile: Optional[str],
normalize_method: Optional[str],
normalize_method: Optional[Literal["none", "clahe", "hist", "mean"]],
re_feed: int) -> Optional["Aligner"]:
""" Set global arguments and load aligner plugin """
""" Set global arguments and load aligner plugin
Parameters
----------
aligner: str
The aligner plugin to load or ``None`` for no aligner
configfile: str
Optional full path to custom config file
normalize_method: str
Optional normalization method to use
re_feed: int
The number of times to adjust the image and re-feed to get an average score
Returns
-------
Aligner plugin if one is specified otherwise ``None``
"""
if aligner is None or aligner.lower() == "none":
logger.debug("No aligner selected. Returning None")
return None
@ -637,7 +653,10 @@ class Extractor():
- plugins_required) // len(gpu_plugins)
self._set_plugin_batchsize(gpu_plugins, available_vram)
def set_aligner_normalization_method(self, method: str) -> None:
def set_aligner_normalization_method(self, method: Optional[Literal["none",
"clahe",
"hist",
"mean"]]) -> None:
""" Change the normalization method for faces fed into the aligner.
Parameters

View File

@ -83,7 +83,7 @@ class Extract(): # pylint:disable=too-few-public-methods
logger.debug("Input locations: %s", retval)
return retval
def _validate_batchmode(self):
def _validate_batchmode(self) -> None:
""" Validate the command line arguments.
If batch-mode selected and there is only one object to extract from, then batch mode is
@ -330,7 +330,8 @@ class _Extract(): # pylint:disable=too-few-public-methods
for idx, extract_media in enumerate(tqdm(self._extractor.detected_faces(),
total=self._images.process_count,
file=sys.stdout,
desc=desc)):
desc=desc,
leave=False)):
self._check_thread_error()
if is_final:
self._output_processing(extract_media, size)