Minor updates and fixups

- Mask Tool - Typing + BiSeNet mask update fix
  - Alignments Tool - Auto search for alignments file
This commit is contained in:
torzdf 2022-09-14 19:14:03 +01:00
parent 952d79922b
commit 2d312a9db2
7 changed files with 297 additions and 148 deletions

View File

@ -6,26 +6,26 @@ msgid ""
msgstr ""
"Project-Id-Version: faceswap.spanish\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2022-05-24 12:38+0100\n"
"PO-Revision-Date: 2022-05-24 12:41+0100\n"
"POT-Creation-Date: 2022-09-14 18:36+0100\n"
"PO-Revision-Date: 2022-09-14 18:38+0100\n"
"Last-Translator: \n"
"Language-Team: tokafondo\n"
"Language: es_ES\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: pygettext.py 1.5\n"
"X-Generator: Poedit 3.0\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"Generated-By: pygettext.py 1.5\n"
"X-Generator: Poedit 3.0.1\n"
#: tools/alignments/cli.py:15
#: tools/alignments/cli.py:17
msgid ""
"This command lets you perform various tasks pertaining to an alignments file."
msgstr ""
"Este comando le permite realizar varias tareas relacionadas con un archivo "
"de alineación."
#: tools/alignments/cli.py:30
#: tools/alignments/cli.py:32
msgid ""
"Alignments tool\n"
"This tool allows you to perform numerous actions on or using an alignments "
@ -36,16 +36,16 @@ msgstr ""
"caras o una fuente de fotogramas, usando opcionalmente su correspondiente "
"archivo de alineación."
#: tools/alignments/cli.py:41
#: tools/alignments/cli.py:44
msgid " Must Pass in a frames folder/source video file (-fr)."
msgstr ""
" Debe indicar una carpeta de fotogramas o archivo de vídeo de origen (-fr)."
#: tools/alignments/cli.py:42
#: tools/alignments/cli.py:45
msgid " Must Pass in a faces folder (-fc)."
msgstr " Debe indicar una carpeta de caras (-fc)."
#: tools/alignments/cli.py:43
#: tools/alignments/cli.py:46
msgid ""
" Must Pass in either a frames folder/source video file OR afaces folder (-fr "
"or -fc)."
@ -53,7 +53,7 @@ msgstr ""
" Debe indicar una carpeta de fotogramas o archivo de vídeo de origen, o una "
"carpeta de caras (-fr o -fc)."
#: tools/alignments/cli.py:45
#: tools/alignments/cli.py:48
msgid ""
" Must Pass in a frames folder/source video file AND a faces folder (-fr and -"
"fc)."
@ -61,15 +61,15 @@ msgstr ""
" Debe indicar una carpeta de fotogramas o archivo de vídeo de origen, y una "
"carpeta de caras (-fr y -fc)."
#: tools/alignments/cli.py:47
#: tools/alignments/cli.py:50
msgid " Use the output option (-o) to process results."
msgstr " Usar la opción de salida (-o) para procesar los resultados."
#: tools/alignments/cli.py:55 tools/alignments/cli.py:94
#: tools/alignments/cli.py:58 tools/alignments/cli.py:97
msgid "processing"
msgstr "proceso"
#: tools/alignments/cli.py:57
#: tools/alignments/cli.py:60
#, python-brace-format
msgid ""
"R|Choose which action you want to perform. NB: All actions require an "
@ -142,7 +142,7 @@ msgstr ""
"L|'spatial': Realiza un filtrado espacial y temporal para suavizar las "
"alineaciones (¡EXPERIMENTAL!)"
#: tools/alignments/cli.py:96
#: tools/alignments/cli.py:99
msgid ""
"R|How to output discovered items ('faces' and 'frames' only):\n"
"L|'console': Print the list of frames to the screen. (DEFAULT)\n"
@ -158,37 +158,41 @@ msgstr ""
"L|'move': Mueve los elementos descubiertos a una subcarpeta dentro del "
"directorio de origen."
#: tools/alignments/cli.py:107 tools/alignments/cli.py:118
#: tools/alignments/cli.py:125
#: tools/alignments/cli.py:110 tools/alignments/cli.py:123
#: tools/alignments/cli.py:130 tools/alignments/cli.py:149
msgid "data"
msgstr "datos"
#: tools/alignments/cli.py:111
#: tools/alignments/cli.py:114
msgid ""
"Full path to the alignments file to be processed. This is required for all "
"jobs except for 'from-faces' when the alignments file will be generated in "
"the specified faces folder."
"Full path to the alignments file to be processed. If you have input a "
"'frames_dir' and don't provide this option, the process will try to find the "
"alignments file at the default location. All jobs require an alignments file "
"with the exception of 'from-faces' when the alignments file will be "
"generated in the specified faces folder."
msgstr ""
"Ruta completa del archivo de alineaciones a procesar. Esto es necesario para "
"todos los trabajos excepto para 'caras desde' cuando el archivo de "
"alineaciones se generará en la carpeta de caras especificada."
"Ruta completa al archivo de alineaciones a procesar. Si ingresó un "
"'frames_dir' y no proporciona esta opción, el proceso intentará encontrar el "
"archivo de alineaciones en la ubicación predeterminada. Todos los trabajos "
"requieren un archivo de alineaciones con la excepción de 'from-faces' cuando "
"el archivo de alineaciones se generará en la carpeta de caras especificada."
#: tools/alignments/cli.py:119
#: tools/alignments/cli.py:124
msgid "Directory containing extracted faces."
msgstr "Directorio que contiene las caras extraídas."
#: tools/alignments/cli.py:126
#: tools/alignments/cli.py:131
msgid "Directory containing source frames that faces were extracted from."
msgstr ""
"Directorio que contiene los fotogramas de origen de los que se extrajeron "
"las caras."
#: tools/alignments/cli.py:135 tools/alignments/cli.py:146
#: tools/alignments/cli.py:156
#: tools/alignments/cli.py:140 tools/alignments/cli.py:164
#: tools/alignments/cli.py:174
msgid "extract"
msgstr "extracción"
#: tools/alignments/cli.py:136
#: tools/alignments/cli.py:141
msgid ""
"[Extract only] Extract every 'nth' frame. This option will skip frames when "
"extracting faces. For example a value of 1 will extract faces from every "
@ -199,11 +203,22 @@ msgstr ""
"caras de cada fotograma, un valor de 10 extraerá las caras de cada 10 "
"fotogramas."
#: tools/alignments/cli.py:147
#: tools/alignments/cli.py:150
msgid ""
"R|If selected then:\n"
"L|'frames_folder' should be a parent folder containing multiple videos/"
"folders of images you need to work on.\n"
"L|'faces_folder' should be a parent folder containing multiple folders of "
"faces you wish to manage.\n"
"L|'alignments_file'. should be a parent folder containing multiple alignment "
"files."
msgstr ""
#: tools/alignments/cli.py:165
msgid "[Extract only] The output size of extracted faces."
msgstr "[Sólo extracción] El tamaño de salida de las caras extraídas."
#: tools/alignments/cli.py:157
#: tools/alignments/cli.py:175
msgid ""
"[Extract only] Only extract faces that have been resized by this percent or "
"more to meet the specified extract size (`-sz`, `--size`). Useful for "

