mirror of
https://github.com/zebrajr/faceswap.git
synced 2025-12-06 12:20:27 +01:00
GUI: Preview for convert
Display preview when outputting to a stream container (e.g. video, gif)
This commit is contained in:
parent
ce45f01e4a
commit
a757a4aa88
149
lib/gui/utils.py
149
lib/gui/utils.py
|
|
@ -233,6 +233,10 @@ class Images():
|
|||
self.pathoutput = None
|
||||
self.previewoutput = None
|
||||
self.previewtrain = dict()
|
||||
self.previewcache = dict(modified=None, # cache for extract and convert
|
||||
images=None,
|
||||
filenames=list(),
|
||||
placeholder=None)
|
||||
self.errcount = 0
|
||||
self.icons = dict()
|
||||
self.icons["folder"] = ImageTk.PhotoImage(file=os.path.join(
|
||||
|
|
@ -260,6 +264,13 @@ class Images():
|
|||
fullitem = os.path.join(self.pathpreview, item)
|
||||
logger.debug("Deleting: '%s'", fullitem)
|
||||
os.remove(fullitem)
|
||||
for fname in self.previewcache["filenames"]:
|
||||
if os.path.basename(fname) == ".gui_preview.jpg":
|
||||
logger.debug("Deleting: '%s'", fname)
|
||||
try:
|
||||
os.remove(fname)
|
||||
except FileNotFoundError:
|
||||
logger.debug("File does not exist: %s", fname)
|
||||
self.clear_image_cache()
|
||||
|
||||
def clear_image_cache(self):
|
||||
|
|
@ -268,49 +279,111 @@ class Images():
|
|||
self.pathoutput = None
|
||||
self.previewoutput = None
|
||||
self.previewtrain = dict()
|
||||
self.previewcache = dict(modified=None, # cache for extract and convert
|
||||
images=None,
|
||||
filenames=list(),
|
||||
placeholder=None)
|
||||
|
||||
@staticmethod
|
||||
def get_images(imgpath):
|
||||
""" Get the images stored within the given directory """
|
||||
logger.trace("Getting images: '%s'", imgpath)
|
||||
logger.debug("Getting images: '%s'", imgpath)
|
||||
if not os.path.isdir(imgpath):
|
||||
logger.debug("Folder does not exist")
|
||||
return None
|
||||
files = [os.path.join(imgpath, f)
|
||||
for f in os.listdir(imgpath) if f.lower().endswith((".png", ".jpg"))]
|
||||
logger.trace("Image files: %s", files)
|
||||
logger.debug("Image files: %s", files)
|
||||
return files
|
||||
|
||||
def load_latest_preview(self, thumbnail_size, frame_dims):
|
||||
""" Load the latest preview image for extract and convert """
|
||||
logger.trace("Loading preview image: (thumbnail_size: %s, frame_dims: %s)",
|
||||
logger.debug("Loading preview image: (thumbnail_size: %s, frame_dims: %s)",
|
||||
thumbnail_size, frame_dims)
|
||||
imagefiles = self.get_images(self.pathoutput)
|
||||
if not imagefiles or len(imagefiles) == 1:
|
||||
gui_preview = os.path.join(self.pathoutput, ".gui_preview.jpg")
|
||||
if not imagefiles or (len(imagefiles) == 1 and gui_preview not in imagefiles):
|
||||
logger.debug("No preview to display")
|
||||
self.previewoutput = None
|
||||
# self.previewoutput = None
|
||||
return
|
||||
logger.trace("Image Files: %s", len(imagefiles))
|
||||
num_images = (frame_dims[0] // thumbnail_size) * (frame_dims[1] // thumbnail_size)
|
||||
if num_images > len(imagefiles):
|
||||
logger.debug("Not enough images to generate display. (to display: %s, available "
|
||||
"images: %s)", num_images, len(imagefiles))
|
||||
self.previewoutput = None
|
||||
# Filter to just the gui_preview if it exists in folder output
|
||||
imagefiles = [gui_preview] if gui_preview in imagefiles else imagefiles
|
||||
logger.debug("Image Files: %s", len(imagefiles))
|
||||
|
||||
imagefiles = self.get_newest_filenames(imagefiles)
|
||||
if not imagefiles:
|
||||
return
|
||||
filenames, samples = self.get_preview_samples(imagefiles, num_images, thumbnail_size)
|
||||
show_image = self.place_previews(samples, frame_dims)
|
||||
logger.trace("Displaying preview: '%s'", filenames)
|
||||
|
||||
self.load_images_to_cache(imagefiles, frame_dims, thumbnail_size)
|
||||
if imagefiles == [gui_preview]:
|
||||
# Delete the preview image so that the main scripts know to output another
|
||||
logger.debug("Deleting preview image")
|
||||
os.remove(imagefiles[0])
|
||||
show_image = self.place_previews(frame_dims)
|
||||
logger.debug("Displaying preview: '%s'", self.previewcache["filenames"])
|
||||
self.previewoutput = (show_image, ImageTk.PhotoImage(show_image))
|
||||
|
||||
def get_newest_filenames(self, imagefiles):
|
||||
""" Return image filenames that have been modified since the last check """
|
||||
if self.previewcache["modified"] is None:
|
||||
retval = imagefiles
|
||||
else:
|
||||
retval = [fname for fname in imagefiles
|
||||
if os.path.getmtime(fname) > self.previewcache["modified"]]
|
||||
if not retval:
|
||||
logger.debug("No new images in output folder")
|
||||
else:
|
||||
self.previewcache["modified"] = max([os.path.getmtime(img) for img in retval])
|
||||
logger.debug("Number new images: %s, Last Modified: %s",
|
||||
len(retval), self.previewcache["modified"])
|
||||
return retval
|
||||
|
||||
def load_images_to_cache(self, imagefiles, frame_dims, thumbnail_size):
|
||||
""" Load new images and append to cache, filtering to the number of display images """
|
||||
logger.debug("Number imagefiles: %s, frame_dims: %s, thumbnail_size: %s",
|
||||
len(imagefiles), frame_dims, thumbnail_size)
|
||||
num_images = (frame_dims[0] // thumbnail_size) * (frame_dims[1] // thumbnail_size)
|
||||
logger.debug("num_images: %s", num_images)
|
||||
samples = list()
|
||||
start_idx = len(imagefiles) - num_images if len(imagefiles) > num_images else 0
|
||||
show_files = sorted(imagefiles, key=os.path.getctime)[start_idx:]
|
||||
for fname in show_files:
|
||||
img = Image.open(fname)
|
||||
width, height = img.size
|
||||
scaling = thumbnail_size / max(width, height)
|
||||
logger.debug("image width: %s, height: %s, scaling: %s", width, height, scaling)
|
||||
img = img.resize((int(width * scaling), int(height * scaling)))
|
||||
if img.size[0] != img.size[1]:
|
||||
# Pad to square
|
||||
new_img = Image.new("RGB", (thumbnail_size, thumbnail_size))
|
||||
new_img.paste(img, ((thumbnail_size - img.size[0])//2,
|
||||
(thumbnail_size - img.size[1])//2))
|
||||
img = new_img
|
||||
draw = ImageDraw.Draw(img)
|
||||
draw.rectangle(((0, 0), (thumbnail_size, thumbnail_size)), outline="#E5E5E5", width=1)
|
||||
samples.append(np.array(img))
|
||||
samples = np.array(samples)
|
||||
self.previewcache["filenames"] = (self.previewcache["filenames"] +
|
||||
show_files)[-num_images:]
|
||||
cache = self.previewcache["images"]
|
||||
if cache is None:
|
||||
logger.debug("Creating new cache")
|
||||
cache = samples[-num_images:]
|
||||
else:
|
||||
logger.debug("Appending to existing cache")
|
||||
cache = np.concatenate((cache, samples))[-num_images:]
|
||||
self.previewcache["images"] = cache
|
||||
logger.debug("Cache shape: %s", self.previewcache["images"].shape)
|
||||
|
||||
@staticmethod
|
||||
def get_preview_samples(imagefiles, num_images, thumbnail_size):
|
||||
""" Return a subset of the imagefiles images
|
||||
Exclude final file so we don't accidentally load a file that is being saved """
|
||||
logger.trace("num_images: %s", num_images)
|
||||
logger.debug("num_images: %s", num_images)
|
||||
samples = list()
|
||||
start_idx = len(imagefiles) - (num_images + 1)
|
||||
end_idx = len(imagefiles) - 1
|
||||
logger.trace("start_idx: %s, end_idx: %s", start_idx, end_idx)
|
||||
logger.debug("start_idx: %s, end_idx: %s", start_idx, end_idx)
|
||||
show_files = sorted(imagefiles, key=os.path.getctime)[start_idx: end_idx]
|
||||
for fname in show_files:
|
||||
img = Image.open(fname)
|
||||
|
|
@ -328,24 +401,44 @@ class Images():
|
|||
draw.rectangle(((0, 0), (thumbnail_size, thumbnail_size)), outline="#E5E5E5", width=1)
|
||||
samples.append(np.array(img))
|
||||
samples = np.array(samples)
|
||||
logger.trace("Samples shape: %s", samples.shape)
|
||||
logger.debug("Samples shape: %s", samples.shape)
|
||||
return show_files, samples
|
||||
|
||||
@staticmethod
|
||||
def place_previews(samples, frame_dims):
|
||||
def place_previews(self, frame_dims):
|
||||
""" Stack the preview images to fit display """
|
||||
samples = self.previewcache["images"].copy()
|
||||
num_images, thumbnail_size = samples.shape[:2]
|
||||
logger.trace("num_images: %s, thumbnail_size: %s", num_images, thumbnail_size)
|
||||
if self.previewcache["placeholder"] is None:
|
||||
self.create_placeholder(thumbnail_size)
|
||||
|
||||
logger.debug("num_images: %s, thumbnail_size: %s", num_images, thumbnail_size)
|
||||
cols, rows = frame_dims[0] // thumbnail_size, frame_dims[1] // thumbnail_size
|
||||
logger.trace("cols: %s, rows: %s", cols, rows)
|
||||
logger.debug("cols: %s, rows: %s", cols, rows)
|
||||
remainder = (cols * rows) % num_images
|
||||
if remainder != 0:
|
||||
logger.debug("Padding final row. Remainder: %s", remainder)
|
||||
placeholder = np.concatenate([np.expand_dims(self.previewcache["placeholder"],
|
||||
0)] * remainder)
|
||||
samples = np.concatenate((samples, placeholder))
|
||||
display = np.vstack([np.hstack(samples[row * cols: (row + 1) * cols])
|
||||
for row in range(rows)])
|
||||
logger.trace("display shape: %s", display.shape)
|
||||
logger.debug("display shape: %s", display.shape)
|
||||
return Image.fromarray(display)
|
||||
|
||||
def create_placeholder(self, thumbnail_size):
|
||||
""" Create a placeholder image for when there are fewer samples available
|
||||
then columns to display them """
|
||||
logger.debug("Creating placeholder. thumbnail_size: %s", thumbnail_size)
|
||||
placeholder = Image.new("RGB", (thumbnail_size, thumbnail_size))
|
||||
draw = ImageDraw.Draw(placeholder)
|
||||
draw.rectangle(((0, 0), (thumbnail_size, thumbnail_size)), outline="#E5E5E5", width=1)
|
||||
placeholder = np.array(placeholder)
|
||||
self.previewcache["placeholder"] = placeholder
|
||||
logger.debug("Created placeholder. shape: %s", placeholder.shape)
|
||||
|
||||
def load_training_preview(self):
|
||||
""" Load the training preview images """
|
||||
logger.trace("Loading Training preview images")
|
||||
logger.debug("Loading Training preview images")
|
||||
imagefiles = self.get_images(self.pathpreview)
|
||||
modified = None
|
||||
if not imagefiles:
|
||||
|
|
@ -358,7 +451,7 @@ class Images():
|
|||
name = os.path.splitext(name)[0]
|
||||
name = name[name.rfind("_") + 1:].title()
|
||||
try:
|
||||
logger.trace("Displaying preview: '%s'", img)
|
||||
logger.debug("Displaying preview: '%s'", img)
|
||||
size = self.get_current_size(name)
|
||||
self.previewtrain[name] = [Image.open(img), None, modified]
|
||||
self.resize_image(name, size)
|
||||
|
|
@ -378,20 +471,20 @@ class Images():
|
|||
|
||||
def get_current_size(self, name):
|
||||
""" Return the size of the currently displayed image """
|
||||
logger.trace("Getting size: '%s'", name)
|
||||
logger.debug("Getting size: '%s'", name)
|
||||
if not self.previewtrain.get(name, None):
|
||||
return None
|
||||
img = self.previewtrain[name][1]
|
||||
if not img:
|
||||
return None
|
||||
logger.trace("Got size: (name: '%s', width: '%s', height: '%s')",
|
||||
logger.debug("Got size: (name: '%s', width: '%s', height: '%s')",
|
||||
name, img.width(), img.height())
|
||||
return img.width(), img.height()
|
||||
|
||||
def resize_image(self, name, framesize):
|
||||
""" Resize the training preview image
|
||||
based on the passed in frame size """
|
||||
logger.trace("Resizing image: (name: '%s', framesize: %s", name, framesize)
|
||||
logger.debug("Resizing image: (name: '%s', framesize: %s", name, framesize)
|
||||
displayimg = self.previewtrain[name][0]
|
||||
if framesize:
|
||||
frameratio = float(framesize[0]) / float(framesize[1])
|
||||
|
|
@ -403,7 +496,7 @@ class Images():
|
|||
else:
|
||||
scale = framesize[1] / float(displayimg.size[1])
|
||||
size = (int(displayimg.size[0] * scale), framesize[1])
|
||||
logger.trace("Scaling: (scale: %s, size: %s", scale, size)
|
||||
logger.debug("Scaling: (scale: %s, size: %s", scale, size)
|
||||
|
||||
# Hacky fix to force a reload if it happens to find corrupted
|
||||
# data, probably due to reading the image whilst it is partially
|
||||
|
|
|
|||
|
|
@ -30,6 +30,14 @@ class Output():
|
|||
self.cache = dict() # Cache for when frames must be written in correct order
|
||||
logger.debug("Initialized %s", self.__class__.__name__)
|
||||
|
||||
@property
|
||||
def is_stream(self):
|
||||
""" Return whether the writer is a stream or images
|
||||
Writers that write to a stream have a frame_order paramater to dictate
|
||||
the order in which frames should be written out (eg. gif/ffmpeg) """
|
||||
retval = hasattr(self, "frame_order")
|
||||
return retval
|
||||
|
||||
def output_filename(self, filename):
|
||||
""" Return the output filename with the correct folder and extension
|
||||
NB: The plugin must have a config item 'format' that contains the
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import os
|
|||
import sys
|
||||
from threading import Event
|
||||
|
||||
from cv2 import imwrite # pylint:disable=no-name-in-module
|
||||
import numpy as np
|
||||
from tqdm import tqdm
|
||||
|
||||
|
|
@ -399,7 +400,10 @@ class DiskIO():
|
|||
def save(self, completion_event):
|
||||
""" Save the converted images """
|
||||
logger.debug("Save Images: Start")
|
||||
for _ in tqdm(range(self.total_count), desc="Converting", file=sys.stdout):
|
||||
write_preview = self.args.redirect_gui and self.writer.is_stream
|
||||
preview_image = os.path.join(self.writer.output_folder, ".gui_preview.jpg")
|
||||
logger.debug("Write preview for gui: %s", write_preview)
|
||||
for idx in tqdm(range(self.total_count), desc="Converting", file=sys.stdout):
|
||||
if self.save_queue.shutdown.is_set():
|
||||
logger.debug("Save Queue: Stop signal received. Terminating")
|
||||
break
|
||||
|
|
@ -408,6 +412,10 @@ class DiskIO():
|
|||
logger.debug("EOF Received")
|
||||
break
|
||||
filename, image = item
|
||||
# Write out preview image for the GUI every 10 frames if writing to stream
|
||||
if write_preview and idx % 10 == 0 and not os.path.exists(preview_image):
|
||||
logger.debug("Writing GUI Preview image: '%s'", preview_image)
|
||||
imwrite(preview_image, image)
|
||||
self.writer.write(filename, image)
|
||||
self.writer.close()
|
||||
completion_event.set()
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user