diff --git a/modules/custom_types.py b/modules/custom_types.py
new file mode 100644
index 0000000..3f41b7f
--- /dev/null
+++ b/modules/custom_types.py
@@ -0,0 +1,7 @@
+from typing import Any
+
+from insightface.app.common import Face
+import numpy
+
+Face = Face
+Frame = numpy.ndarray[Any, Any]
\ No newline at end of file
diff --git a/modules/globals.py b/modules/globals.py
index 564fe7d..aad50f0 100644
--- a/modules/globals.py
+++ b/modules/globals.py
@@ -1,3 +1,5 @@
+# --- START OF FILE globals.py ---
+
 import os
 from typing import List, Dict, Any
@@ -9,35 +11,61 @@ file_types = [
     ("Video", ("*.mp4", "*.mkv")),
 ]
 
-source_target_map = []
-simple_map = {}
+# Face Mapping Data
+source_target_map: List[Dict[str, Any]] = []  # Detailed map used for image/video processing
+simple_map: Dict[str, Any] = {}  # Simplified map (embeddings/faces) for live/simple mode
 
-source_path = None
-target_path = None
-output_path = None
+# Paths
+source_path: str | None = None
+target_path: str | None = None
+output_path: str | None = None
+
+# Processing Options
 frame_processors: List[str] = []
-keep_fps = True
-keep_audio = True
-keep_frames = False
-many_faces = False
-map_faces = False
-color_correction = False # New global variable for color correction toggle
-nsfw_filter = False
-video_encoder = None
-video_quality = None
-live_mirror = False
-live_resizable = True
-max_memory = None
-execution_providers: List[str] = []
-execution_threads = None
-headless = None
-log_level = "error"
+keep_fps: bool = True
+keep_audio: bool = True
+keep_frames: bool = False
+many_faces: bool = False  # Process all detected faces with the default source
+map_faces: bool = False  # Use source_target_map or simple_map for specific swaps
+color_correction: bool = False  # Enable color correction (implementation specific)
+nsfw_filter: bool = False
+
+# Video Output Options
+video_encoder: str | None = None
+video_quality: int | None = None  # Typically a CRF value or bitrate
+
+# Live Mode Options
+live_mirror: bool = False
+live_resizable: bool = True
+camera_input_combobox: Any | None = None  # Placeholder for UI element if needed
+webcam_preview_running: bool = False
+show_fps: bool = False
+
+# System Configuration
+max_memory: int | None = None  # Maximum memory made available to the process, in GB
+execution_providers: List[str] = []  # e.g., ['CUDAExecutionProvider', 'CPUExecutionProvider']
+execution_threads: int | None = None  # Number of threads for CPU execution
+headless: bool | None = None  # Run without the UI
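+
+# Illustrative sketch (an assumption about the consuming code, not executed
+# here; model_path is a placeholder): execution_providers and execution_threads
+# are the values typically handed to ONNX Runtime when a session is created
+# elsewhere in the project:
+#
+#   import onnxruntime as ort
+#   opts = ort.SessionOptions()
+#   if execution_threads:
+#       opts.intra_op_num_threads = execution_threads
+#   session = ort.InferenceSession(model_path, sess_options=opts,
+#                                  providers=execution_providers)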
+log_level: str = "error" # Logging level (e.g., 'debug', 'info', 'warning', 'error') + +# Face Processor UI Toggles (Example) fp_ui: Dict[str, bool] = {"face_enhancer": False} -camera_input_combobox = None -webcam_preview_running = False -show_fps = False -mouth_mask = False -show_mouth_mask_box = False -mask_feather_ratio = 8 -mask_down_size = 0.50 -mask_size = 1 + +# Face Swapper Specific Options +face_swapper_enabled: bool = True # General toggle for the swapper processor +opacity: float = 1.0 # Blend factor for the swapped face (0.0-1.0) +sharpness: float = 0.0 # Sharpness enhancement for swapped face (0.0-1.0+) + +# Mouth Mask Options +mouth_mask: bool = False # Enable mouth area masking/pasting +show_mouth_mask_box: bool = False # Visualize the mouth mask area (for debugging) +mask_feather_ratio: int = 12 # Denominator for feathering calculation (higher = smaller feather) +mask_down_size: float = 0.1 # Expansion factor for lower lip mask (relative) +mask_size: float = 1.0 # Expansion factor for upper lip mask (relative) + +# --- START: Added for Frame Interpolation --- +enable_interpolation: bool = True # Toggle temporal smoothing +interpolation_weight: float = 0 # Blend weight for current frame (0.0-1.0). Lower=smoother. +# --- END: Added for Frame Interpolation --- + +# --- END OF FILE globals.py --- \ No newline at end of file diff --git a/modules/metadata.py b/modules/metadata.py index b6e7b2b..f216a64 100644 --- a/modules/metadata.py +++ b/modules/metadata.py @@ -1,3 +1,3 @@ name = 'Deep-Live-Cam' -version = '1.8.1' +version = '2.0c' edition = 'GitHub Edition' diff --git a/modules/processors/frame/face_enhancer.py b/modules/processors/frame/face_enhancer.py index de192e6..22a4d52 100644 --- a/modules/processors/frame/face_enhancer.py +++ b/modules/processors/frame/face_enhancer.py @@ -1,16 +1,18 @@ +# --- START OF FILE face_enhancer.py --- + from typing import Any, List import cv2 import threading import gfpgan import os +import platform +import torch # Make sure torch is imported import modules.globals import modules.processors.frame.core from modules.core import update_status from modules.face_analyser import get_one_face from modules.typing import Frame, Face -import platform -import torch from modules.utilities import ( conditional_download, is_image, @@ -48,83 +50,157 @@ def pre_start() -> bool: return True -TENSORRT_AVAILABLE = False -try: - import torch_tensorrt - TENSORRT_AVAILABLE = True -except ImportError as im: - print(f"TensorRT is not available: {im}") - pass -except Exception as e: - print(f"TensorRT is not available: {e}") - pass - def get_face_enhancer() -> Any: + """ + Initializes and returns the GFPGAN face enhancer instance, + prioritizing CUDA, then MPS (Mac), then CPU. 
+ """ global FACE_ENHANCER with THREAD_LOCK: if FACE_ENHANCER is None: model_path = os.path.join(models_dir, "GFPGANv1.4.pth") - - selected_device = None - device_priority = [] + device = None + try: + # Priority 1: CUDA + if torch.cuda.is_available(): + device = torch.device("cuda") + print(f"{NAME}: Using CUDA device.") + # Priority 2: MPS (Mac Silicon) + elif platform.system() == "Darwin" and torch.backends.mps.is_available(): + device = torch.device("mps") + print(f"{NAME}: Using MPS device.") + # Priority 3: CPU + else: + device = torch.device("cpu") + print(f"{NAME}: Using CPU device.") - if TENSORRT_AVAILABLE and torch.cuda.is_available(): - selected_device = torch.device("cuda") - device_priority.append("TensorRT+CUDA") - elif torch.cuda.is_available(): - selected_device = torch.device("cuda") - device_priority.append("CUDA") - elif torch.backends.mps.is_available() and platform.system() == "Darwin": - selected_device = torch.device("mps") - device_priority.append("MPS") - elif not torch.cuda.is_available(): - selected_device = torch.device("cpu") - device_priority.append("CPU") - - FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) + FACE_ENHANCER = gfpgan.GFPGANer( + model_path=model_path, + upscale=1, # upscale=1 means enhancement only, no resizing + arch='clean', + channel_multiplier=2, + bg_upsampler=None, + device=device + ) + print(f"{NAME}: GFPGANer initialized successfully on {device}.") + + except Exception as e: + print(f"{NAME}: Error initializing GFPGANer: {e}") + # Fallback to CPU if initialization with GPU fails for some reason + if device is not None and device.type != 'cpu': + print(f"{NAME}: Falling back to CPU due to error.") + try: + device = torch.device("cpu") + FACE_ENHANCER = gfpgan.GFPGANer( + model_path=model_path, + upscale=1, + arch='clean', + channel_multiplier=2, + bg_upsampler=None, + device=device + ) + print(f"{NAME}: GFPGANer initialized successfully on CPU after fallback.") + except Exception as fallback_e: + print(f"{NAME}: FATAL: Could not initialize GFPGANer even on CPU: {fallback_e}") + FACE_ENHANCER = None # Ensure it's None if totally failed + else: + # If it failed even on the first CPU attempt or device was already CPU + print(f"{NAME}: FATAL: Could not initialize GFPGANer on CPU: {e}") + FACE_ENHANCER = None # Ensure it's None if totally failed + + + # Check if enhancer is still None after attempting initialization + if FACE_ENHANCER is None: + raise RuntimeError(f"{NAME}: Failed to initialize GFPGANer. Check logs for errors.") - # for debug: - print(f"Selected device: {selected_device} and device priority: {device_priority}") return FACE_ENHANCER def enhance_face(temp_frame: Frame) -> Frame: - with THREAD_SEMAPHORE: - _, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True) - return temp_frame + """Enhances faces in a single frame using the global GFPGANer instance.""" + # Ensure enhancer is ready + enhancer = get_face_enhancer() + try: + with THREAD_SEMAPHORE: + # The enhance method returns: _, restored_faces, restored_img + _, _, restored_img = enhancer.enhance( + temp_frame, + has_aligned=False, # Assume faces are not pre-aligned + only_center_face=False, # Enhance all detected faces + paste_back=True # Paste enhanced faces back onto the original image + ) + # GFPGAN might return None if no face is detected or an error occurs + if restored_img is None: + # print(f"{NAME}: Warning: GFPGAN enhancement returned None. 
Returning original frame.") + return temp_frame + return restored_img + except Exception as e: + print(f"{NAME}: Error during face enhancement: {e}") + # Return the original frame in case of error during enhancement + return temp_frame -def process_frame(source_face: Face, temp_frame: Frame) -> Frame: - target_face = get_one_face(temp_frame) - if target_face: - temp_frame = enhance_face(temp_frame) +def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame: + """Processes a frame: enhances face if detected.""" + # We don't strictly need source_face for enhancement only + # Check if any face exists to potentially save processing time, though GFPGAN also does detection. + # For simplicity and ensuring enhancement is attempted if possible, we can rely on enhance_face. + # target_face = get_one_face(temp_frame) # This gets only ONE face + # If you want to enhance ONLY if a face is detected by your *own* analyser first: + # has_face = get_one_face(temp_frame) is not None # Or use get_many_faces + # if has_face: + # temp_frame = enhance_face(temp_frame) + # else: # Enhance regardless, let GFPGAN handle detection + temp_frame = enhance_face(temp_frame) return temp_frame def process_frames( - source_path: str, temp_frame_paths: List[str], progress: Any = None + source_path: str | None, temp_frame_paths: List[str], progress: Any = None ) -> None: + """Processes multiple frames from file paths.""" for temp_frame_path in temp_frame_paths: + if not os.path.exists(temp_frame_path): + print(f"{NAME}: Warning: Frame path not found {temp_frame_path}, skipping.") + if progress: + progress.update(1) + continue + temp_frame = cv2.imread(temp_frame_path) - result = process_frame(None, temp_frame) - cv2.imwrite(temp_frame_path, result) + if temp_frame is None: + print(f"{NAME}: Warning: Failed to read frame {temp_frame_path}, skipping.") + if progress: + progress.update(1) + continue + + result_frame = process_frame(None, temp_frame) + cv2.imwrite(temp_frame_path, result_frame) if progress: progress.update(1) -def process_image(source_path: str, target_path: str, output_path: str) -> None: +def process_image(source_path: str | None, target_path: str, output_path: str) -> None: + """Processes a single image file.""" target_frame = cv2.imread(target_path) - result = process_frame(None, target_frame) - cv2.imwrite(output_path, result) + if target_frame is None: + print(f"{NAME}: Error: Failed to read target image {target_path}") + return + result_frame = process_frame(None, target_frame) + cv2.imwrite(output_path, result_frame) + print(f"{NAME}: Enhanced image saved to {output_path}") -def process_video(source_path: str, temp_frame_paths: List[str]) -> None: - modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) +def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None: + """Processes video frames using the frame processor core.""" + # source_path might be optional depending on how process_video is called + modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) +# Optional: Keep process_frame_v2 if it's used elsewhere, otherwise it's redundant +# def process_frame_v2(temp_frame: Frame) -> Frame: +# target_face = get_one_face(temp_frame) +# if target_face: +# temp_frame = enhance_face(temp_frame) +# return temp_frame -def process_frame_v2(temp_frame: Frame) -> Frame: - target_face = get_one_face(temp_frame) - if target_face: - temp_frame = enhance_face(temp_frame) - return temp_frame +# --- END OF FILE 
diff --git a/modules/processors/frame/face_masking.py b/modules/processors/frame/face_masking.py
new file mode 100644
index 0000000..152ea6f
--- /dev/null
+++ b/modules/processors/frame/face_masking.py
@@ -0,0 +1,609 @@
+import cv2
+import numpy as np
+from modules.typing import Face, Frame
+import modules.globals
+
+def apply_color_transfer(source, target):
+    """
+    Transfer the color statistics of the target image onto the source image
+    (Reinhard-style transfer in LAB space) and return the recolored source.
+    """
+    source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
+    target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
+
+    source_mean, source_std = cv2.meanStdDev(source)
+    target_mean, target_std = cv2.meanStdDev(target)
+
+    # Reshape mean and std to be broadcastable
+    source_mean = source_mean.reshape(1, 1, 3)
+    source_std = np.maximum(source_std.reshape(1, 1, 3), 1e-6)  # avoid division by zero on flat patches
+    target_mean = target_mean.reshape(1, 1, 3)
+    target_std = target_std.reshape(1, 1, 3)
+
+    # Perform the color transfer
+    source = (source - source_mean) * (target_std / source_std) + target_mean
+
+    return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
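+
+# Illustrative usage (hypothetical names; nothing in this module calls it like
+# this): match a swapped-in patch to the lighting of the region it replaces
+# before blending:
+#
+#   corrected_patch = apply_color_transfer(swapped_patch, original_roi)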
+
+def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
+    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
+    landmarks = face.landmark_2d_106
+    if landmarks is not None:
+        # Convert landmarks to int32
+        landmarks = landmarks.astype(np.int32)
+
+        # Extract facial features
+        right_side_face = landmarks[0:16]
+        left_side_face = landmarks[17:32]
+        right_eye = landmarks[33:42]
+        right_eye_brow = landmarks[43:51]
+        left_eye = landmarks[87:96]
+        left_eye_brow = landmarks[97:105]
+
+        # Combine the side points into a single face outline
+        face_outline = np.vstack([right_side_face, left_side_face[::-1]])
+
+        # Calculate padding
+        padding = int(
+            np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05
+        )  # 5% of face width
+
+        # Create a slightly larger convex hull for padding
+        hull = cv2.convexHull(face_outline)
+        hull_padded = []
+        for point in hull:
+            x, y = point[0]
+            center = np.mean(face_outline, axis=0)
+            direction = np.array([x, y]) - center
+            direction = direction / np.linalg.norm(direction)
+            padded_point = np.array([x, y]) + direction * padding
+            hull_padded.append(padded_point)
+
+        hull_padded = np.array(hull_padded, dtype=np.int32)
+
+        # Fill the padded convex hull
+        cv2.fillConvexPoly(mask, hull_padded, 255)
+
+        # Smooth the mask edges
+        mask = cv2.GaussianBlur(mask, (5, 5), 3)
+
+    return mask
+
+def create_lower_mouth_mask(
+    face: Face, frame: Frame
+) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
+    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
+    mouth_cutout = None
+    lower_lip_polygon = None
+    min_x = min_y = max_x = max_y = 0  # defaults if no landmarks are available
+    landmarks = face.landmark_2d_106
+    if landmarks is not None:
+        # Landmark indices for the lower lip outline, in drawing order
+        lower_lip_order = [
+            65, 66, 62, 70, 69, 18, 19, 20, 21, 22,
+            23, 24, 0, 8, 7, 6, 5, 4, 3, 2, 65,
+        ]
+        lower_lip_landmarks = landmarks[lower_lip_order].astype(
+            np.float32
+        )  # Use float for precise calculations
+
+        # Calculate the center of the landmarks
+        center = np.mean(lower_lip_landmarks, axis=0)
+
+        # Expand the landmarks outward using the mouth_mask_size
+        expansion_factor = (
+            1 + modules.globals.mask_down_size * modules.globals.mouth_mask_size
+        )  # Adjust expansion based on slider
+        expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
+
+        # Extend the top lip part
+        toplip_indices = [20, 0, 1, 2, 3, 4, 5]  # Positions in lower_lip_order covering the top lip segment
+        toplip_extension = (
+            modules.globals.mask_size * modules.globals.mouth_mask_size * 0.5
+        )  # Adjust extension based on slider
+        for idx in toplip_indices:
+            direction = expanded_landmarks[idx] - center
+            direction = direction / np.linalg.norm(direction)
+            expanded_landmarks[idx] += direction * toplip_extension
+
+        # Extend the bottom part (chin area)
+        chin_indices = [11, 12, 13, 14, 15, 16]  # Positions in lower_lip_order covering the chin
+        chin_extension = 2 * 0.2  # Adjust this factor to control the extension
+        for idx in chin_indices:
+            expanded_landmarks[idx][1] += (
+                expanded_landmarks[idx][1] - center[1]
+            ) * chin_extension
+
+        # Convert back to integer coordinates
+        expanded_landmarks = expanded_landmarks.astype(np.int32)
+
+        # Calculate bounding box for the expanded lower mouth
+        min_x, min_y = np.min(expanded_landmarks, axis=0)
+        max_x, max_y = np.max(expanded_landmarks, axis=0)
+
+        # Add some padding to the bounding box
+        padding = int((max_x - min_x) * 0.1)  # 10% padding
+        min_x = max(0, min_x - padding)
+        min_y = max(0, min_y - padding)
+        max_x = min(frame.shape[1], max_x + padding)
+        max_y = min(frame.shape[0], max_y + padding)
+
+        # Ensure the bounding box dimensions are valid
+        if max_x <= min_x or max_y <= min_y:
+            if (max_x - min_x) <= 1:
+                max_x = min_x + 1
+            if (max_y - min_y) <= 1:
+                max_y = min_y + 1
+
+        # Create the mask
+        mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
+        cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255)
+
+        # Apply Gaussian blur to soften the mask edges
+        mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
+
+        # Place the mask ROI in the full-sized mask
+        mask[min_y:max_y, min_x:max_x] = mask_roi
+
+        # Extract the masked area from the frame
+        mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
+
+        # Return the expanded lower lip polygon in original frame coordinates
+        lower_lip_polygon = expanded_landmarks
+
+    return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon
+
+def create_eyes_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
+    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
+    eyes_cutout = None
+    eyes_polygon = None
+    min_x = min_y = max_x = max_y = 0  # defaults if no landmarks are available
+    landmarks = face.landmark_2d_106
+    if landmarks is not None:
+        # Left eye landmarks (87-96) and right eye landmarks (33-42)
+        left_eye = landmarks[87:96]
+        right_eye = landmarks[33:42]
+
+        # Calculate centers and dimensions for each eye
+        left_eye_center = np.mean(left_eye, axis=0).astype(np.int32)
+        right_eye_center = np.mean(right_eye, axis=0).astype(np.int32)
+
+        # Calculate eye dimensions with size adjustment
+        def get_eye_dimensions(eye_points):
+            x_coords = eye_points[:, 0]
+            y_coords = eye_points[:, 1]
+            width = int((np.max(x_coords) - np.min(x_coords)) * (1 + modules.globals.mask_down_size * modules.globals.eyes_mask_size))
+            height = int((np.max(y_coords) - np.min(y_coords)) * (1 + modules.globals.mask_down_size * modules.globals.eyes_mask_size))
+            return width, height
+
+        left_width, left_height = get_eye_dimensions(left_eye)
+        right_width, right_height = get_eye_dimensions(right_eye)
+
+        # Add extra padding
+        padding = int(max(left_width, right_width) * 0.2)
+
+        # Calculate bounding box for both eyes
+        min_x = min(left_eye_center[0] - left_width//2, right_eye_center[0] - right_width//2) - padding
+        max_x = max(left_eye_center[0] + left_width//2, right_eye_center[0] + right_width//2) + padding
+        min_y = min(left_eye_center[1] - left_height//2, right_eye_center[1] - right_height//2) - padding
+        max_y = max(left_eye_center[1] + left_height//2, right_eye_center[1] + right_height//2) + padding
+
+        # Ensure coordinates are within frame bounds
+        min_x = max(0, min_x)
+        min_y = 
max(0, min_y) + max_x = min(frame.shape[1], max_x) + max_y = min(frame.shape[0], max_y) + + # Create mask for the eyes region + mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8) + + # Draw ellipses for both eyes + left_center = (left_eye_center[0] - min_x, left_eye_center[1] - min_y) + right_center = (right_eye_center[0] - min_x, right_eye_center[1] - min_y) + + # Calculate axes lengths (half of width and height) + left_axes = (left_width//2, left_height//2) + right_axes = (right_width//2, right_height//2) + + # Draw filled ellipses + cv2.ellipse(mask_roi, left_center, left_axes, 0, 0, 360, 255, -1) + cv2.ellipse(mask_roi, right_center, right_axes, 0, 0, 360, 255, -1) + + # Apply Gaussian blur to soften mask edges + mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5) + + # Place the mask ROI in the full-sized mask + mask[min_y:max_y, min_x:max_x] = mask_roi + + # Extract the masked area from the frame + eyes_cutout = frame[min_y:max_y, min_x:max_x].copy() + + # Create polygon points for visualization + def create_ellipse_points(center, axes): + t = np.linspace(0, 2*np.pi, 32) + x = center[0] + axes[0] * np.cos(t) + y = center[1] + axes[1] * np.sin(t) + return np.column_stack((x, y)).astype(np.int32) + + # Generate points for both ellipses + left_points = create_ellipse_points((left_eye_center[0], left_eye_center[1]), (left_width//2, left_height//2)) + right_points = create_ellipse_points((right_eye_center[0], right_eye_center[1]), (right_width//2, right_height//2)) + + # Combine points for both eyes + eyes_polygon = np.vstack([left_points, right_points]) + + return mask, eyes_cutout, (min_x, min_y, max_x, max_y), eyes_polygon + +def create_curved_eyebrow(points): + if len(points) >= 5: + # Sort points by x-coordinate + sorted_idx = np.argsort(points[:, 0]) + sorted_points = points[sorted_idx] + + # Calculate dimensions + x_min, y_min = np.min(sorted_points, axis=0) + x_max, y_max = np.max(sorted_points, axis=0) + width = x_max - x_min + height = y_max - y_min + + # Create more points for smoother curve + num_points = 50 + x = np.linspace(x_min, x_max, num_points) + + # Fit quadratic curve through points for more natural arch + coeffs = np.polyfit(sorted_points[:, 0], sorted_points[:, 1], 2) + y = np.polyval(coeffs, x) + + # Increased offsets to create more separation + top_offset = height * 0.5 # Increased from 0.3 to shift up more + bottom_offset = height * 0.2 # Increased from 0.1 to shift down more + + # Create smooth curves + top_curve = y - top_offset + bottom_curve = y + bottom_offset + + # Create curved endpoints with more pronounced taper + end_points = 5 + start_x = np.linspace(x[0] - width * 0.15, x[0], end_points) # Increased taper + end_x = np.linspace(x[-1], x[-1] + width * 0.15, end_points) # Increased taper + + # Create tapered ends + start_curve = np.column_stack(( + start_x, + np.linspace(bottom_curve[0], top_curve[0], end_points) + )) + end_curve = np.column_stack(( + end_x, + np.linspace(bottom_curve[-1], top_curve[-1], end_points) + )) + + # Combine all points to form a smooth contour + contour_points = np.vstack([ + start_curve, + np.column_stack((x, top_curve)), + end_curve, + np.column_stack((x[::-1], bottom_curve[::-1])) + ]) + + # Add slight padding for better coverage + center = np.mean(contour_points, axis=0) + vectors = contour_points - center + padded_points = center + vectors * 1.2 # Increased padding slightly + + return padded_points + return points + +def create_eyebrows_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple, 
np.ndarray):
+    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
+    eyebrows_cutout = None
+    eyebrows_polygon = None
+    min_x = min_y = max_x = max_y = 0  # defaults if no landmarks are available
+    landmarks = face.landmark_2d_106
+    if landmarks is not None:
+        # Left eyebrow landmarks (97-105) and right eyebrow landmarks (43-51)
+        left_eyebrow = landmarks[97:105].astype(np.float32)
+        right_eyebrow = landmarks[43:51].astype(np.float32)
+
+        # Calculate centers and dimensions for each eyebrow
+        left_center = np.mean(left_eyebrow, axis=0)
+        right_center = np.mean(right_eyebrow, axis=0)
+
+        # Calculate bounding box with padding adjusted by size
+        all_points = np.vstack([left_eyebrow, right_eyebrow])
+        padding_factor = modules.globals.eyebrows_mask_size
+        min_x = np.min(all_points[:, 0]) - 25 * padding_factor
+        max_x = np.max(all_points[:, 0]) + 25 * padding_factor
+        min_y = np.min(all_points[:, 1]) - 20 * padding_factor
+        max_y = np.max(all_points[:, 1]) + 15 * padding_factor
+
+        # Ensure coordinates are within frame bounds
+        min_x = max(0, int(min_x))
+        min_y = max(0, int(min_y))
+        max_x = min(frame.shape[1], int(max_x))
+        max_y = min(frame.shape[0], int(max_y))
+
+        # Create mask for the eyebrows region
+        mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
+
+        try:
+            # Convert points to local coordinates
+            left_local = left_eyebrow - [min_x, min_y]
+            right_local = right_eyebrow - [min_x, min_y]
+
+            # Generate the curved eyebrow shapes (module-level create_curved_eyebrow above)
+            left_shape = create_curved_eyebrow(left_local)
+            right_shape = create_curved_eyebrow(right_local)
+
+            # Draw the shapes into the mask before feathering
+            cv2.fillPoly(mask_roi, [left_shape.astype(np.int32)], 255)
+            cv2.fillPoly(mask_roi, [right_shape.astype(np.int32)], 255)
+
+            # Apply multi-stage blurring for natural feathering
+            # First, strong Gaussian blur for initial softening
+            mask_roi = cv2.GaussianBlur(mask_roi, (21, 21), 7)
+
+            # Second, medium blur for transition areas
+            mask_roi = cv2.GaussianBlur(mask_roi, (11, 11), 3)
+
+            # Finally, light blur for fine details
+            mask_roi = cv2.GaussianBlur(mask_roi, (5, 5), 1)
+
+            # Normalize mask values
+            mask_roi = cv2.normalize(mask_roi, None, 0, 255, cv2.NORM_MINMAX)
+
+            # Place the mask ROI in the full-sized mask
+            mask[min_y:max_y, min_x:max_x] = mask_roi
+
+            # Extract the masked area from the frame
+            eyebrows_cutout = frame[min_y:max_y, min_x:max_x].copy()
+
+            # Combine points for visualization
+            eyebrows_polygon = np.vstack([
+                left_shape + [min_x, min_y],
+                right_shape + [min_x, min_y]
+            ]).astype(np.int32)
+
+        except Exception as e:
+            # Fallback to simple polygons if curve fitting fails
+            left_local = left_eyebrow - [min_x, min_y]
+            right_local = right_eyebrow - [min_x, min_y]
+            cv2.fillPoly(mask_roi, [left_local.astype(np.int32)], 255)
+            cv2.fillPoly(mask_roi, [right_local.astype(np.int32)], 255)
+            mask_roi = cv2.GaussianBlur(mask_roi, (21, 21), 7)
+            mask[min_y:max_y, min_x:max_x] = mask_roi
+            eyebrows_cutout = frame[min_y:max_y, min_x:max_x].copy()
+            eyebrows_polygon = np.vstack([left_eyebrow, right_eyebrow]).astype(np.int32)
+
+    return mask, eyebrows_cutout, (min_x, min_y, max_x, max_y), eyebrows_polygon
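+
+# apply_mask_area below implements the usual feathered-alpha compositing
+# pattern. Conceptually (illustrative sketch only; fg/bg are hypothetical):
+#
+#   alpha = cv2.GaussianBlur(mask, (0, 0), sigma).astype(np.float32) / 255.0
+#   out = (fg * alpha[..., None] + bg * (1.0 - alpha[..., None])).astype(np.uint8)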
+
+def apply_mask_area(
+    frame: np.ndarray,
+    cutout: np.ndarray,
+    box: tuple,
+    face_mask: np.ndarray,
+    polygon: np.ndarray,
+) -> np.ndarray:
+    min_x, min_y, max_x, max_y = box
+    box_width = max_x - min_x
+    box_height = max_y - min_y
+
+    if (
+        cutout is None
+        or box_width <= 0
+        or box_height <= 0
+        or face_mask is None
+        or polygon is None
+    ):
+        return frame
+
+    try:
+        resized_cutout = cv2.resize(cutout, (box_width, box_height))
+        roi = frame[min_y:max_y, min_x:max_x]
+
+        if roi.shape != resized_cutout.shape:
+            resized_cutout = cv2.resize(
+                resized_cutout, (roi.shape[1], roi.shape[0])
+            )
+
+        color_corrected_area = apply_color_transfer(resized_cutout, roi)
+
+        # Create mask for the area
+        polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
+
+        # Split points for left and right parts if needed
+        if len(polygon) > 50:  # Arbitrary threshold to detect if we have multiple parts
+            mid_point = len(polygon) // 2
+            left_points = polygon[:mid_point] - [min_x, min_y]
+            right_points = polygon[mid_point:] - [min_x, min_y]
+            cv2.fillPoly(polygon_mask, [left_points], 255)
+            cv2.fillPoly(polygon_mask, [right_points], 255)
+        else:
+            adjusted_polygon = polygon - [min_x, min_y]
+            cv2.fillPoly(polygon_mask, [adjusted_polygon], 255)
+
+        # Apply strong initial feathering
+        polygon_mask = cv2.GaussianBlur(polygon_mask, (21, 21), 7)
+
+        # Apply additional feathering (at least 1 to keep a valid blur sigma)
+        feather_amount = max(1, min(
+            30,
+            box_width // modules.globals.mask_feather_ratio,
+            box_height // modules.globals.mask_feather_ratio,
+        ))
+        feathered_mask = cv2.GaussianBlur(
+            polygon_mask.astype(float), (0, 0), feather_amount
+        )
+        feathered_mask = feathered_mask / max(feathered_mask.max(), 1e-6)  # guard against an all-zero mask
+
+        # Apply additional smoothing to the mask edges
+        feathered_mask = cv2.GaussianBlur(feathered_mask, (5, 5), 1)
+
+        face_mask_roi = face_mask[min_y:max_y, min_x:max_x]
+        combined_mask = feathered_mask * (face_mask_roi / 255.0)
+
+        combined_mask = combined_mask[:, :, np.newaxis]
+        blended = (
+            color_corrected_area * combined_mask + roi * (1 - combined_mask)
+        ).astype(np.uint8)
+
+        # Apply face mask to blended result
+        face_mask_3channel = (
+            np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
+        )
+        final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel)
+
+        frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8)
+    except Exception:
+        pass  # blending is best-effort; leave the frame region unchanged on failure
+
+    return frame
+
+def draw_mask_visualization(
+    frame: Frame,
mask_data: tuple, + label: str, + draw_method: str = "polygon" +) -> Frame: + mask, cutout, (min_x, min_y, max_x, max_y), polygon = mask_data + + vis_frame = frame.copy() + + # Ensure coordinates are within frame bounds + height, width = vis_frame.shape[:2] + min_x, min_y = max(0, min_x), max(0, min_y) + max_x, max_y = min(width, max_x), min(height, max_y) + + if draw_method == "ellipse" and len(polygon) > 50: # For eyes + # Split points for left and right parts + mid_point = len(polygon) // 2 + left_points = polygon[:mid_point] + right_points = polygon[mid_point:] + + try: + # Fit ellipses to points - need at least 5 points + if len(left_points) >= 5 and len(right_points) >= 5: + # Convert points to the correct format for ellipse fitting + left_points = left_points.astype(np.float32) + right_points = right_points.astype(np.float32) + + # Fit ellipses + left_ellipse = cv2.fitEllipse(left_points) + right_ellipse = cv2.fitEllipse(right_points) + + # Draw the ellipses + cv2.ellipse(vis_frame, left_ellipse, (0, 255, 0), 2) + cv2.ellipse(vis_frame, right_ellipse, (0, 255, 0), 2) + except Exception as e: + # If ellipse fitting fails, draw simple rectangles as fallback + left_rect = cv2.boundingRect(left_points) + right_rect = cv2.boundingRect(right_points) + cv2.rectangle(vis_frame, + (left_rect[0], left_rect[1]), + (left_rect[0] + left_rect[2], left_rect[1] + left_rect[3]), + (0, 255, 0), 2) + cv2.rectangle(vis_frame, + (right_rect[0], right_rect[1]), + (right_rect[0] + right_rect[2], right_rect[1] + right_rect[3]), + (0, 255, 0), 2) + else: # For mouth and eyebrows + # Draw the polygon + if len(polygon) > 50: # If we have multiple parts + mid_point = len(polygon) // 2 + left_points = polygon[:mid_point] + right_points = polygon[mid_point:] + cv2.polylines(vis_frame, [left_points], True, (0, 255, 0), 2, cv2.LINE_AA) + cv2.polylines(vis_frame, [right_points], True, (0, 255, 0), 2, cv2.LINE_AA) + else: + cv2.polylines(vis_frame, [polygon], True, (0, 255, 0), 2, cv2.LINE_AA) + + # Add label + cv2.putText( + vis_frame, + label, + (min_x, min_y - 10), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (255, 255, 255), + 1, + ) + + return vis_frame \ No newline at end of file diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 88640b2..7c4a98f 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -4,7 +4,6 @@ import insightface import threading import numpy as np import modules.globals -import logging import modules.processors.frame.core from modules.core import update_status from modules.face_analyser import get_one_face, get_many_faces, default_source_face @@ -15,45 +14,47 @@ from modules.utilities import ( is_video, ) from modules.cluster_analysis import find_closest_centroid +# Removed modules.globals.face_swapper_enabled - assuming controlled elsewhere or implicitly true if used +# Removed modules.globals.opacity - accessed via getattr import os FACE_SWAPPER = None THREAD_LOCK = threading.Lock() NAME = "DLC.FACE-SWAPPER" +# --- START: Added for Interpolation --- +PREVIOUS_FRAME_RESULT = None # Stores the final processed frame from the previous step +# --- END: Added for Interpolation --- + abs_dir = os.path.dirname(os.path.abspath(__file__)) models_dir = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" ) - def pre_check() -> bool: - download_directory_path = models_dir - model_url = "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx" - if 
"CUDAExecutionProvider" in modules.globals.execution_providers: - model_url = "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" - + download_directory_path = abs_dir conditional_download( download_directory_path, - [model_url], + [ + "https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx" + ], ) return True def pre_start() -> bool: - if not modules.globals.map_faces and not is_image(modules.globals.source_path): - update_status("Select an image for source path.", NAME) + # Simplified pre_start, assuming checks happen before calling process functions + model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") + if not os.path.exists(model_path): + update_status(f"Model not found: {model_path}. Please download it.", NAME) return False - elif not modules.globals.map_faces and not get_one_face( - cv2.imread(modules.globals.source_path) - ): - update_status("No face in source path detected.", NAME) - return False - if not is_image(modules.globals.target_path) and not is_video( - modules.globals.target_path - ): - update_status("Select an image or video for target path.", NAME) + + # Try to get the face swapper to ensure it loads correctly + if get_face_swapper() is None: + # Error message already printed within get_face_swapper return False + + # Add other essential checks if needed, e.g., target/source path validity return True @@ -62,566 +63,1053 @@ def get_face_swapper() -> Any: with THREAD_LOCK: if FACE_SWAPPER is None: - model_name = "inswapper_128.onnx" - if "CUDAExecutionProvider" in modules.globals.execution_providers: - model_name = "inswapper_128_fp16.onnx" - model_path = os.path.join(models_dir, model_name) - FACE_SWAPPER = insightface.model_zoo.get_model( - model_path, providers=modules.globals.execution_providers - ) + model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") + update_status(f"Loading face swapper model from: {model_path}", NAME) + try: + # Ensure the providers list is correctly passed + providers = modules.globals.execution_providers + # print(f"Attempting to load model with providers: {providers}") # Debug print + FACE_SWAPPER = insightface.model_zoo.get_model( + model_path, providers=providers + ) + update_status("Face swapper model loaded successfully.", NAME) + except Exception as e: + update_status(f"Error loading face swapper model: {e}", NAME) + # print traceback maybe? + # import traceback + # traceback.print_exc() + FACE_SWAPPER = None # Ensure it remains None on failure + return None return FACE_SWAPPER def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: face_swapper = get_face_swapper() + if face_swapper is None: + update_status("Face swapper model not loaded or failed to load. 
Skipping swap.", NAME) + return temp_frame # Return original frame if model failed or not loaded + + # Store a copy of the original frame before swapping for opacity blending + original_frame = temp_frame.copy() + + # --- Pre-swap Input Check (Optional but good practice) --- + if temp_frame.dtype != np.uint8: + # print(f"Warning: Input frame is {temp_frame.dtype}, converting to uint8 before swap.") + temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8) + # --- End Input Check --- # Apply the face swap - swapped_frame = face_swapper.get( - temp_frame, target_face, source_face, paste_back=True - ) + try: + swapped_frame_raw = face_swapper.get( + temp_frame, target_face, source_face, paste_back=True + ) - if modules.globals.mouth_mask: + # --- START: CRITICAL FIX FOR ORT 1.17 --- + # Check the output type and range from the model + if swapped_frame_raw is None: + # print("Warning: face_swapper.get returned None.") # Debug + return original_frame # Return original if swap somehow failed internally + + # Ensure the output is a numpy array + if not isinstance(swapped_frame_raw, np.ndarray): + # print(f"Warning: face_swapper.get returned type {type(swapped_frame_raw)}, expected numpy array.") # Debug + return original_frame + + # Ensure the output has the correct shape (like the input frame) + if swapped_frame_raw.shape != temp_frame.shape: + # print(f"Warning: Swapped frame shape {swapped_frame_raw.shape} differs from input {temp_frame.shape}.") # Debug + # Attempt resize (might distort if aspect ratio changed, but better than crashing) + try: + swapped_frame_raw = cv2.resize(swapped_frame_raw, (temp_frame.shape[1], temp_frame.shape[0])) + except Exception as resize_e: + # print(f"Error resizing swapped frame: {resize_e}") # Debug + return original_frame + + # Explicitly clip values to 0-255 and convert to uint8 + # This handles cases where the model might output floats or values outside the valid range + swapped_frame = np.clip(swapped_frame_raw, 0, 255).astype(np.uint8) + # --- END: CRITICAL FIX FOR ORT 1.17 --- + + except Exception as e: + print(f"Error during face swap using face_swapper.get: {e}") # More specific error + # import traceback + # traceback.print_exc() # Print full traceback for debugging + return original_frame # Return original if swap fails + + # --- Post-swap Processing (Masking, Opacity, etc.) 
--- + # Now, work with the guaranteed uint8 'swapped_frame' + + if getattr(modules.globals, "mouth_mask", False): # Check if mouth_mask is enabled # Create a mask for the target face - face_mask = create_face_mask(target_face, temp_frame) + face_mask = create_face_mask(target_face, temp_frame) # Use temp_frame (original shape) for mask creation geometry - # Create the mouth mask + # Create the mouth mask using original geometry mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( - create_lower_mouth_mask(target_face, temp_frame) + create_lower_mouth_mask(target_face, temp_frame) # Use temp_frame (original) for cutout ) - # Apply the mouth area - swapped_frame = apply_mouth_area( - swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon - ) - - if modules.globals.show_mouth_mask_box: - mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) - swapped_frame = draw_mouth_mask_visualization( - swapped_frame, target_face, mouth_mask_data + # Apply the mouth area only if mouth_cutout exists + if mouth_cutout is not None and mouth_box != (0,0,0,0): # Add check for valid box + # Apply mouth area (from original) onto the 'swapped_frame' + swapped_frame = apply_mouth_area( + swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon ) - return swapped_frame + if getattr(modules.globals, "show_mouth_mask_box", False): + mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) + # Draw visualization on the swapped_frame *before* opacity blending + swapped_frame = draw_mouth_mask_visualization( + swapped_frame, target_face, mouth_mask_data + ) + + # Apply opacity blend between the original frame and the swapped frame + opacity = getattr(modules.globals, "opacity", 1.0) + # Ensure opacity is within valid range [0.0, 1.0] + opacity = max(0.0, min(1.0, opacity)) + + # Blend the original_frame with the (potentially mouth-masked) swapped_frame + # Ensure both frames are uint8 before blending + final_swapped_frame = cv2.addWeighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0) + + # Ensure final frame is uint8 after blending (addWeighted should preserve it, but belt-and-suspenders) + final_swapped_frame = final_swapped_frame.astype(np.uint8) + + return final_swapped_frame + + +# --- START: Helper function for interpolation and sharpening --- +def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame: + """Applies sharpening and interpolation.""" + global PREVIOUS_FRAME_RESULT + + processed_frame = current_frame.copy() + + # 1. 
Apply Sharpening (if enabled) + sharpness_value = getattr(modules.globals, "sharpness", 0.0) + if sharpness_value > 0.0 and swapped_face_bboxes: + height, width = processed_frame.shape[:2] + for bbox in swapped_face_bboxes: + # Ensure bbox is iterable and has 4 elements + if not hasattr(bbox, '__iter__') or len(bbox) != 4: + # print(f"Warning: Invalid bbox format for sharpening: {bbox}") # Debug + continue + x1, y1, x2, y2 = bbox + # Ensure coordinates are integers and within bounds + try: + x1, y1 = max(0, int(x1)), max(0, int(y1)) + x2, y2 = min(width, int(x2)), min(height, int(y2)) + except ValueError: + # print(f"Warning: Could not convert bbox coordinates to int: {bbox}") # Debug + continue + + + if x2 <= x1 or y2 <= y1: + continue + + face_region = processed_frame[y1:y2, x1:x2] + if face_region.size == 0: continue # Skip empty regions + + # Apply sharpening using addWeighted for smoother control + # Use try-except for GaussianBlur and addWeighted as they can fail on invalid inputs + try: + blurred = cv2.GaussianBlur(face_region, (0, 0), 3) # sigma=3, kernel size auto + sharpened_region = cv2.addWeighted( + face_region, 1.0 + sharpness_value, + blurred, -sharpness_value, + 0 + ) + # Ensure the sharpened region doesn't have invalid values + sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8) + processed_frame[y1:y2, x1:x2] = sharpened_region + except cv2.error as sharpen_e: + # print(f"Warning: OpenCV error during sharpening: {sharpen_e} for bbox {bbox}") # Debug + # Skip sharpening for this region if it fails + pass + + + # 2. Apply Interpolation (if enabled) + enable_interpolation = getattr(modules.globals, "enable_interpolation", False) + interpolation_weight = getattr(modules.globals, "interpolation_weight", 0.2) + + final_frame = processed_frame # Start with the current (potentially sharpened) frame + + if enable_interpolation and 0 < interpolation_weight < 1: + if PREVIOUS_FRAME_RESULT is not None and PREVIOUS_FRAME_RESULT.shape == processed_frame.shape and PREVIOUS_FRAME_RESULT.dtype == processed_frame.dtype: + # Perform interpolation + try: + final_frame = cv2.addWeighted( + PREVIOUS_FRAME_RESULT, 1.0 - interpolation_weight, + processed_frame, interpolation_weight, + 0 + ) + # Ensure final frame is uint8 + final_frame = np.clip(final_frame, 0, 255).astype(np.uint8) + except cv2.error as interp_e: + # print(f"Warning: OpenCV error during interpolation: {interp_e}") # Debug + final_frame = processed_frame # Use current frame if interpolation fails + PREVIOUS_FRAME_RESULT = None # Reset state if error occurs + + # Update the state for the next frame *with the interpolated result* + PREVIOUS_FRAME_RESULT = final_frame.copy() + else: + # If previous frame invalid or doesn't match, use current frame and update state + if PREVIOUS_FRAME_RESULT is not None and PREVIOUS_FRAME_RESULT.shape != processed_frame.shape: + # print("Info: Frame shape changed, resetting interpolation state.") # Debug + pass + PREVIOUS_FRAME_RESULT = processed_frame.copy() + else: + # If interpolation is off or weight is invalid, just use the current frame + # Update state with the current (potentially sharpened) frame + # Reset previous frame state if interpolation was just turned off or weight is invalid + PREVIOUS_FRAME_RESULT = processed_frame.copy() + + + return final_frame +# --- END: Helper function for interpolation and sharpening --- def process_frame(source_face: Face, temp_frame: Frame) -> Frame: - if modules.globals.color_correction: - temp_frame = cv2.cvtColor(temp_frame, 
cv2.COLOR_BGR2RGB) + """ + DEPRECATED / SIMPLER VERSION - Processes a single frame using one source face. + Consider using process_frame_v2 for more complex scenarios. + """ + if getattr(modules.globals, "opacity", 1.0) == 0: + # If opacity is 0, no swap happens, so no post-processing needed. + # Also reset interpolation state if it was active. + global PREVIOUS_FRAME_RESULT + PREVIOUS_FRAME_RESULT = None + return temp_frame + + # Color correction removed from here (better applied before swap if needed) + + processed_frame = temp_frame # Start with the input frame + swapped_face_bboxes = [] # Keep track of where swaps happened if modules.globals.many_faces: - many_faces = get_many_faces(temp_frame) + many_faces = get_many_faces(processed_frame) if many_faces: + current_swap_target = processed_frame.copy() # Apply swaps sequentially on a copy for target_face in many_faces: - if source_face and target_face: - temp_frame = swap_face(source_face, target_face, temp_frame) - else: - print("Face detection failed for target/source.") + current_swap_target = swap_face(source_face, target_face, current_swap_target) + if target_face is not None and hasattr(target_face, "bbox") and target_face.bbox is not None: + swapped_face_bboxes.append(target_face.bbox.astype(int)) + processed_frame = current_swap_target # Assign the final result after all swaps else: - target_face = get_one_face(temp_frame) - if target_face and source_face: - temp_frame = swap_face(source_face, target_face, temp_frame) - else: - logging.error("Face detection failed for target or source.") - return temp_frame + target_face = get_one_face(processed_frame) + if target_face: + processed_frame = swap_face(source_face, target_face, processed_frame) + if target_face is not None and hasattr(target_face, "bbox") and target_face.bbox is not None: + swapped_face_bboxes.append(target_face.bbox.astype(int)) + # Apply sharpening and interpolation + final_frame = apply_post_processing(processed_frame, swapped_face_bboxes) + + return final_frame def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: - if is_image(modules.globals.target_path): - if modules.globals.many_faces: - source_face = default_source_face() - for map in modules.globals.source_target_map: - target_face = map["target"]["face"] - temp_frame = swap_face(source_face, target_face, temp_frame) + """Handles complex mapping scenarios (map_faces=True) and live streams.""" + if getattr(modules.globals, "opacity", 1.0) == 0: + # If opacity is 0, no swap happens, so no post-processing needed. + # Also reset interpolation state if it was active. 
+        global PREVIOUS_FRAME_RESULT
+        PREVIOUS_FRAME_RESULT = None
+        return temp_frame
 
-        elif not modules.globals.many_faces:
-            for map in modules.globals.source_target_map:
-                if "source" in map:
-                    source_face = map["source"]["face"]
-                    target_face = map["target"]["face"]
-                    temp_frame = swap_face(source_face, target_face, temp_frame)
+    processed_frame = temp_frame  # Start with the input frame
+    swapped_face_bboxes = []  # Keep track of where swaps happened
 
-    elif is_video(modules.globals.target_path):
-        if modules.globals.many_faces:
-            source_face = default_source_face()
-            for map in modules.globals.source_target_map:
-                target_frame = [
-                    f
-                    for f in map["target_faces_in_frame"]
-                    if f["location"] == temp_frame_path
-                ]
+    # Determine source/target pairs based on mode
+    source_target_pairs = []
 
-                for frame in target_frame:
-                    for target_face in frame["faces"]:
-                        temp_frame = swap_face(source_face, target_face, temp_frame)
+    # Ensure maps exist before accessing them
+    source_target_map = getattr(modules.globals, "source_target_map", None)
+    simple_map = getattr(modules.globals, "simple_map", None)
 
-        elif not modules.globals.many_faces:
-            for map in modules.globals.source_target_map:
-                if "source" in map:
-                    target_frame = [
-                        f
-                        for f in map["target_faces_in_frame"]
-                        if f["location"] == temp_frame_path
-                    ]
-                    source_face = map["source"]["face"]
+    # Check if target is a file path (image or video) or live stream
+    is_file_target = modules.globals.target_path and (is_image(modules.globals.target_path) or is_video(modules.globals.target_path))
 
-                    for frame in target_frame:
-                        for target_face in frame["faces"]:
-                            temp_frame = swap_face(source_face, target_face, temp_frame)
+    if is_file_target:
+        # Processing a specific image or video file with pre-analyzed maps
+        if source_target_map:
+            if modules.globals.many_faces:
+                source_face = default_source_face()  # Use default source for all targets
+                if source_face:
+                    for map_data in source_target_map:
+                        if is_image(modules.globals.target_path):
+                            target_info = map_data.get("target", {})
+                            if target_info:  # Check if target info exists
+                                target_face = target_info.get("face")
+                                if target_face:
+                                    source_target_pairs.append((source_face, target_face))
+                        elif is_video(modules.globals.target_path):
+                            # Find faces for the current frame_path in the video map
+                            target_frames_data = map_data.get("target_faces_in_frame", [])
+                            if target_frames_data:  # Check if frame data exists
+                                target_frames = [f for f in target_frames_data if f and f.get("location") == temp_frame_path]
+                                for frame_data in target_frames:
+                                    faces_in_frame = frame_data.get("faces", [])
+                                    if faces_in_frame:  # Check if faces exist
+                                        for target_face in faces_in_frame:
+                                            source_target_pairs.append((source_face, target_face))
+            else:  # Single face or specific mapping
+                for map_data in source_target_map:
+                    source_info = map_data.get("source", {})
+                    if not source_info: continue  # Skip if no source info
+                    source_face = source_info.get("face")
+                    if not source_face: continue  # Skip if no source defined for this map entry
+
+                    if is_image(modules.globals.target_path):
+                        target_info = map_data.get("target", {})
+                        if target_info:
+                            target_face = target_info.get("face")
+                            if target_face:
+                                source_target_pairs.append((source_face, target_face))
+                    elif is_video(modules.globals.target_path):
+                        target_frames_data = map_data.get("target_faces_in_frame", [])
+                        if target_frames_data:
+                            target_frames = [f for f in target_frames_data if f and f.get("location") == temp_frame_path]
+                            for frame_data in target_frames:
+                                faces_in_frame = frame_data.get("faces", [])
+                                if faces_in_frame:
+                                    for target_face in faces_in_frame:
+                                        source_target_pairs.append((source_face, target_face))
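+
+    # For reference: find_closest_centroid (modules.cluster_analysis) is assumed
+    # to return the index of the stored embedding most similar to the given
+    # normed_embedding, conceptually something like:
+    #
+    #   sims = [np.dot(centroid, embedding) for centroid in centroids]
+    #   closest_index = int(np.argmax(sims))
+    #   return closest_index, centroids[closest_index]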
frame_data.get("faces", []) + if faces_in_frame: + for target_face in faces_in_frame: + source_target_pairs.append((source_face, target_face)) else: - detected_faces = get_many_faces(temp_frame) - if modules.globals.many_faces: - if detected_faces: + # Live stream or webcam processing (analyze faces on the fly) + detected_faces = get_many_faces(processed_frame) + if detected_faces: + if modules.globals.many_faces: + source_face = default_source_face() # Use default source for all detected targets + if source_face: + for target_face in detected_faces: + source_target_pairs.append((source_face, target_face)) + elif simple_map: + # Use simple_map (source_faces <-> target_embeddings) + source_faces = simple_map.get("source_faces", []) + target_embeddings = simple_map.get("target_embeddings", []) + + if source_faces and target_embeddings and len(source_faces) == len(target_embeddings): + # Match detected faces to the closest target embedding + if len(detected_faces) <= len(target_embeddings): + # More targets defined than detected - match each detected face + for detected_face in detected_faces: + if detected_face.normed_embedding is None: continue + closest_idx, _ = find_closest_centroid(target_embeddings, detected_face.normed_embedding) + if 0 <= closest_idx < len(source_faces): + source_target_pairs.append((source_faces[closest_idx], detected_face)) + else: + # More faces detected than targets defined - match each target embedding to closest detected face + detected_embeddings = [f.normed_embedding for f in detected_faces if f.normed_embedding is not None] + detected_faces_with_embedding = [f for f in detected_faces if f.normed_embedding is not None] + if not detected_embeddings: return processed_frame # No embeddings to match + + for i, target_embedding in enumerate(target_embeddings): + if 0 <= i < len(source_faces): # Ensure source face exists for this embedding + closest_idx, _ = find_closest_centroid(detected_embeddings, target_embedding) + if 0 <= closest_idx < len(detected_faces_with_embedding): + source_target_pairs.append((source_faces[i], detected_faces_with_embedding[closest_idx])) + else: # Fallback: if no map, use default source for the single detected face (if any) source_face = default_source_face() - for target_face in detected_faces: - temp_frame = swap_face(source_face, target_face, temp_frame) + target_face = get_one_face(processed_frame, detected_faces) # Use faces already detected + if source_face and target_face: + source_target_pairs.append((source_face, target_face)) - elif not modules.globals.many_faces: - if detected_faces: - if len(detected_faces) <= len( - modules.globals.simple_map["target_embeddings"] - ): - for detected_face in detected_faces: - closest_centroid_index, _ = find_closest_centroid( - modules.globals.simple_map["target_embeddings"], - detected_face.normed_embedding, - ) - temp_frame = swap_face( - modules.globals.simple_map["source_faces"][ - closest_centroid_index - ], - detected_face, - temp_frame, - ) - else: - detected_faces_centroids = [] - for face in detected_faces: - detected_faces_centroids.append(face.normed_embedding) - i = 0 - for target_embedding in modules.globals.simple_map[ - "target_embeddings" - ]: - closest_centroid_index, _ = find_closest_centroid( - detected_faces_centroids, target_embedding - ) + # Perform swaps based on the collected pairs + current_swap_target = processed_frame.copy() # Apply swaps sequentially + for source_face, target_face in source_target_pairs: + if source_face and target_face: + current_swap_target = 
swap_face(source_face, target_face, current_swap_target) + if target_face is not None and hasattr(target_face, "bbox") and target_face.bbox is not None: + swapped_face_bboxes.append(target_face.bbox.astype(int)) + processed_frame = current_swap_target # Assign final result - temp_frame = swap_face( - modules.globals.simple_map["source_faces"][i], - detected_faces[closest_centroid_index], - temp_frame, - ) - i += 1 - return temp_frame + + # Apply sharpening and interpolation + final_frame = apply_post_processing(processed_frame, swapped_face_bboxes) + + return final_frame def process_frames( source_path: str, temp_frame_paths: List[str], progress: Any = None ) -> None: - if not modules.globals.map_faces: - source_face = get_one_face(cv2.imread(source_path)) - for temp_frame_path in temp_frame_paths: - temp_frame = cv2.imread(temp_frame_path) + """ + Processes a list of frame paths (typically for video). + Iterates through frames, applies the appropriate swapping logic based on globals, + and saves the result back to the frame path. Handles multi-threading via caller. + """ + # Determine which processing function to use based on map_faces global setting + use_v2 = getattr(modules.globals, "map_faces", False) + source_face = None # Initialize source_face + + # --- Pre-load source face only if needed (Simple Mode: map_faces=False) --- + if not use_v2: + if not source_path or not os.path.exists(source_path): + update_status(f"Error: Source path invalid or not provided for simple mode: {source_path}", NAME) + # Log the error but allow proceeding; subsequent check will stop processing. + else: try: - result = process_frame(source_face, temp_frame) - cv2.imwrite(temp_frame_path, result) - except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) - else: - for temp_frame_path in temp_frame_paths: + source_img = cv2.imread(source_path) + if source_img is None: + # Specific error for file reading failure + update_status(f"Error reading source image file {source_path}. Please check the path and file integrity.", NAME) + else: + source_face = get_one_face(source_img) + if source_face is None: + # Specific message for no face detected after successful read + update_status(f"Warning: Successfully read source image {source_path}, but no face was detected. Swaps will be skipped.", NAME) + except Exception as e: + # Print the specific exception caught + import traceback + print(f"{NAME}: Caught exception during source image processing for {source_path}:") + traceback.print_exc() # Print the full traceback + update_status(f"Error during source image reading or analysis {source_path}: {e}", NAME) + # Log general exception during the process + + total_frames = len(temp_frame_paths) + # update_status(f"Processing {total_frames} frames. Use V2 (map_faces): {use_v2}", NAME) # Optional Debug + + # --- Stop processing entirely if in Simple Mode and source face is invalid --- + if not use_v2 and source_face is None: + update_status(f"Halting video processing: Invalid or no face detected in source image for simple mode.", NAME) + if progress: + # Ensure the progress bar completes if it was started + remaining_updates = total_frames - progress.n if hasattr(progress, 'n') else total_frames + if remaining_updates > 0: + progress.update(remaining_updates) + return # Exit the function entirely + + # --- Process each frame path provided in the list --- + # Note: In the current core.py multi_process_frame, temp_frame_paths will usually contain only ONE path per call. 
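+    # Illustrative sketch (an assumption about the dispatcher, not part of this
+    # function): core.py is expected to fan frames out to worker threads roughly
+    # like this, which is why temp_frame_paths is usually a single-element list:
+    #
+    #   from concurrent.futures import ThreadPoolExecutor
+    #   with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as pool:
+    #       for path in temp_frame_paths:
+    #           pool.submit(process_frames, source_path, [path], progress)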
+ for i, temp_frame_path in enumerate(temp_frame_paths): + # update_status(f"Processing frame {i+1}/{total_frames}: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug + + # Read the target frame + try: temp_frame = cv2.imread(temp_frame_path) - try: - result = process_frame_v2(temp_frame, temp_frame_path) - cv2.imwrite(temp_frame_path, result) - except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) + if temp_frame is None: + print(f"{NAME}: Error: Could not read frame: {temp_frame_path}, skipping.") + if progress: progress.update(1) + continue # Skip this frame if read fails + except Exception as read_e: + print(f"{NAME}: Error reading frame {temp_frame_path}: {read_e}, skipping.") + if progress: progress.update(1) + continue + + # Select processing function and execute + result_frame = None + try: + if use_v2: + # V2 uses global maps and needs the frame path for lookup in video mode + # update_status(f"Using process_frame_v2 for: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug + result_frame = process_frame_v2(temp_frame, temp_frame_path) + else: + # Simple mode uses the pre-loaded source_face (already checked for validity above) + # update_status(f"Using process_frame (simple) for: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug + result_frame = process_frame(source_face, temp_frame) # source_face is guaranteed to be valid here + + # Check if processing actually returned a frame + if result_frame is None: + print(f"{NAME}: Warning: Processing returned None for frame {temp_frame_path}. Using original.") + result_frame = temp_frame + + except Exception as proc_e: + print(f"{NAME}: Error processing frame {temp_frame_path}: {proc_e}") + # import traceback # Optional for detailed debugging + # traceback.print_exc() + result_frame = temp_frame # Use original frame on processing error + + # Write the result back to the same frame path + try: + write_success = cv2.imwrite(temp_frame_path, result_frame) + if not write_success: + print(f"{NAME}: Error: Failed to write processed frame to {temp_frame_path}") + except Exception as write_e: + print(f"{NAME}: Error writing frame {temp_frame_path}: {write_e}") + + # Update progress bar + if progress: + progress.update(1) + # else: # Basic console progress (optional) + # if (i + 1) % 10 == 0 or (i + 1) == total_frames: # Update every 10 frames or on last frame + # update_status(f"Processed frame {i+1}/{total_frames}", NAME) def process_image(source_path: str, target_path: str, output_path: str) -> None: - if not modules.globals.map_faces: - source_face = get_one_face(cv2.imread(source_path)) + """Processes a single target image.""" + # --- Reset interpolation state for single image processing --- + global PREVIOUS_FRAME_RESULT + PREVIOUS_FRAME_RESULT = None + # --- + + use_v2 = getattr(modules.globals, "map_faces", False) + + # Read target first + try: target_frame = cv2.imread(target_path) - result = process_frame(source_face, target_frame) - cv2.imwrite(output_path, result) - else: - if modules.globals.many_faces: - update_status( - "Many faces enabled. Using first source image. 
Progressing...", NAME - ) - target_frame = cv2.imread(output_path) - result = process_frame_v2(target_frame) - cv2.imwrite(output_path, result) + if target_frame is None: + update_status(f"Error: Could not read target image: {target_path}", NAME) + return + except Exception as read_e: + update_status(f"Error reading target image {target_path}: {read_e}", NAME) + return + + result = None + try: + if use_v2: + if getattr(modules.globals, "many_faces", False): + update_status("Processing image with 'map_faces' and 'many_faces'. Using pre-analysis map.", NAME) + # V2 processes based on global maps, doesn't need source_path here directly + # Assumes maps are pre-populated. Pass target_path for map lookup. + result = process_frame_v2(target_frame, target_path) + + else: # Simple mode + try: + source_img = cv2.imread(source_path) + if source_img is None: + update_status(f"Error: Could not read source image: {source_path}", NAME) + return + source_face = get_one_face(source_img) + if not source_face: + update_status(f"Error: No face found in source image: {source_path}", NAME) + return + except Exception as src_e: + update_status(f"Error reading or analyzing source image {source_path}: {src_e}", NAME) + return + + result = process_frame(source_face, target_frame) + + # Write the result if processing was successful + if result is not None: + write_success = cv2.imwrite(output_path, result) + if write_success: + update_status(f"Output image saved to: {output_path}", NAME) + else: + update_status(f"Error: Failed to write output image to {output_path}", NAME) + else: + # This case might occur if process_frame/v2 returns None unexpectedly + update_status("Image processing failed (result was None).", NAME) + + except Exception as proc_e: + update_status(f"Error during image processing: {proc_e}", NAME) + # import traceback + # traceback.print_exc() def process_video(source_path: str, temp_frame_paths: List[str]) -> None: - if modules.globals.map_faces and modules.globals.many_faces: - update_status( - "Many faces enabled. Using first source image. Progressing...", NAME - ) + """Sets up and calls the frame processing for video.""" + # --- Reset interpolation state before starting video processing --- + global PREVIOUS_FRAME_RESULT + PREVIOUS_FRAME_RESULT = None + # --- + + mode_desc = "'map_faces'" if getattr(modules.globals, "map_faces", False) else "'simple'" + if getattr(modules.globals, "map_faces", False) and getattr(modules.globals, "many_faces", False): + mode_desc += " and 'many_faces'. Using pre-analysis map." 
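# --- Editor's sketch (illustration only, not part of the patch): PREVIOUS_FRAME_RESULT
# is cleared above so temporal smoothing never blends across separate runs. A minimal
# guess at the kind of blend the enable_interpolation/interpolation_weight globals
# describe; the project's actual apply_post_processing may differ. ---
import cv2

def interpolate_sketch(current_frame, previous_frame, weight):
    # weight = share of the *current* frame; lower values give smoother, laggier output
    if previous_frame is None or previous_frame.shape != current_frame.shape:
        return current_frame
    return cv2.addWeighted(current_frame, weight, previous_frame, 1.0 - weight, 0)
# --- End editor's sketch ---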
+ update_status(f"Processing video with {mode_desc} mode.", NAME) + + # Pass the correct source_path (needed for simple mode in process_frames) + # The core processing logic handles calling the right frame function (process_frames) modules.processors.frame.core.process_video( - source_path, temp_frame_paths, process_frames + source_path, temp_frame_paths, process_frames # Pass the newly modified process_frames ) +# ========================== +# MASKING FUNCTIONS (Mostly unchanged, added safety checks and minor improvements) +# ========================== def create_lower_mouth_mask( face: Face, frame: Frame ) -> (np.ndarray, np.ndarray, tuple, np.ndarray): mask = np.zeros(frame.shape[:2], dtype=np.uint8) mouth_cutout = None + lower_lip_polygon = None # Initialize + mouth_box = (0,0,0,0) # Initialize + + # Validate face and landmarks + if face is None or not hasattr(face, 'landmark_2d_106'): + # print("Warning: Invalid face object passed to create_lower_mouth_mask.") + return mask, mouth_cutout, mouth_box, lower_lip_polygon + landmarks = face.landmark_2d_106 - if landmarks is not None: + + # Check landmark validity + if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106: + # print("Warning: Invalid or insufficient landmarks for mouth mask.") + return mask, mouth_cutout, mouth_box, lower_lip_polygon + + try: # Wrap main logic in try-except # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 - lower_lip_order = [ - 65, - 66, - 62, - 70, - 69, - 18, - 19, - 20, - 21, - 22, - 23, - 24, - 0, - 8, - 7, - 6, - 5, - 4, - 3, - 2, - 65, - ] - lower_lip_landmarks = landmarks[lower_lip_order].astype( - np.float32 - ) # Use float for precise calculations + lower_lip_order = [65, 66, 62, 70, 69, 18, 19, 20, 21, 22, 23, 24, 0, 8, 7, 6, 5, 4, 3, 2, 65] # 21 points + + # Check if all indices are valid for the loaded landmarks (already partially done by < 106 check) + if max(lower_lip_order) >= landmarks.shape[0]: + # print(f"Warning: Landmark index {max(lower_lip_order)} out of bounds for shape {landmarks.shape[0]}.") + return mask, mouth_cutout, mouth_box, lower_lip_polygon + + lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32) + + # Filter out potential NaN or Inf values in landmarks + if not np.all(np.isfinite(lower_lip_landmarks)): + # print("Warning: Non-finite values detected in lower lip landmarks.") + return mask, mouth_cutout, mouth_box, lower_lip_polygon - # Calculate the center of the landmarks center = np.mean(lower_lip_landmarks, axis=0) + if not np.all(np.isfinite(center)): # Check center calculation + # print("Warning: Could not calculate valid center for mouth mask.") + return mask, mouth_cutout, mouth_box, lower_lip_polygon - # Expand the landmarks outward - expansion_factor = ( - 1 + modules.globals.mask_down_size - ) # Adjust this for more or less expansion + + mask_down_size = getattr(modules.globals, "mask_down_size", 0.1) # Default 0.1 + expansion_factor = 1 + mask_down_size expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center - # Extend the top lip part - toplip_indices = [ - 20, - 0, - 1, - 2, - 3, - 4, - 5, - ] # Indices for landmarks 2, 65, 66, 62, 70, 69, 18 - toplip_extension = ( - modules.globals.mask_size * 0.5 - ) # Adjust this factor to control the extension - for idx in toplip_indices: - direction = expanded_landmarks[idx] - center - direction = direction / np.linalg.norm(direction) - expanded_landmarks[idx] += direction * toplip_extension + mask_size = getattr(modules.globals, "mask_size", 
1.0) # Default 1.0 + toplip_extension = mask_size * 0.5 - # Extend the bottom part (chin area) - chin_indices = [ - 11, - 12, - 13, - 14, - 15, - 16, - ] # Indices for landmarks 21, 22, 23, 24, 0, 8 - chin_extension = 2 * 0.2 # Adjust this factor to control the extension - for idx in chin_indices: - expanded_landmarks[idx][1] += ( - expanded_landmarks[idx][1] - center[1] - ) * chin_extension + # Define toplip indices relative to lower_lip_order (safer) + toplip_local_indices = [0, 1, 2, 3, 4, 5, 19] # Indices in lower_lip_order for [65, 66, 62, 70, 69, 18, 2] + + for idx in toplip_local_indices: + if idx < len(expanded_landmarks): # Boundary check + direction = expanded_landmarks[idx] - center + norm = np.linalg.norm(direction) + if norm > 1e-6: # Avoid division by zero + direction_normalized = direction / norm + expanded_landmarks[idx] += direction_normalized * toplip_extension + + # Define chin indices relative to lower_lip_order + chin_local_indices = [9, 10, 11, 12, 13, 14] # Indices for [22, 23, 24, 0, 8, 7] + chin_extension = 2 * 0.2 + + for idx in chin_local_indices: + if idx < len(expanded_landmarks): # Boundary check + # Extend vertically based on distance from center y + y_diff = expanded_landmarks[idx][1] - center[1] + expanded_landmarks[idx][1] += y_diff * chin_extension + + + # Ensure landmarks are finite after adjustments + if not np.all(np.isfinite(expanded_landmarks)): + # print("Warning: Non-finite values detected after expanding landmarks.") + return mask, mouth_cutout, mouth_box, lower_lip_polygon - # Convert back to integer coordinates expanded_landmarks = expanded_landmarks.astype(np.int32) - # Calculate bounding box for the expanded lower mouth min_x, min_y = np.min(expanded_landmarks, axis=0) max_x, max_y = np.max(expanded_landmarks, axis=0) - # Add some padding to the bounding box - padding = int((max_x - min_x) * 0.1) # 10% padding - min_x = max(0, min_x - padding) - min_y = max(0, min_y - padding) - max_x = min(frame.shape[1], max_x + padding) - max_y = min(frame.shape[0], max_y + padding) + # Add padding *after* initial min/max calculation + padding_ratio = 0.1 # Percentage padding + padding_x = int((max_x - min_x) * padding_ratio) + padding_y = int((max_y - min_y) * padding_ratio) # Use y-range for y-padding - # Ensure the bounding box dimensions are valid - if max_x <= min_x or max_y <= min_y: - if (max_x - min_x) <= 1: - max_x = min_x + 1 - if (max_y - min_y) <= 1: - max_y = min_y + 1 + # Apply padding and clamp to frame boundaries + frame_h, frame_w = frame.shape[:2] + min_x = max(0, min_x - padding_x) + min_y = max(0, min_y - padding_y) + max_x = min(frame_w, max_x + padding_x) + max_y = min(frame_h, max_y + padding_y) - # Create the mask - mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8) - cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255) - # Apply Gaussian blur to soften the mask edges - mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5) + if max_x > min_x and max_y > min_y: + # Create the mask ROI + mask_roi_h = max_y - min_y + mask_roi_w = max_x - min_x + mask_roi = np.zeros((mask_roi_h, mask_roi_w), dtype=np.uint8) - # Place the mask ROI in the full-sized mask - mask[min_y:max_y, min_x:max_x] = mask_roi + # Shift polygon coordinates relative to the ROI's top-left corner + polygon_relative_to_roi = expanded_landmarks - [min_x, min_y] - # Extract the masked area from the frame - mouth_cutout = frame[min_y:max_y, min_x:max_x].copy() + # Draw polygon on the ROI mask + cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255) 
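# --- Editor's sketch (illustration only, not part of the patch): the expansion above
# scales the lip polygon about its own centroid, p' = (p - c) * (1 + mask_down_size) + c,
# which grows the mask without shifting it. A standalone check: ---
import numpy as np

points = np.array([[10.0, 10.0], [20.0, 10.0], [15.0, 20.0]])  # toy polygon
center = points.mean(axis=0)
expanded = (points - center) * 1.1 + center  # 10% outward expansion
assert np.allclose(expanded.mean(axis=0), center)  # centroid is unchanged
# --- End editor's sketch ---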
- # Return the expanded lower lip polygon in original frame coordinates - lower_lip_polygon = expanded_landmarks + # Apply Gaussian blur (ensure kernel size is odd and positive) + blur_k_size = getattr(modules.globals, "mask_blur_kernel", 15) # Default 15 + blur_k_size = max(1, blur_k_size // 2 * 2 + 1) # Ensure odd + mask_roi = cv2.GaussianBlur(mask_roi, (blur_k_size, blur_k_size), 0) # Sigma=0 calculates from kernel - return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon + # Place the mask ROI in the full-sized mask + mask[min_y:max_y, min_x:max_x] = mask_roi + + # Extract the masked area from the *original* frame + mouth_cutout = frame[min_y:max_y, min_x:max_x].copy() + + lower_lip_polygon = expanded_landmarks # Return polygon in original frame coords + mouth_box = (min_x, min_y, max_x, max_y) # Return the calculated box + else: + # print("Warning: Invalid mouth mask bounding box after padding/clamping.") # Optional debug + pass + + except IndexError as idx_e: + # print(f"Warning: Landmark index out of bounds during mouth mask creation: {idx_e}") # Optional debug + pass + except Exception as e: + print(f"Error in create_lower_mouth_mask: {e}") # Print unexpected errors + # import traceback + # traceback.print_exc() + pass + + # Return values, ensuring defaults if errors occurred + return mask, mouth_cutout, mouth_box, lower_lip_polygon def draw_mouth_mask_visualization( frame: Frame, face: Face, mouth_mask_data: tuple ) -> Frame: - landmarks = face.landmark_2d_106 - if landmarks is not None and mouth_mask_data is not None: - mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( - mouth_mask_data - ) - vis_frame = frame.copy() + # Validate inputs + if frame is None or face is None or mouth_mask_data is None or len(mouth_mask_data) != 4: + return frame # Return original frame if inputs are invalid - # Ensure coordinates are within frame bounds - height, width = vis_frame.shape[:2] - min_x, min_y = max(0, min_x), max(0, min_y) - max_x, max_y = min(width, max_x), min(height, max_y) + mask, mouth_cutout, box, lower_lip_polygon = mouth_mask_data + (min_x, min_y, max_x, max_y) = box - # Adjust mask to match the region size - mask_region = mask[0 : max_y - min_y, 0 : max_x - min_x] + # Check if polygon is valid for drawing + if lower_lip_polygon is None or not isinstance(lower_lip_polygon, np.ndarray) or len(lower_lip_polygon) < 3: + return frame # Cannot draw without a valid polygon - # Remove the color mask overlay - # color_mask = cv2.applyColorMap((mask_region * 255).astype(np.uint8), cv2.COLORMAP_JET) + vis_frame = frame.copy() + height, width = vis_frame.shape[:2] - # Ensure shapes match before blending - vis_region = vis_frame[min_y:max_y, min_x:max_x] - # Remove blending with color_mask - # if vis_region.shape[:2] == color_mask.shape[:2]: - # blended = cv2.addWeighted(vis_region, 0.7, color_mask, 0.3, 0) - # vis_frame[min_y:max_y, min_x:max_x] = blended + # Ensure box coordinates are valid integers within frame bounds + try: + min_x, min_y = max(0, int(min_x)), max(0, int(min_y)) + max_x, max_y = min(width, int(max_x)), min(height, int(max_y)) + except ValueError: + # print("Warning: Invalid coordinates for mask visualization box.") + return frame - # Draw the lower lip polygon - cv2.polylines(vis_frame, [lower_lip_polygon], True, (0, 255, 0), 2) + if max_x <= min_x or max_y <= min_y: + return frame # Invalid box - # Remove the red box - # cv2.rectangle(vis_frame, (min_x, min_y), (max_x, max_y), (0, 0, 255), 2) + # Draw the lower lip polygon (green 
outline) + try: + # Ensure polygon points are within frame boundaries before drawing + safe_polygon = lower_lip_polygon.copy() + safe_polygon[:, 0] = np.clip(safe_polygon[:, 0], 0, width - 1) + safe_polygon[:, 1] = np.clip(safe_polygon[:, 1], 0, height - 1) + cv2.polylines(vis_frame, [safe_polygon.astype(np.int32)], isClosed=True, color=(0, 255, 0), thickness=2) + except Exception as e: + print(f"Error drawing polygon for visualization: {e}") # Optional debug + pass - # Visualize the feathered mask - feather_amount = max( - 1, - min( - 30, - (max_x - min_x) // modules.globals.mask_feather_ratio, - (max_y - min_y) // modules.globals.mask_feather_ratio, - ), - ) - # Ensure kernel size is odd - kernel_size = 2 * feather_amount + 1 - feathered_mask = cv2.GaussianBlur( - mask_region.astype(float), (kernel_size, kernel_size), 0 - ) - feathered_mask = (feathered_mask / feathered_mask.max() * 255).astype(np.uint8) - # Remove the feathered mask color overlay - # color_feathered_mask = cv2.applyColorMap(feathered_mask, cv2.COLORMAP_VIRIDIS) + # Optional: Draw bounding box (red rectangle) + # cv2.rectangle(vis_frame, (min_x, min_y), (max_x, max_y), (0, 0, 255), 1) - # Ensure shapes match before blending feathered mask - # if vis_region.shape == color_feathered_mask.shape: - # blended_feathered = cv2.addWeighted(vis_region, 0.7, color_feathered_mask, 0.3, 0) - # vis_frame[min_y:max_y, min_x:max_x] = blended_feathered + # Optional: Add labels + label_pos_y = min_y - 10 if min_y > 20 else max_y + 15 # Adjust position based on box location + label_pos_x = min_x + try: + cv2.putText(vis_frame, "Mouth Mask", (label_pos_x, label_pos_y), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA) + except Exception as e: + # print(f"Error drawing text for visualization: {e}") # Optional debug + pass - # Add labels - cv2.putText( - vis_frame, - "Lower Mouth Mask", - (min_x, min_y - 10), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5, - (255, 255, 255), - 1, - ) - cv2.putText( - vis_frame, - "Feathered Mask", - (min_x, max_y + 20), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5, - (255, 255, 255), - 1, - ) - return vis_frame - return frame + return vis_frame def apply_mouth_area( frame: np.ndarray, mouth_cutout: np.ndarray, mouth_box: tuple, - face_mask: np.ndarray, - mouth_polygon: np.ndarray, + face_mask: np.ndarray, # Full face mask (for blending edges) + mouth_polygon: np.ndarray, # Specific polygon for the mouth area itself ) -> np.ndarray: - min_x, min_y, max_x, max_y = mouth_box - box_width = max_x - min_x - box_height = max_y - min_y - if ( - mouth_cutout is None - or box_width is None - or box_height is None - or face_mask is None - or mouth_polygon is None - ): + # Basic validation + if (frame is None or mouth_cutout is None or mouth_box is None or + face_mask is None or mouth_polygon is None): + # print("Warning: Invalid input (None value) to apply_mouth_area") # Optional debug + return frame + if (mouth_cutout.size == 0 or face_mask.size == 0 or len(mouth_polygon) < 3): + # print("Warning: Invalid input (empty array/polygon) to apply_mouth_area") # Optional debug return frame - try: - resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height)) + try: # Wrap main logic in try-except + min_x, min_y, max_x, max_y = map(int, mouth_box) # Ensure integer coords + box_width = max_x - min_x + box_height = max_y - min_y + + # Check box validity + if box_width <= 0 or box_height <= 0: + # print("Warning: Invalid mouth box dimensions in apply_mouth_area.") + return frame + + # Define the Region of Interest (ROI) on the 
target frame (swapped frame) + frame_h, frame_w = frame.shape[:2] + # Clamp coordinates strictly within frame boundaries + min_y, max_y = max(0, min_y), min(frame_h, max_y) + min_x, max_x = max(0, min_x), min(frame_w, max_x) + + # Recalculate box dimensions based on clamped coords + box_width = max_x - min_x + box_height = max_y - min_y + if box_width <= 0 or box_height <= 0: + # print("Warning: ROI became invalid after clamping in apply_mouth_area.") + return frame # ROI is invalid + roi = frame[min_y:max_y, min_x:max_x] - if roi.shape != resized_mouth_cutout.shape: - resized_mouth_cutout = cv2.resize( - resized_mouth_cutout, (roi.shape[1], roi.shape[0]) - ) + # Ensure ROI extraction was successful + if roi.size == 0: + # print("Warning: Extracted ROI is empty in apply_mouth_area.") + return frame - color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) + # Resize mouth cutout from original frame to fit the ROI size + resized_mouth_cutout = None + if roi.shape[:2] != mouth_cutout.shape[:2]: + # Check if mouth_cutout has valid dimensions before resizing + if mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0: + resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height), interpolation=cv2.INTER_LINEAR) + else: + # print("Warning: mouth_cutout has invalid dimensions, cannot resize.") + return frame # Cannot proceed without valid cutout + else: + resized_mouth_cutout = mouth_cutout - # Use the provided mouth polygon to create the mask - polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) + # If resize failed or original was invalid + if resized_mouth_cutout is None or resized_mouth_cutout.size == 0: + # print("Warning: Mouth cutout is invalid after resize attempt.") + return frame + + # --- Color Correction Step --- + # Apply color transfer from ROI (swapped face region) to the original mouth cutout + # This helps match lighting/color before blending + color_corrected_mouth = resized_mouth_cutout # Default to resized if correction fails + try: + # Ensure both images are 3 channels for color transfer + if len(resized_mouth_cutout.shape) == 3 and resized_mouth_cutout.shape[2] == 3 and \ + len(roi.shape) == 3 and roi.shape[2] == 3: + color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) + else: + # print("Warning: Cannot apply color transfer, images not BGR.") + pass + except cv2.error as ct_e: # Handle potential errors in color transfer + # print(f"Warning: Color transfer failed: {ct_e}. 
Using uncorrected mouth cutout.") # Optional debug + pass + except Exception as ct_gen_e: + # print(f"Warning: Unexpected error during color transfer: {ct_gen_e}") + pass + # --- End Color Correction --- + + + # --- Mask Creation --- + # Create a mask based *specifically* on the mouth_polygon, relative to the ROI + polygon_mask_roi = np.zeros(roi.shape[:2], dtype=np.uint8) + # Adjust polygon coordinates relative to the ROI's top-left corner adjusted_polygon = mouth_polygon - [min_x, min_y] - cv2.fillPoly(polygon_mask, [adjusted_polygon], 255) + # Draw the filled polygon on the ROI mask + cv2.fillPoly(polygon_mask_roi, [adjusted_polygon.astype(np.int32)], 255) - # Apply feathering to the polygon mask - feather_amount = min( - 30, - box_width // modules.globals.mask_feather_ratio, - box_height // modules.globals.mask_feather_ratio, - ) - feathered_mask = cv2.GaussianBlur( - polygon_mask.astype(float), (0, 0), feather_amount - ) - feathered_mask = feathered_mask / feathered_mask.max() + # Feather the polygon mask (Gaussian blur) + mask_feather_ratio = getattr(modules.globals, "mask_feather_ratio", 12) # Default 12 + # Calculate feather amount based on the smaller dimension of the box + feather_base_dim = min(box_width, box_height) + feather_amount = max(1, min(30, feather_base_dim // max(1, mask_feather_ratio))) # Avoid div by zero + # Ensure kernel size is odd and positive + kernel_size = 2 * feather_amount + 1 + feathered_polygon_mask = cv2.GaussianBlur(polygon_mask_roi.astype(float), (kernel_size, kernel_size), 0) - face_mask_roi = face_mask[min_y:max_y, min_x:max_x] - combined_mask = feathered_mask * (face_mask_roi / 255.0) + # Normalize feathered mask to [0.0, 1.0] range + max_val = feathered_polygon_mask.max() + if max_val > 1e-6: # Avoid division by zero + feathered_polygon_mask = feathered_polygon_mask / max_val + else: + feathered_polygon_mask.fill(0.0) # Mask is all black if max is near zero + # --- End Mask Creation --- - combined_mask = combined_mask[:, :, np.newaxis] - blended = ( - color_corrected_mouth * combined_mask + roi * (1 - combined_mask) - ).astype(np.uint8) - # Apply face mask to blended result - face_mask_3channel = ( - np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0 - ) - final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel) + # --- Refined Blending --- + # Get the corresponding ROI from the *full face mask* (already blurred) + # Ensure face_mask is float and normalized [0.0, 1.0] + if face_mask.dtype != np.float64 and face_mask.dtype != np.float32: + face_mask_float = face_mask.astype(float) / 255.0 + else: # Assume already float [0,1] if type is float + face_mask_float = face_mask + face_mask_roi = face_mask_float[min_y:max_y, min_x:max_x] + + # Combine the feathered mouth polygon mask with the face mask ROI + # Use minimum to ensure we only affect area inside both masks (mouth area within face) + # This helps blend the edges smoothly with the surrounding swapped face region + combined_mask = np.minimum(feathered_polygon_mask, face_mask_roi) + + # Expand mask to 3 channels for blending (ensure it matches image channels) + if len(frame.shape) == 3 and frame.shape[2] == 3: + combined_mask_3channel = combined_mask[:, :, np.newaxis] + + # Ensure data types are compatible for blending (float or double for mask, uint8 for images) + color_corrected_mouth_uint8 = color_corrected_mouth.astype(np.uint8) + roi_uint8 = roi.astype(np.uint8) + combined_mask_float = combined_mask_3channel.astype(np.float64) # Use float64 for precision in mask + 
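# --- Editor's sketch (illustration only, not part of the patch): the blend below is
# plain per-pixel linear interpolation, out = mouth * m + roi * (1 - m), with the
# feathered mask m in [0, 1]. A toy check of the formula: ---
import numpy as np

mouth = np.full((2, 2, 3), 200, dtype=np.uint8)  # stands in for the mouth cutout
roi = np.full((2, 2, 3), 100, dtype=np.uint8)    # stands in for the swapped ROI
m = np.full((2, 2, 1), 0.25)                     # feathered mask weight
out = (mouth * m + roi * (1.0 - m)).astype(np.uint8)
assert out[0, 0, 0] == 125                       # 200*0.25 + 100*0.75
# --- End editor's sketch ---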
+ # Blend: (original_mouth * combined_mask) + (swapped_face_roi * (1 - combined_mask)) + blended_roi = (color_corrected_mouth_uint8 * combined_mask_float + + roi_uint8 * (1.0 - combined_mask_float)) + + # Place the blended ROI back into the frame + frame[min_y:max_y, min_x:max_x] = blended_roi.astype(np.uint8) + else: + # print("Warning: Cannot apply mouth mask blending, frame is not 3-channel BGR.") + pass # Don't modify frame if it's not BGR - frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8) except Exception as e: - pass + print(f"Error applying mouth area: {e}") # Optional debug + # import traceback + # traceback.print_exc() + pass # Don't crash, just return the frame as is return frame def create_face_mask(face: Face, frame: Frame) -> np.ndarray: - mask = np.zeros(frame.shape[:2], dtype=np.uint8) + """Creates a feathered mask covering the whole face area based on landmarks.""" + mask = np.zeros(frame.shape[:2], dtype=np.uint8) # Start with uint8 + + # Validate inputs + if face is None or not hasattr(face, 'landmark_2d_106') or frame is None: + # print("Warning: Invalid face or frame for create_face_mask.") + return mask # Return empty mask + landmarks = face.landmark_2d_106 - if landmarks is not None: - # Convert landmarks to int32 - landmarks = landmarks.astype(np.int32) + if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106: + # print("Warning: Invalid or insufficient landmarks for face mask.") + return mask # Return empty mask - # Extract facial features - right_side_face = landmarks[0:16] - left_side_face = landmarks[17:32] - right_eye = landmarks[33:42] - right_eye_brow = landmarks[43:51] - left_eye = landmarks[87:96] - left_eye_brow = landmarks[97:105] + try: # Wrap main logic in try-except + # Filter out non-finite landmark values + if not np.all(np.isfinite(landmarks)): + # print("Warning: Non-finite values detected in landmarks for face mask.") + return mask - # Calculate forehead extension - right_eyebrow_top = np.min(right_eye_brow[:, 1]) - left_eyebrow_top = np.min(left_eye_brow[:, 1]) - eyebrow_top = min(right_eyebrow_top, left_eyebrow_top) + landmarks_int = landmarks.astype(np.int32) - face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]]) - forehead_height = face_top - eyebrow_top - extended_forehead_height = int(forehead_height * 5.0) # Extend by 50% + # Use standard face outline landmarks (0-32) + face_outline_points = landmarks_int[0:33] # Points 0 to 32 cover chin and sides - # Create forehead points - forehead_left = right_side_face[0].copy() - forehead_right = left_side_face[-1].copy() - forehead_left[1] -= extended_forehead_height - forehead_right[1] -= extended_forehead_height - # Combine all points to create the face outline - face_outline = np.vstack( - [ - [forehead_left], - right_side_face, - left_side_face[ - ::-1 - ], # Reverse left side to create a continuous outline - [forehead_right], - ] - ) + # Calculate convex hull of these points + # Use try-except as convexHull can fail on degenerate input + try: + hull = cv2.convexHull(face_outline_points.astype(np.float32)) # Use float for accuracy + if hull is None or len(hull) < 3: + # print("Warning: Convex hull calculation failed or returned too few points.") + # Fallback: use bounding box of landmarks? Or just return empty mask? 
+ return mask - # Calculate padding - padding = int( - np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05 - ) # 5% of face width + # Draw the filled convex hull on the mask + cv2.fillConvexPoly(mask, hull.astype(np.int32), 255) + except Exception as hull_e: + print(f"Error creating convex hull for face mask: {hull_e}") + return mask # Return empty mask on error - # Create a slightly larger convex hull for padding - hull = cv2.convexHull(face_outline) - hull_padded = [] - for point in hull: - x, y = point[0] - center = np.mean(face_outline, axis=0) - direction = np.array([x, y]) - center - direction = direction / np.linalg.norm(direction) - padded_point = np.array([x, y]) + direction * padding - hull_padded.append(padded_point) - hull_padded = np.array(hull_padded, dtype=np.int32) + # Apply Gaussian blur to feather the mask edges + # Kernel size should be reasonably large, odd, and positive + blur_k_size = getattr(modules.globals, "face_mask_blur", 31) # Default 31 + blur_k_size = max(1, blur_k_size // 2 * 2 + 1) # Ensure odd and positive - # Fill the padded convex hull - cv2.fillConvexPoly(mask, hull_padded, 255) + # Use sigma=0 to let OpenCV calculate from kernel size + # Apply blur to the uint8 mask directly + mask = cv2.GaussianBlur(mask, (blur_k_size, blur_k_size), 0) - # Smooth the mask edges - mask = cv2.GaussianBlur(mask, (5, 5), 3) + # --- Optional: Return float mask for apply_mouth_area --- + # mask = mask.astype(float) / 255.0 + # --- - return mask + except IndexError: + # print("Warning: Landmark index out of bounds for face mask.") # Optional debug + pass + except Exception as e: + print(f"Error creating face mask: {e}") # Print unexpected errors + # import traceback + # traceback.print_exc() + pass + + return mask # Return uint8 mask def apply_color_transfer(source, target): """ - Apply color transfer from target to source image + Apply color transfer using LAB color space. Handles potential division by zero and ensures output is uint8. 
""" - source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") - target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") + # Input validation + if source is None or target is None or source.size == 0 or target.size == 0: + # print("Warning: Invalid input to apply_color_transfer.") + return source # Return original source if invalid input - source_mean, source_std = cv2.meanStdDev(source) - target_mean, target_std = cv2.meanStdDev(target) + # Ensure images are 3-channel BGR uint8 + if len(source.shape) != 3 or source.shape[2] != 3 or source.dtype != np.uint8: + # print("Warning: Source image for color transfer is not uint8 BGR.") + # Attempt conversion if possible, otherwise return original + try: + if len(source.shape) == 2: # Grayscale + source = cv2.cvtColor(source, cv2.COLOR_GRAY2BGR) + source = np.clip(source, 0, 255).astype(np.uint8) + if len(source.shape)!= 3 or source.shape[2]!= 3: raise ValueError("Conversion failed") + except Exception: + return source + if len(target.shape) != 3 or target.shape[2] != 3 or target.dtype != np.uint8: + # print("Warning: Target image for color transfer is not uint8 BGR.") + try: + if len(target.shape) == 2: # Grayscale + target = cv2.cvtColor(target, cv2.COLOR_GRAY2BGR) + target = np.clip(target, 0, 255).astype(np.uint8) + if len(target.shape)!= 3 or target.shape[2]!= 3: raise ValueError("Conversion failed") + except Exception: + return source # Return original source if target invalid - # Reshape mean and std to be broadcastable - source_mean = source_mean.reshape(1, 1, 3) - source_std = source_std.reshape(1, 1, 3) - target_mean = target_mean.reshape(1, 1, 3) - target_std = target_std.reshape(1, 1, 3) + result_bgr = source # Default to original source in case of errors - # Perform the color transfer - source = (source - source_mean) * (target_std / source_std) + target_mean + try: + # Convert to float32 [0, 1] range for LAB conversion + source_float = source.astype(np.float32) / 255.0 + target_float = target.astype(np.float32) / 255.0 - return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) + source_lab = cv2.cvtColor(source_float, cv2.COLOR_BGR2LAB) + target_lab = cv2.cvtColor(target_float, cv2.COLOR_BGR2LAB) + + # Compute statistics + source_mean, source_std = cv2.meanStdDev(source_lab) + target_mean, target_std = cv2.meanStdDev(target_lab) + + # Reshape for broadcasting + source_mean = source_mean.reshape((1, 1, 3)) + source_std = source_std.reshape((1, 1, 3)) + target_mean = target_mean.reshape((1, 1, 3)) + target_std = target_std.reshape((1, 1, 3)) + + # Avoid division by zero or very small std deviations (add epsilon) + epsilon = 1e-6 + source_std = np.maximum(source_std, epsilon) + # target_std = np.maximum(target_std, epsilon) # Target std can be small + + # Perform color transfer in LAB space + result_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean + + # --- No explicit clipping needed in LAB space typically --- + # Clipping is handled implicitly by the conversion back to BGR and then to uint8 + + # Convert back to BGR float [0, 1] + result_bgr_float = cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR) + + # Clip final BGR values to [0, 1] range before scaling to [0, 255] + result_bgr_float = np.clip(result_bgr_float, 0.0, 1.0) + + # Convert back to uint8 [0, 255] + result_bgr = (result_bgr_float * 255.0).astype("uint8") + + except cv2.error as e: + # print(f"OpenCV error during color transfer: {e}. 
Returning original source.") # Optional debug + return source # Return original source if conversion fails + except Exception as e: + # print(f"Unexpected color transfer error: {e}. Returning original source.") # Optional debug + # import traceback + # traceback.print_exc() + return source + + return result_bgr \ No newline at end of file diff --git a/modules/run.py b/modules/run.py new file mode 100644 index 0000000..1abdd11 --- /dev/null +++ b/modules/run.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python3 + +# Import the tkinter fix to patch the ScreenChanged error +import tkinter_fix + +import core + +if __name__ == '__main__': + core.run() diff --git a/modules/tkinter_fix.py b/modules/tkinter_fix.py new file mode 100644 index 0000000..38d99d8 --- /dev/null +++ b/modules/tkinter_fix.py @@ -0,0 +1,26 @@ +import tkinter + +# Only needs to be imported once at the beginning of the application +def apply_patch(): + # Create a monkey patch for the internal _tkinter module + original_init = tkinter.Tk.__init__ + + def patched_init(self, *args, **kwargs): + # Call the original init + original_init(self, *args, **kwargs) + + # Define the missing ::tk::ScreenChanged procedure + self.tk.eval(""" + if {[info commands ::tk::ScreenChanged] == ""} { + proc ::tk::ScreenChanged {args} { + # Do nothing + return + } + } + """) + + # Apply the monkey patch + tkinter.Tk.__init__ = patched_init + +# Apply the patch automatically when this module is imported +apply_patch() \ No newline at end of file diff --git a/modules/ui.py b/modules/ui.py index ce599d6..c310cca 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -27,6 +27,7 @@ from modules.utilities import ( ) from modules.video_capture import VideoCapturer from modules.gettext import LanguageManager +from modules import globals import platform if platform.system() == "Windows": @@ -35,7 +36,7 @@ if platform.system() == "Windows": ROOT = None POPUP = None POPUP_LIVE = None -ROOT_HEIGHT = 700 +ROOT_HEIGHT = 750 ROOT_WIDTH = 600 PREVIEW = None @@ -152,20 +153,20 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C root.protocol("WM_DELETE_WINDOW", lambda: destroy()) source_label = ctk.CTkLabel(root, text=None) - source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25) + source_label.place(relx=0.1, rely=0.05, relwidth=0.275, relheight=0.225) target_label = ctk.CTkLabel(root, text=None) - target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25) + target_label.place(relx=0.6, rely=0.05, relwidth=0.275, relheight=0.225) select_face_button = ctk.CTkButton( root, text=_("Select a face"), cursor="hand2", command=lambda: select_source_path() ) - select_face_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1) + select_face_button.place(relx=0.1, rely=0.30, relwidth=0.3, relheight=0.1) swap_faces_button = ctk.CTkButton( root, text="↔", cursor="hand2", command=lambda: swap_faces_paths() ) - swap_faces_button.place(relx=0.45, rely=0.4, relwidth=0.1, relheight=0.1) + swap_faces_button.place(relx=0.45, rely=0.30, relwidth=0.1, relheight=0.1) select_target_button = ctk.CTkButton( root, @@ -173,7 +174,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C cursor="hand2", command=lambda: select_target_path(), ) - select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) + select_target_button.place(relx=0.6, rely=0.30, relwidth=0.3, relheight=0.1) keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps) keep_fps_checkbox = ctk.CTkSwitch( @@ -186,7 +187,7 @@ def 
create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - keep_fps_checkbox.place(relx=0.1, rely=0.6) + keep_fps_checkbox.place(relx=0.1, rely=0.5) keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames) keep_frames_switch = ctk.CTkSwitch( @@ -199,7 +200,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - keep_frames_switch.place(relx=0.1, rely=0.65) + keep_frames_switch.place(relx=0.1, rely=0.55) enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui["face_enhancer"]) enhancer_switch = ctk.CTkSwitch( @@ -212,7 +213,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - enhancer_switch.place(relx=0.1, rely=0.7) + enhancer_switch.place(relx=0.1, rely=0.6) keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio) keep_audio_switch = ctk.CTkSwitch( @@ -225,7 +226,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - keep_audio_switch.place(relx=0.6, rely=0.6) + keep_audio_switch.place(relx=0.6, rely=0.5) many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces) many_faces_switch = ctk.CTkSwitch( @@ -238,7 +239,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - many_faces_switch.place(relx=0.6, rely=0.65) + many_faces_switch.place(relx=0.6, rely=0.55) color_correction_value = ctk.BooleanVar(value=modules.globals.color_correction) color_correction_switch = ctk.CTkSwitch( @@ -251,7 +252,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - color_correction_switch.place(relx=0.6, rely=0.70) + color_correction_switch.place(relx=0.6, rely=0.6) # nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw_filter) # nsfw_switch = ctk.CTkSwitch(root, text='NSFW filter', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw_filter', nsfw_value.get())) @@ -269,7 +270,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C close_mapper_window() if not map_faces.get() else None ), ) - map_faces_switch.place(relx=0.1, rely=0.75) + map_faces_switch.place(relx=0.1, rely=0.65) show_fps_value = ctk.BooleanVar(value=modules.globals.show_fps) show_fps_switch = ctk.CTkSwitch( @@ -282,7 +283,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - show_fps_switch.place(relx=0.6, rely=0.75) + show_fps_switch.place(relx=0.6, rely=0.65) mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask) mouth_mask_switch = ctk.CTkSwitch( @@ -292,7 +293,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C cursor="hand2", command=lambda: setattr(modules.globals, "mouth_mask", mouth_mask_var.get()), ) - mouth_mask_switch.place(relx=0.1, rely=0.55) + mouth_mask_switch.place(relx=0.1, rely=0.45) show_mouth_mask_box_var = ctk.BooleanVar(value=modules.globals.show_mouth_mask_box) show_mouth_mask_box_switch = ctk.CTkSwitch( @@ -304,7 +305,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C modules.globals, "show_mouth_mask_box", show_mouth_mask_box_var.get() ), ) - show_mouth_mask_box_switch.place(relx=0.6, rely=0.55) + show_mouth_mask_box_switch.place(relx=0.6, rely=0.45) start_button = ctk.CTkButton( root, text=_("Start"), cursor="hand2", command=lambda: 
analyze_target(start, root) @@ -365,6 +366,72 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) # --- End Camera Selection --- + # 1) Define a DoubleVar for transparency (0 = fully transparent, 1 = fully opaque) + transparency_var = ctk.DoubleVar(value=1.0) + + def on_transparency_change(value: float): + # Convert slider value to float + val = float(value) + modules.globals.opacity = val # Set global opacity + percentage = int(val * 100) + + if percentage == 0: + modules.globals.face_swapper_enabled = False # 0% opacity: disable swapping, matching the status message + update_status("Transparency set to 0% - Face swapping disabled.") + elif percentage == 100: + modules.globals.face_swapper_enabled = True + update_status("Transparency set to 100%.") + else: + modules.globals.face_swapper_enabled = True + update_status(f"Transparency set to {percentage}%") + + # 2) Transparency label and slider (placed ABOVE sharpness) + transparency_label = ctk.CTkLabel(root, text="Transparency:") + transparency_label.place(relx=0.15, rely=0.69, relwidth=0.2, relheight=0.05) + + transparency_slider = ctk.CTkSlider( + root, + from_=0.0, + to=1.0, + variable=transparency_var, + command=on_transparency_change, + fg_color="#E0E0E0", + progress_color="#007BFF", + button_color="#FFFFFF", + button_hover_color="#CCCCCC", + height=5, + border_width=1, + corner_radius=3, + ) + transparency_slider.place(relx=0.35, rely=0.71, relwidth=0.5, relheight=0.02) + + # 3) Sharpness label & slider + sharpness_var = ctk.DoubleVar(value=0.0) # start at 0.0 + def on_sharpness_change(value: float): + modules.globals.sharpness = float(value) + update_status(f"Sharpness set to {value:.1f}") + + sharpness_label = ctk.CTkLabel(root, text="Sharpness:") + sharpness_label.place(relx=0.15, rely=0.74, relwidth=0.2, relheight=0.05) + + sharpness_slider = ctk.CTkSlider( + root, + from_=0.0, + to=5.0, + variable=sharpness_var, + command=on_sharpness_change, + fg_color="#E0E0E0", + progress_color="#007BFF", + button_color="#FFFFFF", + button_hover_color="#CCCCCC", + height=5, + border_width=1, + corner_radius=3, + ) + sharpness_slider.place(relx=0.35, rely=0.76, relwidth=0.5, relheight=0.02) + + # Status and link at the bottom + global status_label status_label = ctk.CTkLabel(root, text=None, justify="center") status_label.place(relx=0.1, rely=0.9, relwidth=0.8) @@ -381,6 +448,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C return root + def close_mapper_window(): global POPUP, POPUP_LIVE if POPUP and POPUP.winfo_exists(): @@ -397,7 +465,7 @@ def analyze_target(start: Callable[[], None], root: ctk.CTk): return if modules.globals.map_faces: - modules.globals.source_target_map = [] + modules.globals.souce_target_map = [] if is_image(modules.globals.target_path): update_status("Getting unique faces") @@ -406,8 +474,8 @@ update_status("Getting unique faces") get_unique_faces_from_target_video() - if len(modules.globals.source_target_map) > 0: - create_source_target_popup(start, root, modules.globals.source_target_map) + if len(modules.globals.souce_target_map) > 0: + create_source_target_popup(start, root, modules.globals.souce_target_map) else: update_status("No faces found in target") else: @@ -429,7 +497,7 @@ def create_source_target_popup( POPUP.destroy() select_output_path(start) else: update_pop_status("At least 1 source with target is required!")
is required!") scrollable_frame = ctk.CTkScrollableFrame( POPUP, width=POPUP_SCROLL_WIDTH, height=POPUP_SCROLL_HEIGHT @@ -489,7 +557,7 @@ def update_popup_source( global source_label_dict source_path = ctk.filedialog.askopenfilename( - title=_("select a source image"), + title=_("select an source image"), initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft], ) @@ -584,7 +652,7 @@ def select_source_path() -> None: PREVIEW.withdraw() source_path = ctk.filedialog.askopenfilename( - title=_("select a source image"), + title=_("select an source image"), initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft], ) @@ -627,7 +695,7 @@ def select_target_path() -> None: PREVIEW.withdraw() target_path = ctk.filedialog.askopenfilename( - title=_("select a target image or video"), + title=_("select an target image or video"), initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft], ) @@ -696,21 +764,17 @@ def check_and_ignore_nsfw(target, destroy: Callable = None) -> bool: def fit_image_to_size(image, width: int, height: int): - if width is None or height is None or width <= 0 or height <= 0: + if width is None and height is None: return image h, w, _ = image.shape ratio_h = 0.0 ratio_w = 0.0 - ratio_w = width / w - ratio_h = height / h - # Use the smaller ratio to ensure the image fits within the given dimensions - ratio = min(ratio_w, ratio_h) - - # Compute new dimensions, ensuring they're at least 1 pixel - new_width = max(1, int(ratio * w)) - new_height = max(1, int(ratio * h)) - new_size = (new_width, new_height) - + if width > height: + ratio_h = height / h + else: + ratio_w = width / w + ratio = max(ratio_w, ratio_h) + new_size = (int(ratio * w), int(ratio * h)) return cv2.resize(image, dsize=new_size) @@ -791,9 +855,9 @@ def webcam_preview(root: ctk.CTk, camera_index: int): return create_webcam_preview(camera_index) else: - modules.globals.source_target_map = [] + modules.globals.souce_target_map = [] create_source_target_popup_for_webcam( - root, modules.globals.source_target_map, camera_index + root, modules.globals.souce_target_map, camera_index ) @@ -1108,7 +1172,7 @@ def update_webcam_source( global source_label_dict_live source_path = ctk.filedialog.askopenfilename( - title=_("select a source image"), + title=_("select an source image"), initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft], ) @@ -1160,7 +1224,7 @@ def update_webcam_target( global target_label_dict_live target_path = ctk.filedialog.askopenfilename( - title=_("select a target image"), + title=_("select an target image"), initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft], ) @@ -1203,4 +1267,4 @@ def update_webcam_target( target_label_dict_live[button_num] = target_image else: update_pop_live_status("Face could not be detected in last upload!") - return map + return map \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 9f3c8c0..9c0e947 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,5 @@ onnxruntime-gpu==1.22.0; sys_platform != 'darwin' tensorflow; sys_platform != 'darwin' opennsfw2==0.10.2 protobuf==4.25.1 +git+https://github.com/xinntao/BasicSR.git@master +git+https://github.com/TencentARC/GFPGAN.git@master \ No newline at end of file diff --git a/run.py b/run.py index 31bc6da..c834ef5 100644 --- a/run.py +++ b/run.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 +# Import the tkinter fix to patch the ScreenChanged error +import tkinter_fix + from modules import core if __name__ == '__main__': diff --git a/tkinter_fix.py b/tkinter_fix.py new file mode 100644 index 
0000000..38d99d8 --- /dev/null +++ b/tkinter_fix.py @@ -0,0 +1,26 @@ +import tkinter + +# Only needs to be imported once at the beginning of the application +def apply_patch(): + # Create a monkey patch for the internal _tkinter module + original_init = tkinter.Tk.__init__ + + def patched_init(self, *args, **kwargs): + # Call the original init + original_init(self, *args, **kwargs) + + # Define the missing ::tk::ScreenChanged procedure + self.tk.eval(""" + if {[info commands ::tk::ScreenChanged] == ""} { + proc ::tk::ScreenChanged {args} { + # Do nothing + return + } + } + """) + + # Apply the monkey patch + tkinter.Tk.__init__ = patched_init + +# Apply the patch automatically when this module is imported +apply_patch() \ No newline at end of file
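# --- Editor's sketch (illustration only, not part of the diff): tkinter_fix patches
# Tk at import time, so the import must run before the first window is constructed,
# exactly as run.py does above. A minimal standalone check, assuming a Tk build where
# ::tk::ScreenChanged is missing: ---
import tkinter_fix  # applies the monkey patch on import
import tkinter

root = tkinter.Tk()  # patched __init__ injects the no-op Tcl proc
root.tk.eval("::tk::ScreenChanged")  # resolves instead of raising TclError
root.destroy()
# --- End editor's sketch ---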