mirror of https://github.com/zebrajr/faceswap.git (synced 2025-12-06 00:20:09 +01:00)

Adding plugins

This commit is contained in:
  parent 0dcf192716
  commit 3e2976ab03
.gitignore (vendored) | 3

@@ -5,4 +5,5 @@
 !Dockerfile*
 !requirements*
 !lib/*.py
 !scripts
+!plugins
faceswap.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 import argparse
 from scripts.extract import ExtractTrainingData
-from lib.cli import TrainingProcessor
+from scripts.train import TrainingProcessor
 from scripts.convert import ConvertImage
 
 if __name__ == "__main__":
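Note: the body of faceswap.py's __main__ block is not shown in this diff, but each processor registers its own sub-command on an argparse subparser and ends parse_arguments with parser.set_defaults(func=self.process_arguments). A minimal sketch of the implied wiring; the description strings here are assumptions, not the diff's code:

    #!/usr/bin/env python3
    # Sketch of the entry-point wiring implied by this commit (not part of the diff).
    import argparse

    from scripts.extract import ExtractTrainingData
    from scripts.train import TrainingProcessor
    from scripts.convert import ConvertImage

    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        subparser = parser.add_subparsers()
        # Each processor adds its own sub-command and flags in __init__.
        ExtractTrainingData(subparser, "extract", "Extract faces from a directory of images")
        TrainingProcessor(subparser, "train", "Train the A/B autoencoders")
        ConvertImage(subparser, "convert", "Swap faces in a directory of images")
        arguments = parser.parse_args()
        # Dispatch to the handler the chosen sub-command registered via set_defaults.
        arguments.func(arguments)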
lib/aligner.py (deleted)

@@ -1,84 +0,0 @@
-# Face Mesh. From https://github.com/juniorxsound/Face-Align/blob/master/facemesh.py
-# Written by Or Fleisher for Data Art class taught in ITP, NYU during fall 2017 by Genevieve Hoffman.
-# Based on Leon Eckerts code from the facemesh workshop - https://github.com/leoneckert/facemash-workshop
-
-import cv2
-import dlib
-import numpy as np
-
-
-class Aligner:
-    def __init__(self, pred, detect):
-        self.detector = dlib.cnn_face_detection_model_v1(detect)
-        self.predictor = dlib.shape_predictor(pred)
-
-    def get_rects(self, img):
-        rects = self.detector(img, 1)
-        # print("[+] Number of faces found:", len(rects))
-        return rects
-
-    def get_first_rect(self, img):
-        rects = self.get_rects(img)
-        if len(rects) > 0:
-            return rects[0].rect
-        else:
-            return None
-
-    def get_landmarks(self, img, rect):
-        return np.matrix([[p.x, p.y] for p in self.predictor(img, rect).parts()])
-
-    # https://matthewearl.github.io/2015/07/28/switching-eds-with-python/
-    def transformation_from_points(self, points1, points2):
-        points1 = points1.astype(np.float64)
-        points2 = points2.astype(np.float64)
-
-        c1 = np.mean(points1, axis=0)
-        c2 = np.mean(points2, axis=0)
-        points1 -= c1
-        points2 -= c2
-
-        s1 = np.std(points1)
-        s2 = np.std(points2)
-        points1 /= s1
-        points2 /= s2
-
-        U, S, Vt = np.linalg.svd(points1.T * points2)
-        R = (U * Vt).T
-
-        return np.vstack([np.hstack(((s2 / s1) * R,
-                                     c2.T - (s2 / s1) * R * c1.T)),
-                          np.matrix([0., 0., 1.])])
-
-    def warp_im(self, im, ref, M):
-        dshape = ref.shape
-        output_im = ref.copy()
-        translationMatrix = np.matrix([0, 0])
-        cv2.warpAffine(im,
-                       M[:2],
-                       (dshape[1], dshape[0]),
-                       dst=output_im,
-                       borderMode=cv2.BORDER_TRANSPARENT,
-                       flags=cv2.WARP_INVERSE_MAP)
-
-        return output_im
-
-    def align(self, ref_img, img):
-        # TODO optimize with a one step detection for both images
-        ref_rect = self.get_first_rect(ref_img)
-        if ref_rect is None:
-            return None
-        ref_landmarks = self.get_landmarks(ref_img, ref_rect)
-
-        rect = self.get_first_rect(img)
-        if rect is None:
-            return None
-        landmarks = self.get_landmarks(img, rect)
-
-        transformation_matrix = self.transformation_from_points(
-            ref_landmarks, landmarks)
-        warped_img = self.warp_im(img, ref_img, transformation_matrix)
-
-        #cv2.imwrite( 'modified/_aligned.png', warped_img )
-        #cv2.imwrite( 'modified/_align_ref.png', ref_img )
-        #cv2.imwrite( 'modified/_align_generated.png', img )
-        return warped_img
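The deleted transformation_from_points is an ordinary Procrustes fit: center both landmark sets, normalize by standard deviation, recover the rotation from an SVD of the cross-covariance, then reassemble a full similarity transform with a homogeneous row. A standalone sanity check of the same math (plain ndarrays and '@' instead of np.matrix; not part of the commit):

    import numpy as np

    def similarity_from_points(points1, points2):
        # Same recipe as the deleted Aligner.transformation_from_points.
        points1 = points1.astype(np.float64)
        points2 = points2.astype(np.float64)
        c1, c2 = points1.mean(axis=0), points2.mean(axis=0)
        points1, points2 = points1 - c1, points2 - c2
        s1, s2 = points1.std(), points2.std()
        points1, points2 = points1 / s1, points2 / s2
        U, S, Vt = np.linalg.svd(points1.T @ points2)
        R = (U @ Vt).T                       # Kabsch rotation: maps set 1 onto set 2
        t = c2 - (s2 / s1) * (R @ c1)
        return np.vstack([np.hstack([(s2 / s1) * R, t.reshape(2, 1)]),
                          [0.0, 0.0, 1.0]])

    rng = np.random.default_rng(0)
    src = rng.uniform(0, 255, size=(68, 2))              # synthetic 68-point landmarks
    theta = 0.3
    R_true = np.array([[np.cos(theta), -np.sin(theta)],
                       [np.sin(theta),  np.cos(theta)]])
    dst = 1.4 * src @ R_true.T + np.array([10.0, -5.0])  # known rotation/scale/shift

    M = similarity_from_points(src, dst)
    mapped = np.hstack([src, np.ones((68, 1))]) @ M.T    # apply M to homogeneous rows
    print(np.allclose(mapped[:, :2], dst))               # True: transform recovered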
lib/cli.py | 180

@@ -1,16 +1,8 @@
 import argparse
 import os
-import cv2
-import numpy
-import time
-
-from lib.utils import get_image_paths, get_folder, load_images, stack_images
-from lib.faces_detect import crop_faces
-from lib.training_data import get_training_data
-
-from lib.model import autoencoder_A, autoencoder_B
-from lib.model import encoder, decoder_A, decoder_B
+
+from lib.utils import get_image_paths, get_folder, load_images
 
 class FullPaths(argparse.Action):
     """Expand user- and relative-paths"""

@@ -19,154 +11,6 @@ class FullPaths(argparse.Action):
         setattr(namespace, self.dest, os.path.abspath(
             os.path.expanduser(values)))
 
 
-class TrainingProcessor(object):
-    arguments = None
-
-    def __init__(self, subparser, command, description='default'):
-        self.parse_arguments(description, subparser, command)
-
-    def process_arguments(self, arguments):
-        self.arguments = arguments
-        print("Model A Directory: {}".format(self.arguments.input_A))
-        print("Model B Directory: {}".format(self.arguments.input_B))
-        print("Training data directory: {}".format(self.arguments.model_dir))
-        print('Starting, this may take a while...')
-
-        try:
-            encoder.load_weights(self.arguments.model_dir + '/encoder.h5')
-            decoder_A.load_weights(self.arguments.model_dir + '/decoder_A.h5')
-            decoder_B.load_weights(self.arguments.model_dir + '/decoder_B.h5')
-        except Exception as e:
-            print('Not loading existing training data.')
-            print(e)
-
-        self.process()
-
-    def parse_arguments(self, description, subparser, command):
-        parser = subparser.add_parser(
-            command,
-            help="This command trains the model for the two faces A and B.",
-            description=description,
-            epilog="Questions and feedback: \
-                https://github.com/deepfakes/faceswap-playground"
-        )
-
-        parser.add_argument('-A', '--input-A',
-                            action=FullPaths,
-                            dest="input_A",
-                            default="input_A",
-                            help="Input directory. A directory containing training images for face A.\
-                            Defaults to 'input'")
-        parser.add_argument('-B', '--input-B',
-                            action=FullPaths,
-                            dest="input_B",
-                            default="input_B",
-                            help="Input directory. A directory containing training images for face B.\
-                            Defaults to 'input'")
-        parser.add_argument('-m', '--model-dir',
-                            action=FullPaths,
-                            dest="model_dir",
-                            default="models",
-                            help="Model directory. This is where the training data will \
-                            be stored. Defaults to 'model'")
-        parser.add_argument('-p', '--preview',
-                            action="store_true",
-                            dest="preview",
-                            default=False,
-                            help="Show preview output. If not specified, write progress \
-                            to file.")
-        parser.add_argument('-v', '--verbose',
-                            action="store_true",
-                            dest="verbose",
-                            default=False,
-                            help="Show verbose output")
-        parser.add_argument('-s', '--save-interval',
-                            type=int,
-                            dest="save_interval",
-                            default=100,
-                            help="Sets the number of iterations before saving the model.")
-        parser.add_argument('-w', '--write-image',
-                            action="store_true",
-                            dest="write_image",
-                            default=False,
-                            help="Writes the training result to a file even on preview mode.")
-        parser = self.add_optional_arguments(parser)
-        parser.set_defaults(func=self.process_arguments)
-
-    def add_optional_arguments(self, parser):
-        # Override this for custom arguments
-        return parser
-
-    def save_model_weights(self):
-        encoder.save_weights(self.arguments.model_dir + '/encoder.h5')
-        decoder_A.save_weights(self.arguments.model_dir + '/decoder_A.h5')
-        decoder_B.save_weights(self.arguments.model_dir + '/decoder_B.h5')
-        print('save model weights')
-
-    def show_sample(self, test_A, test_B):
-        figure_A = numpy.stack([
-            test_A,
-            autoencoder_A.predict(test_A),
-            autoencoder_B.predict(test_A),
-        ], axis=1)
-        figure_B = numpy.stack([
-            test_B,
-            autoencoder_B.predict(test_B),
-            autoencoder_A.predict(test_B),
-        ], axis=1)
-
-        figure = numpy.concatenate([figure_A, figure_B], axis=0)
-        figure = figure.reshape((4, 7) + figure.shape[1:])
-        figure = stack_images(figure)
-
-        figure = numpy.clip(figure * 255, 0, 255).astype('uint8')
-
-        if self.arguments.preview is True:
-            cv2.imshow('', figure)
-        if not self.arguments.preview or self.arguments.write_image:
-            cv2.imwrite('_sample.jpg', figure)
-
-    def process(self):
-        images_A = get_image_paths(self.arguments.input_A)
-        images_B = get_image_paths(self.arguments.input_B)
-        images_A = load_images(images_A) / 255.0
-        images_B = load_images(images_B) / 255.0
-
-        images_A += images_B.mean(axis=(0, 1, 2)) - \
-            images_A.mean(axis=(0, 1, 2))
-
-        print('press "q" to stop training and save model')
-
-        BATCH_SIZE = 64
-
-        for epoch in range(1000000):
-            if self.arguments.verbose:
-                print("Iteration number {}".format(epoch + 1))
-                start_time = time.time()
-            warped_A, target_A = get_training_data(images_A, BATCH_SIZE)
-            warped_B, target_B = get_training_data(images_B, BATCH_SIZE)
-
-            loss_A = autoencoder_A.train_on_batch(warped_A, target_A)
-            loss_B = autoencoder_B.train_on_batch(warped_B, target_B)
-            print(loss_A, loss_B)
-
-            if epoch % self.arguments.save_interval == 0:
-                self.save_model_weights()
-                self.show_sample(target_A[0:14], target_B[0:14])
-
-            key = cv2.waitKey(1)
-            if key == ord('q'):
-                self.save_model_weights()
-                exit()
-            if self.arguments.verbose:
-                end_time = time.time()
-                time_elapsed = int(round((end_time - start_time)))
-                m, s = divmod(time_elapsed, 60)
-                h, m = divmod(m, 60)
-                print("Iteration done in {:02d}h{:02d}m{:02d}s".format(h, m, s))
-
-
 class DirectoryProcessor(object):
     '''
     Abstract class that processes a directory of images

@@ -214,6 +58,11 @@ class DirectoryProcessor(object):
 
         self.finalize()
 
+    # for now, we limit this class responsibility to the read of files. images and faces are processed outside this class
+    def process_image(self, filename):
+        # implement your image processing!
+        raise NotImplementedError()
+
     def parse_arguments(self, description, subparser, command):
         self.parser.add_argument('-i', '--input-dir',
                                  action=FullPaths,

@@ -248,23 +97,6 @@ class DirectoryProcessor(object):
         # Override this for custom arguments
         return parser
 
-    def process_image(self, filename):
-        try:
-            image = cv2.imread(filename)
-            for (idx, face) in enumerate(crop_faces(image)):
-                if idx > 0 and self.arguments.verbose:
-                    print('- Found more than one face!')
-                    self.verify_output = True
-
-                self.process_face(face, idx, filename)
-                self.faces_detected = self.faces_detected + 1
-        except Exception as e:
-            print('Failed to extract from image: {}. Reason: {}'.format(filename, e))
-
-    def process_face(self, face, index, filename):
-        # implement your face processing!
-        raise NotImplementedError()
-
     def finalize(self):
         print('-------------------------')
         print('Images found: {}'.format(self.images_found))
lib/faces_detect.py

@@ -1,7 +1,7 @@
 import face_recognition
 from .DetectedFace import DetectedFace
 
-def crop_faces(frame):
+def detect_faces(frame):
     face_locations = face_recognition.face_locations(frame)
     #face_encodings = face_recognition.face_encodings(frame, face_locations)
 
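For reference, face_recognition.face_locations returns one (top, right, bottom, left) tuple per face. The rest of detect_faces is truncated in this view, but the callers below read face.x, face.y, face.w, face.h and face.image, so the function plausibly wraps each tuple along these lines (the SimpleFace class is a labeled stand-in, not the diff's DetectedFace):

    import face_recognition
    import numpy as np
    from dataclasses import dataclass

    @dataclass
    class SimpleFace:
        # Stand-in for lib's DetectedFace (its definition is not in this diff);
        # only the attributes the callers actually use are modeled here.
        image: np.ndarray
        x: int
        y: int
        w: int
        h: int

    def detect_faces_sketch(frame):
        # face_locations yields CSS-order boxes: (top, right, bottom, left).
        for top, right, bottom, left in face_recognition.face_locations(frame):
            x, y, w, h = left, top, right - left, bottom - top
            yield SimpleFace(frame[y: y + h, x: x + w], x, y, w, h)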
lib/faces_process.py (deleted)

@@ -1,85 +0,0 @@
-import cv2
-import numpy
-import os
-
-from .aligner import Aligner
-from .model import autoencoder_A
-from .model import autoencoder_B
-from .model import encoder, decoder_A, decoder_B
-
-def adjust_avg_color(img_old, img_new):
-    w, h, c = img_new.shape
-    for i in range(img_new.shape[-1]):
-        old_avg = img_old[:, :, i].mean()
-        new_avg = img_new[:, :, i].mean()
-        diff_int = (int)(old_avg - new_avg)
-        for m in range(img_new.shape[0]):
-            for n in range(img_new.shape[1]):
-                temp = (img_new[m, n, i] + diff_int)
-                if temp < 0:
-                    img_new[m, n, i] = 0
-                elif temp > 255:
-                    img_new[m, n, i] = 255
-                else:
-                    img_new[m, n, i] = temp
-
-def smooth_mask(img_old, img_new):
-    w, h, c = img_new.shape
-    crop = slice(0, w)
-    mask = numpy.zeros_like(img_new)
-    mask[h // 15:-h // 15, w // 15:-w // 15, :] = 255
-    mask = cv2.GaussianBlur(mask, (15, 15), 10)
-    img_new[crop, crop] = mask / 255 * img_new + (1 - mask / 255) * img_old
-
-def convert_one_image(image,
-                      model_dir="models",
-                      swap_model=False,
-                      use_aligner=False,
-                      use_smooth_mask=True,
-                      use_avg_color_adjust=True):
-    face_A = '/decoder_A.h5' if not swap_model else '/decoder_B.h5'
-    face_B = '/decoder_B.h5' if not swap_model else '/decoder_A.h5'
-
-    encoder.load_weights(model_dir + "/encoder.h5")
-    decoder_A.load_weights(model_dir + face_A)
-    decoder_B.load_weights(model_dir + face_B)
-
-    autoencoder = autoencoder_B
-
-    shapePredictor = "contrib/shape_predictor_68_face_landmarks.dat"
-    humanFaceDetector = "contrib/mmod_human_face_detector.dat"
-    if not os.path.exists(shapePredictor):
-        print("{} file not found.\n"
-              "Landmark file can be found in http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2"
-              "\nUnzip it in the contrib/ folder.".format(shapePredictor))
-        return
-    if use_aligner:
-        aligner = Aligner(shapePredictor, humanFaceDetector)
-
-    assert image.shape == (256, 256, 3)
-    crop = slice(48, 208)
-    face = image[crop, crop]
-    old_face = face.copy()
-
-    face = cv2.resize(face, (64, 64))
-    face = numpy.expand_dims(face, 0)
-    new_face = autoencoder.predict(face / 255.0)[0]
-    new_face = numpy.clip(new_face * 255, 0, 255).astype(image.dtype)
-    new_face = cv2.resize(new_face, (160, 160))
-
-    if use_avg_color_adjust:
-        adjust_avg_color(old_face, new_face)
-    if use_smooth_mask:
-        smooth_mask(old_face, new_face)
-
-    # Aligner is not ready to use yet
-    if use_aligner:
-        return aligner.align(image.copy(), new_face)
-    else:
-        return superpose(image, new_face, crop)
-
-
-def superpose(image, new_face, crop):
-    new_image = image.copy()
-    new_image[crop, crop] = new_face
-    return new_image
lib/image_augmentation.py (deleted)

@@ -1,44 +0,0 @@
-import cv2
-import numpy
-
-from .umeyama import umeyama
-
-
-def random_transform(image, rotation_range, zoom_range, shift_range, random_flip):
-    h, w = image.shape[0:2]
-    rotation = numpy.random.uniform(-rotation_range, rotation_range)
-    scale = numpy.random.uniform(1 - zoom_range, 1 + zoom_range)
-    tx = numpy.random.uniform(-shift_range, shift_range) * w
-    ty = numpy.random.uniform(-shift_range, shift_range) * h
-    mat = cv2.getRotationMatrix2D((w // 2, h // 2), rotation, scale)
-    mat[:, 2] += (tx, ty)
-    result = cv2.warpAffine(
-        image, mat, (w, h), borderMode=cv2.BORDER_REPLICATE)
-    if numpy.random.random() < random_flip:
-        result = result[:, ::-1]
-    return result
-
-# get pair of random warped images from aligned face image
-
-
-def random_warp(image):
-    assert image.shape == (256, 256, 3)
-    range_ = numpy.linspace(128 - 80, 128 + 80, 5)
-    mapx = numpy.broadcast_to(range_, (5, 5))
-    mapy = mapx.T
-
-    mapx = mapx + numpy.random.normal(size=(5, 5), scale=5)
-    mapy = mapy + numpy.random.normal(size=(5, 5), scale=5)
-
-    interp_mapx = cv2.resize(mapx, (80, 80))[8:72, 8:72].astype('float32')
-    interp_mapy = cv2.resize(mapy, (80, 80))[8:72, 8:72].astype('float32')
-
-    warped_image = cv2.remap(image, interp_mapx, interp_mapy, cv2.INTER_LINEAR)
-
-    src_points = numpy.stack([mapx.ravel(), mapy.ravel()], axis=-1)
-    dst_points = numpy.mgrid[0:65:16, 0:65:16].T.reshape(-1, 2)
-    mat = umeyama(src_points, dst_points, True)[0:2]
-
-    target_image = cv2.warpAffine(image, mat, (64, 64))
-
-    return warped_image, target_image
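The deleted random_warp reappears verbatim in lib/training_data.py below. What it does: jitter a 5x5 control grid over the face region with Gaussian noise, remap the image through the interpolated field to get the warped 64x64 input, and fit a similarity transform with umeyama (the True flag enables scale estimation) to render the matching 64x64 ground-truth target. The grid geometry, spelled out (not part of the commit):

    import numpy as np

    # The 5x5 source control grid spans pixels 48..208 of the 256x256 aligned face:
    range_ = np.linspace(128 - 80, 128 + 80, 5)
    print(range_)                      # [ 48.  88. 128. 168. 208.]

    # The matching destination grid is a 5x5 lattice over the 64x64 target:
    # np.mgrid[0:65:16, 0:65:16] gives coordinates 0, 16, 32, 48, 64 on each axis.
    dst_points = np.mgrid[0:65:16, 0:65:16].T.reshape(-1, 2)
    print(dst_points.shape)            # (25, 2): one destination per control point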
lib/model.py

@@ -4,7 +4,7 @@ from keras.layers.advanced_activations import LeakyReLU
 from keras.layers.convolutional import Conv2D
 from keras.optimizers import Adam
 
-from .pixel_shuffler import PixelShuffler
+from .PixelShuffler import PixelShuffler
 
 optimizer = Adam(lr=5e-5, beta_1=0.5, beta_2=0.999)
 
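The only change here is the import casing (the module file is now PixelShuffler.py). PixelShuffler is a subpixel-upscaling layer in the style of Shi et al. 2016; assuming the usual depth-to-space behavior (the layer's own code is not in this diff), a numpy illustration of what it computes:

    import numpy as np

    # Depth-to-space with block size r: (H, W, C*r*r) -> (H*r, W*r, C).
    # Assumed to mirror the PixelShuffler layer; channel ordering conventions
    # vary between implementations, so treat this as a sketch.
    def pixel_shuffle(x, r=2):
        h, w, c = x.shape
        out_c = c // (r * r)
        x = x.reshape(h, w, r, r, out_c)
        x = x.transpose(0, 2, 1, 3, 4)           # interleave the r x r sub-pixels
        return x.reshape(h * r, w * r, out_c)

    x = np.arange(4 * 4 * 8, dtype=np.float32).reshape(4, 4, 8)
    print(pixel_shuffle(x).shape)                # (8, 8, 2)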
lib/training_data.py

@@ -1,6 +1,7 @@
 import cv2
 import numpy
-from .image_augmentation import random_transform
-from .image_augmentation import random_warp
+
+from .umeyama import umeyama
 
 random_transform_args = {
     'rotation_range': 10,

@@ -9,7 +10,6 @@ random_transform_args = {
     'random_flip': 0.4,
 }
 
-
 def get_training_data(images, batch_size):
     indices = numpy.random.randint(len(images), size=batch_size)
     for i, index in enumerate(indices):

@@ -27,3 +27,61 @@ def get_training_data(images, batch_size):
         target_images[i] = target_img
 
     return warped_images, target_images
+
+def random_transform(image, rotation_range, zoom_range, shift_range, random_flip):
+    h, w = image.shape[0:2]
+    rotation = numpy.random.uniform(-rotation_range, rotation_range)
+    scale = numpy.random.uniform(1 - zoom_range, 1 + zoom_range)
+    tx = numpy.random.uniform(-shift_range, shift_range) * w
+    ty = numpy.random.uniform(-shift_range, shift_range) * h
+    mat = cv2.getRotationMatrix2D((w // 2, h // 2), rotation, scale)
+    mat[:, 2] += (tx, ty)
+    result = cv2.warpAffine(
+        image, mat, (w, h), borderMode=cv2.BORDER_REPLICATE)
+    if numpy.random.random() < random_flip:
+        result = result[:, ::-1]
+    return result
+
+# get pair of random warped images from aligned face image
+
+def random_warp(image):
+    assert image.shape == (256, 256, 3)
+    range_ = numpy.linspace(128 - 80, 128 + 80, 5)
+    mapx = numpy.broadcast_to(range_, (5, 5))
+    mapy = mapx.T
+
+    mapx = mapx + numpy.random.normal(size=(5, 5), scale=5)
+    mapy = mapy + numpy.random.normal(size=(5, 5), scale=5)
+
+    interp_mapx = cv2.resize(mapx, (80, 80))[8:72, 8:72].astype('float32')
+    interp_mapy = cv2.resize(mapy, (80, 80))[8:72, 8:72].astype('float32')
+
+    warped_image = cv2.remap(image, interp_mapx, interp_mapy, cv2.INTER_LINEAR)
+
+    src_points = numpy.stack([mapx.ravel(), mapy.ravel()], axis=-1)
+    dst_points = numpy.mgrid[0:65:16, 0:65:16].T.reshape(-1, 2)
+    mat = umeyama(src_points, dst_points, True)[0:2]
+
+    target_image = cv2.warpAffine(image, mat, (64, 64))
+
+    return warped_image, target_image
+
+
+def get_transpose_axes(n):
+    if n % 2 == 0:
+        y_axes = list(range(1, n - 1, 2))
+        x_axes = list(range(0, n - 1, 2))
+    else:
+        y_axes = list(range(0, n - 1, 2))
+        x_axes = list(range(1, n - 1, 2))
+    return y_axes, x_axes, [n - 1]
+
+
+def stack_images(images):
+    images_shape = numpy.array(images.shape)
+    new_axes = get_transpose_axes(len(images_shape))
+    new_shape = [numpy.prod(images_shape[x]) for x in new_axes]
+    return numpy.transpose(
+        images,
+        axes=numpy.concatenate(new_axes)
+    ).reshape(new_shape)
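get_transpose_axes and stack_images move here from lib/utils.py (removed there, below). stack_images interleaves grid axes with pixel axes so a grid of tiles collapses into one mosaic; this is what show_sample in scripts/train.py relies on to compose its preview figure. A minimal illustration of the axis bookkeeping (not part of the commit):

    import numpy as np

    # Example: a 2x3 grid of 4x4 RGB tiles becomes a single 8x12 image.
    grid = np.zeros((2, 3, 4, 4, 3))
    n = grid.ndim                                  # 5 (odd branch of get_transpose_axes)
    y_axes = list(range(0, n - 1, 2))              # [0, 2] -> 2 * 4 = 8 rows
    x_axes = list(range(1, n - 1, 2))              # [1, 3] -> 3 * 4 = 12 columns
    mosaic = np.transpose(grid, axes=y_axes + x_axes + [n - 1]).reshape(8, 12, 3)
    print(mosaic.shape)                            # (8, 12, 3)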
lib/utils.py | 20

@@ -24,23 +24,3 @@ def load_images(image_paths, convert=None):
             all_images = numpy.empty((len(image_paths), ) + image.shape, dtype=image.dtype)
         all_images[i] = image
     return all_images
-
-
-def get_transpose_axes(n):
-    if n % 2 == 0:
-        y_axes = list(range(1, n - 1, 2))
-        x_axes = list(range(0, n - 1, 2))
-    else:
-        y_axes = list(range(0, n - 1, 2))
-        x_axes = list(range(1, n - 1, 2))
-    return y_axes, x_axes, [n - 1]
-
-
-def stack_images(images):
-    images_shape = numpy.array(images.shape)
-    new_axes = get_transpose_axes(len(images_shape))
-    new_shape = [numpy.prod(images_shape[x]) for x in new_axes]
-    return numpy.transpose(
-        images,
-        axes=numpy.concatenate(new_axes)
-    ).reshape(new_shape)
plugins/Convert_Adjust.py | 71 (new file)

@@ -0,0 +1,71 @@
+import cv2
+import numpy
+import os
+
+from lib.model import autoencoder_A
+from lib.model import autoencoder_B
+from lib.model import encoder, decoder_A, decoder_B
+
+class Convert(object):
+    def __init__(self,
+                 model_dir="models",
+                 swap_model=False,
+                 use_aligner=False):
+        face_A = '/decoder_A.h5' if not swap_model else '/decoder_B.h5'
+        face_B = '/decoder_B.h5' if not swap_model else '/decoder_A.h5'
+
+        encoder.load_weights(model_dir + "/encoder.h5")
+        decoder_A.load_weights(model_dir + face_A)
+        decoder_B.load_weights(model_dir + face_B)
+
+        self.autoencoder = autoencoder_B
+
+    def convert_one_image(self, image,
+                          use_smooth_mask=True,
+                          use_avg_color_adjust=True):
+        assert image.shape == (256, 256, 3)
+        crop = slice(48, 208)
+        face = image[crop, crop]
+        old_face = face.copy()
+
+        face = cv2.resize(face, (64, 64))
+        face = numpy.expand_dims(face, 0)
+        new_face = self.autoencoder.predict(face / 255.0)[0]
+        new_face = numpy.clip(new_face * 255, 0, 255).astype(image.dtype)
+        new_face = cv2.resize(new_face, (160, 160))
+
+        if use_avg_color_adjust:
+            self.adjust_avg_color(old_face, new_face)
+        if use_smooth_mask:
+            self.smooth_mask(old_face, new_face)
+
+        return self.superpose(image, new_face, crop)
+
+    def adjust_avg_color(self, img_old, img_new):
+        w, h, c = img_new.shape
+        for i in range(img_new.shape[-1]):
+            old_avg = img_old[:, :, i].mean()
+            new_avg = img_new[:, :, i].mean()
+            diff_int = (int)(old_avg - new_avg)
+            for m in range(img_new.shape[0]):
+                for n in range(img_new.shape[1]):
+                    temp = (img_new[m, n, i] + diff_int)
+                    if temp < 0:
+                        img_new[m, n, i] = 0
+                    elif temp > 255:
+                        img_new[m, n, i] = 255
+                    else:
+                        img_new[m, n, i] = temp
+
+    def smooth_mask(self, img_old, img_new):
+        w, h, c = img_new.shape
+        crop = slice(0, w)
+        mask = numpy.zeros_like(img_new)
+        mask[h // 15:-h // 15, w // 15:-w // 15, :] = 255
+        mask = cv2.GaussianBlur(mask, (15, 15), 10)
+        img_new[crop, crop] = mask / 255 * img_new + (1 - mask / 255) * img_old
+
+    def superpose(self, image, new_face, crop):
+        new_image = image.copy()
+        new_image[crop, crop] = new_face
+        return new_image
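Usage note (not part of the diff): Convert front-loads the weight loading in __init__, so a caller builds it once and reuses it across faces. A hedged sketch, assuming a 256x256 BGR crop; 'face.jpg' is a placeholder path:

    import cv2
    from plugins.Convert_Adjust import Convert

    converter = Convert(model_dir="models", swap_model=False)
    image = cv2.resize(cv2.imread("face.jpg"), (256, 256))
    swapped = converter.convert_one_image(image)
    cv2.imwrite("face_swapped.jpg", swapped)

Also worth noting: adjust_avg_color visits every pixel in Python loops. An equivalent vectorized form of the same per-channel mean shift and clamp, as a sketch:

    import numpy

    def adjust_avg_color_vectorized(img_old, img_new):
        # Per-channel integer mean difference, applied and clamped to [0, 255];
        # equivalent to the nested loops above, done with broadcasting.
        diff = (img_old.mean(axis=(0, 1)) - img_new.mean(axis=(0, 1))).astype(int)
        shifted = img_new.astype(int) + diff          # broadcast over H x W
        img_new[:] = numpy.clip(shifted, 0, 255).astype(img_new.dtype)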
plugins/Extract_Crop.py | 5 (new file)

@@ -0,0 +1,5 @@
+import cv2
+
+class Extract(object):
+    def extract(self, image, face, size):
+        return cv2.resize(face.image, size)
scripts/convert.py

@@ -1,9 +1,9 @@
 import cv2
 from lib.cli import DirectoryProcessor, FullPaths
 from pathlib import Path
-from lib.faces_process import convert_one_image
-from lib.faces_detect import crop_faces
+from lib.faces_detect import detect_faces
+
+from plugins.Convert_Adjust import Convert
 
 class ConvertImage(DirectoryProcessor):
     filename = ''

@@ -32,13 +32,14 @@ class ConvertImage(DirectoryProcessor):
 
     def process_image(self, filename):
        try:
+            converter = Convert(self.arguments.model_dir, self.arguments.swap_model)
             image = cv2.imread(filename)
-            for (idx, face) in enumerate(crop_faces(image)):
+            for (idx, face) in enumerate(detect_faces(image)):
                 if idx > 0 and self.arguments.verbose:
                     print('- Found more than one face!')
                     self.verify_output = True
 
-                new_face = convert_one_image(cv2.resize(face.image, (256, 256)), self.arguments.model_dir, self.arguments.swap_model)
+                new_face = converter.convert_one_image(cv2.resize(face.image, (256, 256)))
                 image[slice(face.y, face.y + face.h), slice(face.x, face.x + face.w)] = cv2.resize(new_face, (face.w, face.h))
                 self.faces_detected = self.faces_detected + 1
             output_file = self.output_dir / Path(filename).name
scripts/extract.py

@@ -1,7 +1,9 @@
 import cv2
-from lib.cli import DirectoryProcessor
-from pathlib import Path
+
+from pathlib import Path
+from lib.cli import DirectoryProcessor
+from lib.faces_detect import detect_faces
+from plugins.Extract_Crop import Extract
 
 class ExtractTrainingData(DirectoryProcessor):
     def create_parser(self, subparser, command, description):

@@ -13,8 +15,18 @@ class ExtractTrainingData(DirectoryProcessor):
                 https://github.com/deepfakes/faceswap-playground"
         )
 
-    def process_face(self, face, index, filename):
-        resized_image = cv2.resize(face.image, (256, 256))
-        output_file = self.output_dir / Path(filename).stem
-        cv2.imwrite(str(output_file) + str(index) + Path(filename).suffix,
-                    resized_image)
+    def process_image(self, filename):
+        extractor = Extract()
+        try:
+            image = cv2.imread(filename)
+            for (idx, face) in enumerate(detect_faces(image)):
+                if idx > 0 and self.arguments.verbose:
+                    print('- Found more than one face!')
+                    self.verify_output = True
+
+                resized_image = extractor.extract(image, face, (256, 256))
+                output_file = self.output_dir / Path(filename).stem
+                cv2.imwrite(str(output_file) + str(idx) + Path(filename).suffix, resized_image)
+                self.faces_detected = self.faces_detected + 1
+        except Exception as e:
+            print('Failed to extract from image: {}. Reason: {}'.format(filename, e))
scripts/train.py | 155 (new file)

@@ -0,0 +1,155 @@
+import cv2
+import numpy
+import time
+
+from lib.training_data import get_training_data, stack_images
+from lib.utils import get_image_paths, get_folder, load_images
+
+from lib.model import autoencoder_A, autoencoder_B
+from lib.model import encoder, decoder_A, decoder_B
+from lib.cli import FullPaths
+
+class TrainingProcessor(object):
+    arguments = None
+
+    def __init__(self, subparser, command, description='default'):
+        self.parse_arguments(description, subparser, command)
+
+    def process_arguments(self, arguments):
+        self.arguments = arguments
+        print("Model A Directory: {}".format(self.arguments.input_A))
+        print("Model B Directory: {}".format(self.arguments.input_B))
+        print("Training data directory: {}".format(self.arguments.model_dir))
+
+        try:
+            encoder.load_weights(self.arguments.model_dir + '/encoder.h5')
+            decoder_A.load_weights(self.arguments.model_dir + '/decoder_A.h5')
+            decoder_B.load_weights(self.arguments.model_dir + '/decoder_B.h5')
+        except Exception as e:
+            print('Not loading existing training data.')
+            print(e)
+
+        self.process()
+
+    def parse_arguments(self, description, subparser, command):
+        parser = subparser.add_parser(
+            command,
+            help="This command trains the model for the two faces A and B.",
+            description=description,
+            epilog="Questions and feedback: \
+                https://github.com/deepfakes/faceswap-playground"
+        )
+
+        parser.add_argument('-A', '--input-A',
+                            action=FullPaths,
+                            dest="input_A",
+                            default="input_A",
+                            help="Input directory. A directory containing training images for face A.\
+                            Defaults to 'input'")
+        parser.add_argument('-B', '--input-B',
+                            action=FullPaths,
+                            dest="input_B",
+                            default="input_B",
+                            help="Input directory. A directory containing training images for face B.\
+                            Defaults to 'input'")
+        parser.add_argument('-m', '--model-dir',
+                            action=FullPaths,
+                            dest="model_dir",
+                            default="models",
+                            help="Model directory. This is where the training data will \
+                            be stored. Defaults to 'model'")
+        parser.add_argument('-p', '--preview',
+                            action="store_true",
+                            dest="preview",
+                            default=False,
+                            help="Show preview output. If not specified, write progress \
+                            to file.")
+        parser.add_argument('-v', '--verbose',
+                            action="store_true",
+                            dest="verbose",
+                            default=False,
+                            help="Show verbose output")
+        parser.add_argument('-s', '--save-interval',
+                            type=int,
+                            dest="save_interval",
+                            default=100,
+                            help="Sets the number of iterations before saving the model.")
+        parser.add_argument('-w', '--write-image',
+                            action="store_true",
+                            dest="write_image",
+                            default=False,
+                            help="Writes the training result to a file even on preview mode.")
+        parser = self.add_optional_arguments(parser)
+        parser.set_defaults(func=self.process_arguments)
+
+    def add_optional_arguments(self, parser):
+        # Override this for custom arguments
+        return parser
+
+    def save_model_weights(self):
+        encoder.save_weights(self.arguments.model_dir + '/encoder.h5')
+        decoder_A.save_weights(self.arguments.model_dir + '/decoder_A.h5')
+        decoder_B.save_weights(self.arguments.model_dir + '/decoder_B.h5')
+        print('save model weights')
+
+    def show_sample(self, test_A, test_B):
+        figure_A = numpy.stack([
+            test_A,
+            autoencoder_A.predict(test_A),
+            autoencoder_B.predict(test_A),
+        ], axis=1)
+        figure_B = numpy.stack([
+            test_B,
+            autoencoder_B.predict(test_B),
+            autoencoder_A.predict(test_B),
+        ], axis=1)
+
+        figure = numpy.concatenate([figure_A, figure_B], axis=0)
+        figure = figure.reshape((4, 7) + figure.shape[1:])
+        figure = stack_images(figure)
+
+        figure = numpy.clip(figure * 255, 0, 255).astype('uint8')
+
+        if self.arguments.preview is True:
+            cv2.imshow('', figure)
+        if not self.arguments.preview or self.arguments.write_image:
+            cv2.imwrite('_sample.jpg', figure)
+
+    def process(self):
+        print('Starting, this may take a while...')
+        images_A = get_image_paths(self.arguments.input_A)
+        images_B = get_image_paths(self.arguments.input_B)
+        images_A = load_images(images_A) / 255.0
+        images_B = load_images(images_B) / 255.0
+
+        images_A += images_B.mean(axis=(0, 1, 2)) - images_A.mean(axis=(0, 1, 2))
+
+        print('press "q" to stop training and save model')
+
+        BATCH_SIZE = 64
+
+        for epoch in range(1000000):
+            if self.arguments.verbose:
+                print("Iteration number {}".format(epoch + 1))
+                start_time = time.time()
+            warped_A, target_A = get_training_data(images_A, BATCH_SIZE)
+            warped_B, target_B = get_training_data(images_B, BATCH_SIZE)
+
+            loss_A = autoencoder_A.train_on_batch(warped_A, target_A)
+            loss_B = autoencoder_B.train_on_batch(warped_B, target_B)
+            print(loss_A, loss_B)
+
+            if epoch % self.arguments.save_interval == 0:
+                self.save_model_weights()
+                self.show_sample(target_A[0:14], target_B[0:14])
+
+            key = cv2.waitKey(1)
+            if key == ord('q'):
+                self.save_model_weights()
+                exit()
+            if self.arguments.verbose:
+                end_time = time.time()
+                time_elapsed = int(round((end_time - start_time)))
+                m, s = divmod(time_elapsed, 60)
+                h, m = divmod(m, 60)
+                print("Iteration done in {:02d}h{:02d}m{:02d}s".format(h, m, s))