From ce86d091db8ae24bb0bfade1eb82e894c1250386 Mon Sep 17 00:00:00 2001 From: torzdf <36920800+torzdf@users.noreply.github.com> Date: Fri, 30 Jun 2023 02:45:19 +0100 Subject: [PATCH] bugfix: git communication fixes --- docs/full/lib/git.rst | 10 +++ lib/git.py | 157 +++++++++++++++++++++++++++++++++ lib/gui/menu.py | 197 ++++++++++++++++-------------------------- lib/sysinfo.py | 23 ++--- 4 files changed, 247 insertions(+), 140 deletions(-) create mode 100644 docs/full/lib/git.rst create mode 100644 lib/git.py diff --git a/docs/full/lib/git.rst b/docs/full/lib/git.rst new file mode 100644 index 0000000..3f8d8de --- /dev/null +++ b/docs/full/lib/git.rst @@ -0,0 +1,10 @@ +********** +git module +********** + +Handles interfacing with the git executable + +.. automodule:: lib.git + :members: + :undoc-members: + :show-inheritance: diff --git a/lib/git.py b/lib/git.py new file mode 100644 index 0000000..90cba0a --- /dev/null +++ b/lib/git.py @@ -0,0 +1,157 @@ +#!/usr/bin python3 +""" Handles command line calls to git """ +import logging +import os +import sys + +from subprocess import PIPE, Popen + +logger = logging.getLogger(__name__) + + +class Git(): + """ Handles calls to github """ + def __init__(self) -> None: + logger.debug("Initializing: %s", self.__class__.__name__) + self._working_dir = os.path.dirname(os.path.realpath(sys.argv[0])) + self._available = self._check_available() + logger.debug("Initialized: %s", self.__class__.__name__) + + def _from_git(self, command: str) -> tuple[bool, list[str]]: + """ Execute a git command + + Parameters + ---------- + command : str + The command to send to git + + Returns + ------- + success: bool + ``True`` if the command succesfully executed otherwise ``False`` + list[str] + The output lines from stdout if there was no error, otherwise from stderr + """ + logger.debug("command: '%s'", command) + cmd = f"git {command}" + with Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, cwd=self._working_dir) as proc: + stdout, stderr = proc.communicate() + retcode = proc.returncode + success = retcode == 0 + lines = stdout.decode("utf-8", errors="replace").splitlines() + if not lines: + lines = stderr.decode("utf-8", errors="replace").splitlines() + logger.debug("command: '%s', returncode: %s, success: %s, lines: %s", + cmd, retcode, success, lines) + return success, lines + + def _check_available(self) -> bool: + """ Check if git is available. Does a call to git status. If the process errors due to + folder ownership, attempts to add the folder to github safe folders list and tries + again + + Returns + ------- + bool + ``True`` if git is available otherwise ``False`` + + """ + success, msg = self._from_git("status") + if success: + return True + config = next((line.strip() for line in msg if "add safe.directory" in line), None) + if not config: + return False + success, _ = self._from_git(config.split("git ", 1)[-1]) + return True + + @property + def status(self) -> list[str]: + """ Obtain the output of git status for tracked files only """ + if not self._available: + return [] + success, status = self._from_git("status -uno") + if not success or not status: + return [] + return status + + @property + def branch(self) -> str: + """ str: The git branch that is currently being used to execute Faceswap. """ + status = next((line.strip() for line in self.status if "On branch" in line), "Not Found") + return status.replace("On branch ", "") + + @property + def branches(self) -> list[str]: + """ list[str]: List of all available branches. """ + if not self._available: + return [] + success, branches = self._from_git("branch -a") + if not success or not branches: + return [] + return branches + + def update_remote(self) -> bool: + """ Update all branches to track remote + + Returns + ------- + bool + ``True`` if update was succesful otherwise ``False`` + """ + if not self._available: + return False + return self._from_git("remote update")[0] + + def pull(self) -> bool: + """ Pull the current branch + + Returns + ------- + bool + ``True`` if pull is successful otherwise ``False`` + """ + if not self._available: + return False + return self._from_git("pull")[0] + + def checkout(self, branch: str) -> bool: + """ Checkout the requested branch + + Parameters + ---------- + branch : str + The branch to checkout + + Returns + ------- + bool + ``True`` if the branch was succesfully checkout out otherwise ``False`` + """ + if not self._available: + return False + return self._from_git(f"checkout {branch}")[0] + + def get_commits(self, count: int) -> list[str]: + """ Obtain the last commits to the repo + + Parameters + ---------- + count : int + The last number of commits to obtain + + Returns + ------- + list[str] + list of commits, or empty list if none found + """ + if not self._available: + return [] + success, commits = self._from_git(f"log --pretty=oneline --abbrev-commit -n {count}") + if not success or not commits: + return [] + return commits + + +git = Git() +""" :class:`Git`: Handles calls to github """ diff --git a/lib/gui/menu.py b/lib/gui/menu.py index 694655b..2a3026f 100644 --- a/lib/gui/menu.py +++ b/lib/gui/menu.py @@ -2,17 +2,14 @@ """ The Menu Bars for faceswap GUI """ from __future__ import annotations import gettext -import locale import logging import os -import sys import tkinter as tk import typing as T from tkinter import ttk import webbrowser -from subprocess import Popen, PIPE, STDOUT - +from lib.git import git from lib.multithreading import MultiThread from lib.serializer import get_serializer, Serializer from lib.utils import FaceswapError @@ -31,8 +28,6 @@ logger = logging.getLogger(__name__) # pylint: disable=invalid-name _LANG = gettext.translation("gui.menu", localedir="locales", fallback=True) _ = _LANG.gettext -_WORKING_DIR = os.path.dirname(os.path.realpath(sys.argv[0])) - _RESOURCES: list[tuple[str, str]] = [ (_("faceswap.dev - Guides and Forum"), "https://www.faceswap.dev"), (_("Patreon - Support this project"), "https://www.patreon.com/faceswap"), @@ -274,13 +269,40 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors self.root.config(cursor="") @classmethod - def _check_for_updates(cls, encoding: str, check: bool = False) -> bool: + def _process_status_output(cls, status: list[str]) -> bool: + """ Process the output of a git status call and output information + + Parameters + ---------- + status : list[str] + The lines returned from a git status call + + Returns + ------- + bool + ``True`` if the repo can be updated otherwise ``False`` + """ + for line in status: + if line.lower().startswith("your branch is ahead"): + logger.warning("Your branch is ahead of the remote repo. Not updating") + return False + if line.lower().startswith("your branch is up to date"): + logger.info("Faceswap is up to date.") + return False + if "have diverged" in line.lower(): + logger.warning("Your branch has diverged from the remote repo. Not updating") + return False + if line.lower().startswith("your branch is behind"): + return True + + logger.warning("Unable to retrieve status of branch") + return False + + def _check_for_updates(self, check: bool = False) -> bool: """ Check whether an update is required Parameters ---------- - encoding: str - The encoding to use for decoding process returns check: bool ``True`` if we are just checking for updates ``False`` if a check and update is to be performed. Default: ``False`` @@ -292,93 +314,52 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors """ # Do the check logger.info("Checking for updates...") - update = False - msg = "" - gitcmd = "git remote update && git status -uno" - with Popen(gitcmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd=_WORKING_DIR) as cmd: - stdout, _ = cmd.communicate() - retcode = cmd.poll() - if retcode != 0: - msg = ("Git is not installed or you are not running a cloned repo. " - "Unable to check for updates") - else: - chk = stdout.decode(encoding, errors="replace").splitlines() - for line in chk: - if line.lower().startswith("your branch is ahead"): - msg = "Your branch is ahead of the remote repo. Not updating" - break - if line.lower().startswith("your branch is up to date"): - msg = "Faceswap is up to date." - break - if line.lower().startswith("your branch is behind"): - msg = "There are updates available" - update = True - break - if "have diverged" in line.lower(): - msg = "Your branch has diverged from the remote repo. Not updating" - break - if not update or check: - logger.info(msg) - logger.debug("Checked for update. Update required: %s", update) - return update + msg = ("Git is not installed or you are not running a cloned repo. " + "Unable to check for updates") + + sync = git.update_remote() + if not sync: + logger.warning(msg) + return False + + status = git.status + if not status: + logger.warning(msg) + return False + + retval = self._process_status_output(status) + if retval and check: + logger.info("There are updates available") + return retval def _check(self) -> None: """ Check for updates and clone repository """ logger.debug("Checking for updates...") self.root.config(cursor="watch") - encoding = locale.getpreferredencoding() - logger.debug("Encoding: %s", encoding) - self._check_for_updates(encoding, check=True) + self._check_for_updates(check=True) self.root.config(cursor="") - @classmethod - def _do_update(cls, encoding: str) -> bool: + def _do_update(self) -> bool: """ Update Faceswap - Parameters - ---------- - encoding: str - The encoding to use for decoding process returns - Returns ------- bool ``True`` if update was successful """ logger.info("A new version is available. Updating...") - gitcmd = "git pull" - with Popen(gitcmd, - shell=True, - stdout=PIPE, - stderr=STDOUT, - bufsize=1, - cwd=_WORKING_DIR) as cmd: - while True: - out = cmd.stdout - output = "" if out is None else out.readline().decode(encoding, errors="replace") - if output == "" and cmd.poll() is not None: - break - if output: - logger.debug("'%s' output: '%s'", gitcmd, output.strip()) - print(output.strip()) - retcode = cmd.poll() - logger.debug("'%s' returncode: %s", gitcmd, retcode) - if retcode != 0: - logger.info("An error occurred during update. return code: %s", retcode) - retval = False - else: - retval = True - return retval + success = git.pull() + if not success: + logger.info("An error occurred during update") + return success def _update(self) -> None: """ Check for updates and clone repository """ logger.debug("Updating Faceswap...") self.root.config(cursor="watch") - encoding = locale.getpreferredencoding() - logger.debug("Encoding: %s", encoding) success = False - if self._check_for_updates(encoding): - success = self._do_update(encoding) + if self._check_for_updates(): + success = self._do_update() update_deps.main(is_gui=True) if success: logger.info("Please restart Faceswap to complete the update.") @@ -416,11 +397,11 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors bool ``True`` if menu was successfully built otherwise ``False`` """ - stdout = self._get_branches() - if stdout is None: + branches = git.branches + if not branches: return False - branches = self._filter_branches(stdout) + branches = self._filter_branches(branches) if not branches: return False @@ -431,36 +412,13 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors return True @classmethod - def _get_branches(cls) -> str | None: - """ Get the available github branches - - Returns - ------- - str or ``None`` - The list of branches available. If no branches were found or there was an - error then `None` is returned - """ - gitcmd = "git branch -a" - with Popen(gitcmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd=_WORKING_DIR) as cmd: - stdout, _ = cmd.communicate() - retcode = cmd.poll() - if retcode != 0: - logger.debug("Unable to list git branches. return code: %s, message: %s", - retcode, - stdout.decode(locale.getpreferredencoding(), - errors="replace").strip().replace("\n", " - ")) - return None - return stdout.decode(locale.getpreferredencoding(), errors="replace") - - @classmethod - def _filter_branches(cls, stdout: str) -> list[str]: - """ Filter the branches, remove duplicates and the current branch and return a sorted - list. + def _filter_branches(cls, branches: list[str]) -> list[str]: + """ Filter the branches, remove any non-local branches Parameters ---------- - stdout: str - The output from the git branch query converted to a string + branches: list[str] + list of available git branches Returns ------- @@ -468,20 +426,22 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors Unique list of available branches sorted in alphabetical order """ current = None - branches = set() - for line in stdout.splitlines(): - branch = line[line.rfind("/") + 1:] if "/" in line else line.strip() + unique = set() + for line in branches: + branch = line.strip() + if branch.startswith("remotes"): + continue if branch.startswith("*"): branch = branch.replace("*", "").strip() current = branch continue - branches.add(branch) - logger.debug("Found branches: %s", branches) - if current in branches: + unique.add(branch) + logger.debug("Found branches: %s", unique) + if current in unique: logger.debug("Removing current branch from output: %s", current) - branches.remove(current) + unique.remove(current) - retval = sorted(list(branches), key=str.casefold) + retval = sorted(list(unique), key=str.casefold) logger.debug("Final branches: %s", retval) return retval @@ -495,15 +455,8 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors The branch to switch to """ logger.info("Switching branch to '%s'...", branch) - gitcmd = f"git checkout {branch}" - with Popen(gitcmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd=_WORKING_DIR) as cmd: - stdout, _ = cmd.communicate() - retcode = cmd.poll() - if retcode != 0: - logger.error("Unable to switch branch. return code: %s, message: %s", - retcode, - stdout.decode(T.cast(str, locale.getdefaultlocale()), - errors="replace").strip().replace("\n", " - ")) + if not git.checkout(branch): + logger.error("Unable to switch branch to '%s'", branch) return logger.info("Succesfully switched to '%s'. You may want to check for updates to make sure " "that you have the latest code.", branch) diff --git a/lib/sysinfo.py b/lib/sysinfo.py index 6d40d17..fa0e2f9 100644 --- a/lib/sysinfo.py +++ b/lib/sysinfo.py @@ -11,6 +11,7 @@ from subprocess import PIPE, Popen import psutil +from lib.git import git from lib.gpu_stats import GPUStats, GPUInfo from lib.utils import get_backend from setup import CudaCheck @@ -125,27 +126,13 @@ class _SysInfo(): # pylint:disable=too-few-public-methods version = stdout.decode(self._encoding, errors="replace").splitlines() return "\n".join(version) - @property - def _git_branch(self) -> str: - """ str: The git branch that is currently being used to execute Faceswap. """ - with Popen("git status", shell=True, stdout=PIPE, stderr=PIPE) as git: - stdout, stderr = git.communicate() - if stderr: - return "Not Found" - branch = stdout.decode(self._encoding, - errors="replace").splitlines()[0].replace("On branch ", "") - return branch - @property def _git_commits(self) -> str: """ str: The last 5 git commits for the currently running Faceswap. """ - with Popen("git log --pretty=oneline --abbrev-commit -n 5", - shell=True, stdout=PIPE, stderr=PIPE) as git: - stdout, stderr = git.communicate() - if stderr: + commits = git.get_commits(3) + if not commits: return "Not Found" - commits = stdout.decode(self._encoding, errors="replace").splitlines() - return ". ".join(commits) + return " | ".join(commits) @property def _cuda_version(self) -> str: @@ -210,7 +197,7 @@ class _SysInfo(): # pylint:disable=too-few-public-methods "sys_processor": self._system["processor"], "sys_ram": self._format_ram(), "encoding": self._encoding, - "git_branch": self._git_branch, + "git_branch": git.branch, "git_commits": self._git_commits, "gpu_cuda": self._cuda_version, "gpu_cudnn": self._cudnn_version,