View File

@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2022-05-24 12:38+0100\n"
"POT-Creation-Date: 2022-09-14 18:45+0100\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -17,47 +17,47 @@ msgstr ""
"Content-Type: text/plain; charset=CHARSET\n"
"Content-Transfer-Encoding: 8bit\n"
#: tools/alignments/cli.py:15
#: tools/alignments/cli.py:17
msgid ""
"This command lets you perform various tasks pertaining to an alignments file."
msgstr ""
#: tools/alignments/cli.py:30
#: tools/alignments/cli.py:32
msgid ""
"Alignments tool\n"
"This tool allows you to perform numerous actions on or using an alignments "
"file against its corresponding faceset/frame source."
msgstr ""
#: tools/alignments/cli.py:41
#: tools/alignments/cli.py:44
msgid " Must Pass in a frames folder/source video file (-fr)."
msgstr ""
#: tools/alignments/cli.py:42
#: tools/alignments/cli.py:45
msgid " Must Pass in a faces folder (-fc)."
msgstr ""
#: tools/alignments/cli.py:43
#: tools/alignments/cli.py:46
msgid ""
" Must Pass in either a frames folder/source video file OR afaces folder (-fr "
"or -fc)."
msgstr ""
#: tools/alignments/cli.py:45
#: tools/alignments/cli.py:48
msgid ""
" Must Pass in a frames folder/source video file AND a faces folder (-fr and -"
"fc)."
msgstr ""
#: tools/alignments/cli.py:47
#: tools/alignments/cli.py:50
msgid " Use the output option (-o) to process results."
msgstr ""
#: tools/alignments/cli.py:55 tools/alignments/cli.py:94
#: tools/alignments/cli.py:58 tools/alignments/cli.py:97
msgid "processing"
msgstr ""
#: tools/alignments/cli.py:57
#: tools/alignments/cli.py:60
#, python-brace-format
msgid ""
"R|Choose which action you want to perform. NB: All actions require an "
@ -94,7 +94,7 @@ msgid ""
"(EXPERIMENTAL!)"
msgstr ""
#: tools/alignments/cli.py:96
#: tools/alignments/cli.py:99
msgid ""
"R|How to output discovered items ('faces' and 'frames' only):\n"
"L|'console': Print the list of frames to the screen. (DEFAULT)\n"
@ -104,43 +104,45 @@ msgid ""
"directory."
msgstr ""
#: tools/alignments/cli.py:107 tools/alignments/cli.py:118
#: tools/alignments/cli.py:125
#: tools/alignments/cli.py:110 tools/alignments/cli.py:123
#: tools/alignments/cli.py:130
msgid "data"
msgstr ""
#: tools/alignments/cli.py:111
#: tools/alignments/cli.py:114
msgid ""
"Full path to the alignments file to be processed. This is required for all "
"jobs except for 'from-faces' when the alignments file will be generated in "
"the specified faces folder."
"Full path to the alignments file to be processed. If you have input a "
"'frames_dir' and don't provide this option, the process will try to find the "
"alignments file at the default location. All jobs require an alignments file "
"with the exception of 'from-faces' when the alignments file will be "
"generated in the specified faces folder."
msgstr ""
#: tools/alignments/cli.py:119
#: tools/alignments/cli.py:124
msgid "Directory containing extracted faces."
msgstr ""
#: tools/alignments/cli.py:126
#: tools/alignments/cli.py:131
msgid "Directory containing source frames that faces were extracted from."
msgstr ""
#: tools/alignments/cli.py:135 tools/alignments/cli.py:146
#: tools/alignments/cli.py:156
#: tools/alignments/cli.py:140 tools/alignments/cli.py:151
#: tools/alignments/cli.py:161
msgid "extract"
msgstr ""
#: tools/alignments/cli.py:136
#: tools/alignments/cli.py:141
msgid ""
"[Extract only] Extract every 'nth' frame. This option will skip frames when "
"extracting faces. For example a value of 1 will extract faces from every "
"frame, a value of 10 will extract faces from every 10th frame."
msgstr ""
#: tools/alignments/cli.py:147
#: tools/alignments/cli.py:152
msgid "[Extract only] The output size of extracted faces."
msgstr ""
#: tools/alignments/cli.py:157
#: tools/alignments/cli.py:162
msgid ""
"[Extract only] Only extract faces that have been resized by this percent or "
"more to meet the specified extract size (`-sz`, `--size`). Useful for "

View File

@ -12,7 +12,7 @@ plugins either in parallel or in series, giving easy access to input and output.
import logging
import sys
from typing import cast, Dict, Generator, List, Optional, Tuple, TYPE_CHECKING, Union
from typing import Any, cast, Dict, Generator, List, Optional, Tuple, TYPE_CHECKING, Union
import cv2
@ -54,11 +54,11 @@ class Extractor():
Parameters
----------
detector: str
detector: str or ``None``
The name of a detector plugin as exists in :mod:`plugins.extract.detect`
aligner: str
aligner: str or ``None
The name of an aligner plugin as exists in :mod:`plugins.extract.align`
masker: str or list
masker: str or list or ``None
The name of a masker plugin(s) as exists in :mod:`plugins.extract.mask`.
This can be a single masker or a list of multiple maskers
configfile: str, optional
@ -96,9 +96,9 @@ class Extractor():
:attr:`final_pass` to indicate to the caller which phase is being processed
"""
def __init__(self,
detector: str,
aligner: str,
masker: Union[str, List[str]],
detector: Optional[str],
aligner: Optional[str],
masker: Optional[Union[str, List[str]]],
configfile: Optional[str] = None,
multiprocess: bool = False,
exclude_gpus: Optional[List[int]] = None,
@ -114,8 +114,9 @@ class Extractor():
exclude_gpus, rotate_images, min_size, normalize_method, re_feed,
image_is_aligned)
self._instance = _get_instance()
masker = [masker] if not isinstance(masker, list) else masker
self._flow = self._set_flow(detector, aligner, masker)
maskers = [cast(Optional[str],
masker)] if not isinstance(masker, list) else cast(List[Optional[str]], masker)
self._flow = self._set_flow(detector, aligner, maskers)
self._exclude_gpus = exclude_gpus
# We only ever need 1 item in each queue. This is 2 items cached (1 in queue 1 waiting
# for queue) at each point. Adding more just stacks RAM with no speed benefit.
@ -125,7 +126,7 @@ class Extractor():
self._vram_stats = self._get_vram_stats()
self._detect = self._load_detect(detector, rotate_images, min_size, configfile)
self._align = self._load_align(aligner, configfile, normalize_method, re_feed)
self._mask = [self._load_mask(mask, image_is_aligned, configfile) for mask in masker]
self._mask = [self._load_mask(mask, image_is_aligned, configfile) for mask in maskers]
self._is_parallel = self._set_parallel_processing(multiprocess)
self._phases = self._set_phases(multiprocess)
self._phase_index = 0
@ -381,7 +382,9 @@ class Extractor():
return retval
@staticmethod
def _set_flow(detector: str, aligner: str, masker: List[str]) -> List[str]:
def _set_flow(detector: Optional[str],
aligner: Optional[str],
masker: List[Optional[str]]) -> List[str]:
""" Set the flow list based on the input plugins """
logger.debug("detector: %s, aligner: %s, masker: %s", detector, aligner, masker)
retval = []
@ -536,7 +539,7 @@ class Extractor():
# << INTERNAL PLUGIN HANDLING >> #
def _load_align(self,
aligner: str,
aligner: Optional[str],
configfile: Optional[str],
normalize_method: Optional[str],
re_feed: int) -> Optional["Aligner"]:
@ -554,7 +557,7 @@ class Extractor():
return plugin
def _load_detect(self,
detector: str,
detector: Optional[str],
rotation: Optional[List[int]],
min_size: int,
configfile: Optional[str]) -> Optional["Detector"]:
@ -572,7 +575,7 @@ class Extractor():
return plugin
def _load_mask(self,
masker: str,
masker: Optional[str],
image_is_aligned: bool,
configfile: Optional[str]) -> Optional["Masker"]:
""" Set global arguments and load masker plugin """
@ -731,6 +734,7 @@ class ExtractMedia():
self._image_shape = cast(Tuple[int, int, int], image.shape)
self._detected_faces: List["DetectedFace"] = ([] if detected_faces is None
else detected_faces)
self._frame_metadata: Dict[str, Any] = {}
@property
def filename(self) -> str:
@ -758,6 +762,20 @@ class ExtractMedia():
"""list: A list of :class:`~lib.align.DetectedFace` objects in the :attr:`image`. """
return self._detected_faces
@property
def frame_metadata(self) -> dict:
""" dict: The frame metadata that has been added from an aligned image. This property
should only be called after :func:`add_frame_metadata` has been called when processing
an aligned face. For all other instances an assertion error will be raised.
Raises
------
AssertionError
If frame metadata has not been populated from an aligned image
"""
assert self._frame_metadata is not None
return self._frame_metadata
def get_image_copy(self, color_format: Literal["BGR", "RGB", "GRAY"]) -> "np.ndarray":
""" Get a copy of the image in the requested color format.
@ -812,6 +830,18 @@ class ExtractMedia():
self._filename, image.shape)
self._image = image
def add_frame_metadata(self, metadata: Dict[str, Any]) -> None:
""" Add the source frame metadata from an aligned PNG's header data.
metadata: dict
The contents of the 'source' field in the PNG header
"""
logger.trace("Adding PNG Source data for '%s': %s", # type:ignore
self._filename, metadata)
dims: Tuple[int, int] = metadata["source_frame_dims"]
self._image_shape = (*dims, 3)
self._frame_metadata = metadata
def _image_as_bgr(self) -> "np.ndarray":
""" Get a copy of the source frame in BGR format.

View File

@ -1,13 +1,17 @@
#!/usr/bin/env python3
""" Tools for manipulating the alignments serialized file """
import logging
import os
import sys
from typing import TYPE_CHECKING
from typing import Any, TYPE_CHECKING
from lib.utils import _video_extensions
from .media import AlignmentData
from .jobs import (Check, Draw, Extract, FromFaces, Rename, # noqa pylint: disable=unused-import
RemoveFaces, Sort, Spatial)
if TYPE_CHECKING:
from argparse import Namespace
@ -27,20 +31,59 @@ class Alignments(): # pylint:disable=too-few-public-methods
"""
def __init__(self, arguments: "Namespace") -> None:
logger.debug("Initializing %s: (arguments: '%s'", self.__class__.__name__, arguments)
self.args = arguments
job = self.args.job
self.alignments = None if job == "from-faces" else AlignmentData(self.args.alignments_file)
self._args = arguments
job = self._args.job
alignment_file = self._find_alignments()
self.alignments = None if job == "from-faces" else AlignmentData(alignment_file)
logger.debug("Initialized %s", self.__class__.__name__)
def _find_alignments(self) -> str:
""" If an alignments folder is required and hasn't been provided, scan for a file based on
the video folder.
Exits if an alignments file cannot be located
Returns
-------
str
The full path to an alignments file
"""
fname = self._args.alignments_file
frames = self._args.frames_dir
if fname and os.path.isfile(fname) and os.path.splitext(fname)[-1].lower() == ".fsa":
return fname
if fname:
logger.error("Not a valid alignments file: '%s'", fname)
sys.exit(1)
if not frames or not os.path.exists(frames):
logger.error("Not a valid frames folder: '%s'. Can't scan for alignments.", frames)
sys.exit(1)
fname = "alignments.fsa"
if os.path.isdir(frames) and os.path.exists(os.path.join(frames, fname)):
return fname
if os.path.isdir(frames) or os.path.splitext(frames)[-1] not in _video_extensions:
logger.error("Can't find a valid alignments file in location: %s", frames)
sys.exit(1)
fname = f"{os.path.splitext(frames)[0]}_{fname}"
if not os.path.exists(fname):
logger.error("Can't find a valid alignments file for video: %s", frames)
sys.exit(1)
return fname
def process(self) -> None:
""" The entry point for the Alignments tool from :mod:`lib.tools.alignments.cli`.
Launches the selected alignments job.
"""
if self.args.job in ("missing-alignments", "missing-frames", "multi-faces", "no-faces"):
job = Check
if self._args.job in ("missing-alignments", "missing-frames", "multi-faces", "no-faces"):
job: Any = Check
else:
job = globals()[self.args.job.title().replace("-", "")]
job = job(self.alignments, self.args)
job = globals()[self._args.job.title().replace("-", "")]
job = job(self.alignments, self._args)
logger.debug(job)
job.process()

View File

@ -3,6 +3,8 @@
import sys
import gettext
from typing import Any, List, Dict
from lib.cli.args import FaceSwapArgs
from lib.cli.actions import DirOrFileFullPaths, DirFullPaths, FileFullPaths, Radio, Slider
@ -30,7 +32,8 @@ class AlignmentsArgs(FaceSwapArgs):
return _("Alignments tool\nThis tool allows you to perform numerous actions on or using "
"an alignments file against its corresponding faceset/frame source.")
def get_argument_list(self) -> dict:
@staticmethod
def get_argument_list() -> List[Dict[str, Any]]:
""" Collect the argparse argument options.
Returns
@ -106,11 +109,13 @@ class AlignmentsArgs(FaceSwapArgs):
type=str,
group=_("data"),
# hacky solution to not require alignments file if creating alignments from faces:
required="from-faces" not in sys.argv,
required=not any(val in sys.argv for val in ["from-faces", "-fr", "-frames_folder"]),
filetypes="alignments",
help=_("Full path to the alignments file to be processed. This is required for all "
"jobs except for 'from-faces' when the alignments file will be generated in "
"the specified faces folder.")))
help=_("Full path to the alignments file to be processed. If you have input a "
"'frames_dir' and don't provide this option, the process will try to find the "
"alignments file at the default location. All jobs require an alignments file "
"with the exception of 'from-faces' when the alignments file will be generated "
"in the specified faces folder.")))
argument_list.append(dict(
opts=("-fc", "-faces_folder"),
action=DirFullPaths,

View File

@ -3,6 +3,7 @@
import logging
import os
import sys
from typing import Any, cast, Dict, List, Optional, Tuple, TYPE_CHECKING, Union
import cv2
import numpy as np
@ -15,6 +16,11 @@ from lib.multithreading import MultiThread
from lib.utils import get_folder
from plugins.extract.pipeline import Extractor, ExtractMedia
if TYPE_CHECKING:
from argparse import Namespace
from lib.align.aligned_face import CenteringType
from lib.align.alignments import AlignmentFileDict
from lib.queue_manager import EventQueue
logger = logging.getLogger(__name__) # pylint:disable=invalid-name
@ -31,7 +37,7 @@ class Mask(): # pylint:disable=too-few-public-methods
arguments: :class:`argparse.Namespace`
The :mod:`argparse` arguments as passed in from :mod:`tools.py`
"""
def __init__(self, arguments):
def __init__(self, arguments: "Namespace") -> None:
logger.debug("Initializing %s: (arguments: %s", self.__class__.__name__, arguments)
self._update_type = arguments.processing
self._input_is_faces = arguments.input_type == "faces"
@ -47,17 +53,18 @@ class Mask(): # pylint:disable=too-few-public-methods
self._saver = self._set_saver(arguments)
loader = FacesLoader if self._input_is_faces else ImagesLoader
self._loader = loader(arguments.input)
self._faces_saver = None
self._faces_saver: Optional[ImagesSaver] = None
self._alignments = Alignments(os.path.dirname(arguments.alignments),
filename=os.path.basename(arguments.alignments))
self._extractor = self._get_extractor(arguments.exclude_gpus)
self._set_correct_mask_type()
self._extractor_input_thread = self._feed_extractor()
logger.debug("Initialized %s", self.__class__.__name__)
def _check_input(self, mask_input):
def _check_input(self, mask_input: str) -> None:
""" Check the input is valid. If it isn't exit with a logged error
Parameters
@ -74,7 +81,7 @@ class Mask(): # pylint:disable=too-few-public-methods
sys.exit(0)
logger.debug("input '%s' is valid", mask_input)
def _set_saver(self, arguments):
def _set_saver(self, arguments: "Namespace") -> Optional[ImagesSaver]:
""" set the saver in a background thread
Parameters
@ -100,7 +107,7 @@ class Mask(): # pylint:disable=too-few-public-methods
logger.debug(saver)
return saver
def _get_extractor(self, exclude_gpus):
def _get_extractor(self, exclude_gpus: List[int]) -> Optional[Extractor]:
""" Obtain a Mask extractor plugin and launch it
Parameters
@ -125,7 +132,22 @@ class Mask(): # pylint:disable=too-few-public-methods
logger.debug(extractor)
return extractor
def _feed_extractor(self):
def _set_correct_mask_type(self):
""" Some masks have multiple variants that they can be saved as depending on config options
so update the :attr:`_mask_type` accordingly
"""
if self._extractor is None or self._mask_type != "bisenet-fp":
return
# Hacky look up into masker to get the type of mask
mask_plugin = self._extractor._mask[0] # pylint:disable=protected-access
assert mask_plugin is not None
mtype = "head" if mask_plugin.config.get("include_hair", False) else "face"
new_type = f"{self._mask_type}_{mtype}"
logger.debug("Updating '%s' to '%s'", self._mask_type, new_type)
self._mask_type = new_type
def _feed_extractor(self) -> MultiThread:
""" Feed the input queue to the Extractor from a faces folder or from source frames in a
background thread
@ -134,17 +156,63 @@ class Mask(): # pylint:disable=too-few-public-methods
:class:`lib.multithreading.Multithread`:
The thread that is feeding the extractor.
"""
masker_input = getattr(self,
"_input_{}".format("faces" if self._input_is_faces else "frames"))
masker_input = getattr(self, f"_input_{'faces' if self._input_is_faces else 'frames'}")
logger.debug("masker_input: %s", masker_input)
args = tuple() if self._update_type == "output" else (self._extractor.input_queue, )
if self._update_type == "output":
args: tuple = tuple()
else:
assert self._extractor is not None
args = (self._extractor.input_queue, )
input_thread = MultiThread(masker_input, *args, thread_count=1)
input_thread.start()
logger.debug(input_thread)
return input_thread
def _input_faces(self, *args):
def _process_face(self,
filename: str,
image: np.ndarray,
metadata: Dict[str, Any]) -> Optional["ExtractMedia"]:
""" Process a single face when masking from face images
filename: str
the filename currently being processed
image: :class:`numpy.ndarray`
The current face being processed
metadata: dict
The source frame metadata from the PNG header
Returns
-------
:class:`plugins.pipeline.ExtractMedia` or ``None``
If the update type is 'output' then nothing is returned otherwise the extract media for
the face is returned
"""
frame_name = metadata["source"]["source_filename"]
face_index = metadata["source"]["face_index"]
alignment = self._alignments.get_faces_in_frame(frame_name)
if not alignment or face_index > len(alignment) - 1:
self._counts["skip"] += 1
logger.warning("Skipping Face not found in alignments file: '%s'", filename)
return None
alignment = alignment[face_index]
self._counts["face"] += 1
if self._check_for_missing(frame_name, face_index, alignment):
return None
detected_face = self._get_detected_face(alignment)
if self._update_type == "output":
detected_face.image = image
self._save(frame_name, face_index, detected_face)
return None
media = ExtractMedia(filename, image, detected_faces=[detected_face])
media.add_frame_metadata(metadata["source"])
self._counts["update"] += 1
return media
def _input_faces(self, *args: Union[tuple, Tuple["EventQueue"]]) -> None:
""" Input pre-aligned faces to the Extractor plugin inside a thread
Parameters
@ -156,7 +224,7 @@ class Mask(): # pylint:disable=too-few-public-methods
log_once = False
logger.debug("args: %s", args)
if self._update_type != "output":
queue = args[0]
queue = cast("EventQueue", args[0])
for filename, image, metadata in tqdm(self._loader.load(), total=self._loader.count):
if not metadata: # Legacy faces. Update the headers
if not log_once:
@ -174,35 +242,14 @@ class Mask(): # pylint:disable=too-few-public-methods
logger.error("You can re-extract the face-set by using the Alignments Tool's "
"Extract job.")
break
frame_name = metadata["source"]["source_filename"]
face_index = metadata["source"]["face_index"]
alignment = self._alignments.get_faces_in_frame(frame_name)
if not alignment or face_index > len(alignment) - 1:
self._counts["skip"] += 1
logger.warning("Skipping Face not found in alignments file: '%s'", filename)
continue
alignment = alignment[face_index]
self._counts["face"] += 1
if self._check_for_missing(frame_name, face_index, alignment):
continue
detected_face = self._get_detected_face(alignment)
if self._update_type == "output":
detected_face.image = image
self._save(frame_name, face_index, detected_face)
else:
media = ExtractMedia(filename, image, detected_faces=[detected_face])
# Hacky overload of ExtractMedia's shape parameter to apply the actual original
# frame dimension
media._image_shape = (*metadata["source"]["source_frame_dims"], 3)
setattr(media, "mask_tool_face_info", metadata["source"]) # TODO formalize
media = self._process_face(filename, image, metadata)
if media is not None:
queue.put(media)
self._counts["update"] += 1
if self._update_type != "output":
queue.put("EOF")
def _input_frames(self, *args):
def _input_frames(self, *args: Union[tuple, Tuple["EventQueue"]]) -> None:
""" Input frames to the Extractor plugin inside a thread
Parameters
@ -213,7 +260,7 @@ class Mask(): # pylint:disable=too-few-public-methods
"""
logger.debug("args: %s", args)
if self._update_type != "output":
queue = args[0]
queue = cast("EventQueue", args[0])
for filename, image in tqdm(self._loader.load(), total=self._loader.count):
frame = os.path.basename(filename)
if not self._alignments.frame_exists(frame):
@ -245,7 +292,7 @@ class Mask(): # pylint:disable=too-few-public-methods
if self._update_type != "output":
queue.put("EOF")
def _check_for_missing(self, frame, idx, alignment):
def _check_for_missing(self, frame: str, idx: int, alignment: "AlignmentFileDict") -> bool:
""" Check if the alignment is missing the requested mask_type
Parameters
@ -270,7 +317,7 @@ class Mask(): # pylint:disable=too-few-public-methods
logger.debug("Mask pre-exists for face: '%s' - %s", frame, idx)
return retval
def _get_output_suffix(self, arguments):
def _get_output_suffix(self, arguments: "Namespace") -> str:
""" The filename suffix, based on selected output options.
Parameters
@ -285,11 +332,11 @@ class Mask(): # pylint:disable=too-few-public-methods
"""
sfx = "mask_preview_"
sfx += "face_" if not arguments.full_frame or self._input_is_faces else "frame_"
sfx += "{}.png".format(arguments.output_type)
sfx += f"{arguments.output_type}.png"
return sfx
@staticmethod
def _get_detected_face(alignment):
def _get_detected_face(alignment: "AlignmentFileDict") -> DetectedFace:
""" Convert an alignment dict item to a detected_face object
Parameters
@ -306,11 +353,12 @@ class Mask(): # pylint:disable=too-few-public-methods
detected_face.from_alignment(alignment)
return detected_face
def process(self):
def process(self) -> None:
""" The entry point for the Mask tool from :file:`lib.tools.cli`. Runs the Mask process """
logger.debug("Starting masker process")
updater = getattr(self, "_update_{}".format("faces" if self._input_is_faces else "frames"))
updater = getattr(self, f"_update_{'faces' if self._input_is_faces else 'frames'}")
if self._update_type != "output":
assert self._extractor is not None
if self._input_is_faces:
self._faces_saver = ImagesSaver(self._loader.location, as_bytes=True)
for extractor_output in self._extractor.detected_faces():
@ -320,6 +368,7 @@ class Mask(): # pylint:disable=too-few-public-methods
self._alignments.backup()
self._alignments.save()
if self._input_is_faces:
assert self._faces_saver is not None
self._faces_saver.close()
self._extractor_input_thread.join()
@ -337,24 +386,26 @@ class Mask(): # pylint:disable=too-few-public-methods
self._counts["update"], self._counts["face"])
logger.debug("Completed masker process")
def _update_faces(self, extractor_output):
def _update_faces(self, extractor_output: ExtractMedia) -> None:
""" Update alignments for the mask if the input type is a faces folder
If an output location has been indicated, then puts the mask preview to the save queue
Parameters
----------
extractor_output: dict
extractor_output: :class:`plugins.extract.pipeline.ExtractMedia`
The output from the :class:`plugins.extract.pipeline.Extractor` object
"""
assert self._faces_saver is not None
for face in extractor_output.detected_faces:
frame_name = extractor_output.mask_tool_face_info["source_filename"]
face_index = extractor_output.mask_tool_face_info["face_index"]
logger.trace("Saving face: (frame: %s, face index: %s)", frame_name, face_index)
frame_name = extractor_output.frame_metadata["source_filename"]
face_index = extractor_output.frame_metadata["face_index"]
logger.trace("Saving face: (frame: %s, face index: %s)", # type: ignore
frame_name, face_index)
self._alignments.update_face(frame_name, face_index, face.to_alignment())
metadata = dict(alignments=face.to_png_meta(),
source=extractor_output.mask_tool_face_info)
source=extractor_output.frame_metadata)
self._faces_saver.save(extractor_output.filename,
encode_image(extractor_output.image, ".png", metadata=metadata))
@ -362,14 +413,14 @@ class Mask(): # pylint:disable=too-few-public-methods
face.image = extractor_output.image
self._save(frame_name, face_index, face)
def _update_frames(self, extractor_output):
def _update_frames(self, extractor_output: ExtractMedia) -> None:
""" Update alignments for the mask if the input type is a frames folder or video
If an output location has been indicated, then puts the mask preview to the save queue
Parameters
----------
extractor_output: dict
extractor_output: :class:`plugins.extract.pipeline.ExtractMedia`
The output from the :class:`plugins.extract.pipeline.Extractor` object
"""
frame = os.path.basename(extractor_output.filename)
@ -379,7 +430,7 @@ class Mask(): # pylint:disable=too-few-public-methods
face.image = extractor_output.image
self._save(frame, idx, face)
def _save(self, frame, idx, detected_face):
def _save(self, frame: str, idx: int, detected_face: DetectedFace) -> None:
""" Build the mask preview image and save
Parameters
@ -391,6 +442,7 @@ class Mask(): # pylint:disable=too-few-public-methods
detected_face: `lib.FacesDetect.detected_face`
A detected_face object for a face
"""
assert self._saver is not None
if self._mask_type == "bisenet-fp":
mask_types = [f"{self._mask_type}_{area}" for area in ("face", "head")]
else:
@ -406,15 +458,14 @@ class Mask(): # pylint:disable=too-few-public-methods
if mask_type not in detected_face.mask:
# If extracting bisenet mask, then skip versions which don't exist
continue
filename = os.path.join(self._saver.location, "{}_{}_{}".format(
os.path.splitext(frame)[0],
idx,
f"{mask_type}_{self._output['suffix']}"))
filename = os.path.join(
self._saver.location,
f"{os.path.splitext(frame)[0]}_{idx}_{mask_type}_{self._output['suffix']}")
image = self._create_image(detected_face, mask_type)
logger.trace("filename: '%s', image_shape: %s", filename, image.shape)
logger.trace("filename: '%s', image_shape: %s", filename, image.shape) # type: ignore
self._saver.save(filename, image)
def _create_image(self, detected_face, mask_type):
def _create_image(self, detected_face: DetectedFace, mask_type: str) -> np.ndarray:
""" Create a mask preview image for saving out to disk
Parameters
@ -433,6 +484,7 @@ class Mask(): # pylint:disable=too-few-public-methods
- The masked face
"""
mask = detected_face.mask[mask_type]
assert detected_face.image is not None
mask.set_blur_and_threshold(**self._output["opts"])
if not self._output["full_frame"] or self._input_is_faces:
if self._input_is_faces:
@ -442,26 +494,28 @@ class Mask(): # pylint:disable=too-few-public-methods
size=detected_face.image.shape[0],
is_aligned=True).face
else:
centering = "legacy" if self._alignments.version == 1.0 else mask.stored_centering
centering: "CenteringType" = ("legacy" if self._alignments.version == 1.0
else mask.stored_centering)
detected_face.load_aligned(detected_face.image, centering=centering, force=True)
face = detected_face.aligned.face
assert face is not None
mask = cv2.resize(detected_face.mask[mask_type].mask,
(face.shape[1], face.shape[0]),
interpolation=cv2.INTER_CUBIC)[..., None]
else:
face = np.array(detected_face.image) # cv2 fails if this comes as imageio.core.Array
mask = mask.get_full_frame_mask(face.shape[1], face.shape[0])
mask = np.expand_dims(mask, -1)
imask = mask.get_full_frame_mask(face.shape[1], face.shape[0])
imask = np.expand_dims(imask, -1)
height, width = face.shape[:2]
if self._output["type"] == "combined":
masked = (face.astype("float32") * mask.astype("float32") / 255.).astype("uint8")
mask = np.tile(mask, 3)
for img in (face, masked, mask):
masked = (face.astype("float32") * imask.astype("float32") / 255.).astype("uint8")
imask = np.tile(imask, 3)
for img in (face, masked, imask):
cv2.rectangle(img, (0, 0), (width - 1, height - 1), (255, 255, 255), 1)
out_image = np.concatenate((face, masked, mask), axis=1)
out_image = np.concatenate((face, masked, imask), axis=1)
elif self._output["type"] == "mask":
out_image = mask
out_image = imask
elif self._output["type"] == "masked":
out_image = np.concatenate([face, mask], axis=-1)
out_image = np.concatenate([face, imask], axis=-1)
return out_image