bugfix: git communication fixes

This commit is contained in:
torzdf 2023-06-30 02:45:19 +01:00
parent b870c7d9bb
commit ce86d091db
4 changed files with 247 additions and 140 deletions

10
docs/full/lib/git.rst Normal file
View File

@ -0,0 +1,10 @@
**********
git module
**********
Handles interfacing with the git executable
.. automodule:: lib.git
:members:
:undoc-members:
:show-inheritance:

157
lib/git.py Normal file
View File

@ -0,0 +1,157 @@
#!/usr/bin python3
""" Handles command line calls to git """
import logging
import os
import sys
from subprocess import PIPE, Popen
logger = logging.getLogger(__name__)
class Git():
""" Handles calls to github """
def __init__(self) -> None:
logger.debug("Initializing: %s", self.__class__.__name__)
self._working_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
self._available = self._check_available()
logger.debug("Initialized: %s", self.__class__.__name__)
def _from_git(self, command: str) -> tuple[bool, list[str]]:
""" Execute a git command
Parameters
----------
command : str
The command to send to git
Returns
-------
success: bool
``True`` if the command succesfully executed otherwise ``False``
list[str]
The output lines from stdout if there was no error, otherwise from stderr
"""
logger.debug("command: '%s'", command)
cmd = f"git {command}"
with Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, cwd=self._working_dir) as proc:
stdout, stderr = proc.communicate()
retcode = proc.returncode
success = retcode == 0
lines = stdout.decode("utf-8", errors="replace").splitlines()
if not lines:
lines = stderr.decode("utf-8", errors="replace").splitlines()
logger.debug("command: '%s', returncode: %s, success: %s, lines: %s",
cmd, retcode, success, lines)
return success, lines
def _check_available(self) -> bool:
""" Check if git is available. Does a call to git status. If the process errors due to
folder ownership, attempts to add the folder to github safe folders list and tries
again
Returns
-------
bool
``True`` if git is available otherwise ``False``
"""
success, msg = self._from_git("status")
if success:
return True
config = next((line.strip() for line in msg if "add safe.directory" in line), None)
if not config:
return False
success, _ = self._from_git(config.split("git ", 1)[-1])
return True
@property
def status(self) -> list[str]:
""" Obtain the output of git status for tracked files only """
if not self._available:
return []
success, status = self._from_git("status -uno")
if not success or not status:
return []
return status
@property
def branch(self) -> str:
""" str: The git branch that is currently being used to execute Faceswap. """
status = next((line.strip() for line in self.status if "On branch" in line), "Not Found")
return status.replace("On branch ", "")
@property
def branches(self) -> list[str]:
""" list[str]: List of all available branches. """
if not self._available:
return []
success, branches = self._from_git("branch -a")
if not success or not branches:
return []
return branches
def update_remote(self) -> bool:
""" Update all branches to track remote
Returns
-------
bool
``True`` if update was succesful otherwise ``False``
"""
if not self._available:
return False
return self._from_git("remote update")[0]
def pull(self) -> bool:
""" Pull the current branch
Returns
-------
bool
``True`` if pull is successful otherwise ``False``
"""
if not self._available:
return False
return self._from_git("pull")[0]
def checkout(self, branch: str) -> bool:
""" Checkout the requested branch
Parameters
----------
branch : str
The branch to checkout
Returns
-------
bool
``True`` if the branch was succesfully checkout out otherwise ``False``
"""
if not self._available:
return False
return self._from_git(f"checkout {branch}")[0]
def get_commits(self, count: int) -> list[str]:
""" Obtain the last commits to the repo
Parameters
----------
count : int
The last number of commits to obtain
Returns
-------
list[str]
list of commits, or empty list if none found
"""
if not self._available:
return []
success, commits = self._from_git(f"log --pretty=oneline --abbrev-commit -n {count}")
if not success or not commits:
return []
return commits
git = Git()
""" :class:`Git`: Handles calls to github """

View File

@ -2,17 +2,14 @@
""" The Menu Bars for faceswap GUI """ """ The Menu Bars for faceswap GUI """
from __future__ import annotations from __future__ import annotations
import gettext import gettext
import locale
import logging import logging
import os import os
import sys
import tkinter as tk import tkinter as tk
import typing as T import typing as T
from tkinter import ttk from tkinter import ttk
import webbrowser import webbrowser
from subprocess import Popen, PIPE, STDOUT from lib.git import git
from lib.multithreading import MultiThread from lib.multithreading import MultiThread
from lib.serializer import get_serializer, Serializer from lib.serializer import get_serializer, Serializer
from lib.utils import FaceswapError from lib.utils import FaceswapError
@ -31,8 +28,6 @@ logger = logging.getLogger(__name__) # pylint: disable=invalid-name
_LANG = gettext.translation("gui.menu", localedir="locales", fallback=True) _LANG = gettext.translation("gui.menu", localedir="locales", fallback=True)
_ = _LANG.gettext _ = _LANG.gettext
_WORKING_DIR = os.path.dirname(os.path.realpath(sys.argv[0]))
_RESOURCES: list[tuple[str, str]] = [ _RESOURCES: list[tuple[str, str]] = [
(_("faceswap.dev - Guides and Forum"), "https://www.faceswap.dev"), (_("faceswap.dev - Guides and Forum"), "https://www.faceswap.dev"),
(_("Patreon - Support this project"), "https://www.patreon.com/faceswap"), (_("Patreon - Support this project"), "https://www.patreon.com/faceswap"),
@ -274,13 +269,40 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors
self.root.config(cursor="") self.root.config(cursor="")
@classmethod @classmethod
def _check_for_updates(cls, encoding: str, check: bool = False) -> bool: def _process_status_output(cls, status: list[str]) -> bool:
""" Process the output of a git status call and output information
Parameters
----------
status : list[str]
The lines returned from a git status call
Returns
-------
bool
``True`` if the repo can be updated otherwise ``False``
"""
for line in status:
if line.lower().startswith("your branch is ahead"):
logger.warning("Your branch is ahead of the remote repo. Not updating")
return False
if line.lower().startswith("your branch is up to date"):
logger.info("Faceswap is up to date.")
return False
if "have diverged" in line.lower():
logger.warning("Your branch has diverged from the remote repo. Not updating")
return False
if line.lower().startswith("your branch is behind"):
return True
logger.warning("Unable to retrieve status of branch")
return False
def _check_for_updates(self, check: bool = False) -> bool:
""" Check whether an update is required """ Check whether an update is required
Parameters Parameters
---------- ----------
encoding: str
The encoding to use for decoding process returns
check: bool check: bool
``True`` if we are just checking for updates ``False`` if a check and update is to be ``True`` if we are just checking for updates ``False`` if a check and update is to be
performed. Default: ``False`` performed. Default: ``False``
@ -292,93 +314,52 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors
""" """
# Do the check # Do the check
logger.info("Checking for updates...") logger.info("Checking for updates...")
update = False msg = ("Git is not installed or you are not running a cloned repo. "
msg = "" "Unable to check for updates")
gitcmd = "git remote update && git status -uno"
with Popen(gitcmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd=_WORKING_DIR) as cmd: sync = git.update_remote()
stdout, _ = cmd.communicate() if not sync:
retcode = cmd.poll() logger.warning(msg)
if retcode != 0: return False
msg = ("Git is not installed or you are not running a cloned repo. "
"Unable to check for updates") status = git.status
else: if not status:
chk = stdout.decode(encoding, errors="replace").splitlines() logger.warning(msg)
for line in chk: return False
if line.lower().startswith("your branch is ahead"):
msg = "Your branch is ahead of the remote repo. Not updating" retval = self._process_status_output(status)
break if retval and check:
if line.lower().startswith("your branch is up to date"): logger.info("There are updates available")
msg = "Faceswap is up to date." return retval
break
if line.lower().startswith("your branch is behind"):
msg = "There are updates available"
update = True
break
if "have diverged" in line.lower():
msg = "Your branch has diverged from the remote repo. Not updating"
break
if not update or check:
logger.info(msg)
logger.debug("Checked for update. Update required: %s", update)
return update
def _check(self) -> None: def _check(self) -> None:
""" Check for updates and clone repository """ """ Check for updates and clone repository """
logger.debug("Checking for updates...") logger.debug("Checking for updates...")
self.root.config(cursor="watch") self.root.config(cursor="watch")
encoding = locale.getpreferredencoding() self._check_for_updates(check=True)
logger.debug("Encoding: %s", encoding)
self._check_for_updates(encoding, check=True)
self.root.config(cursor="") self.root.config(cursor="")
@classmethod def _do_update(self) -> bool:
def _do_update(cls, encoding: str) -> bool:
""" Update Faceswap """ Update Faceswap
Parameters
----------
encoding: str
The encoding to use for decoding process returns
Returns Returns
------- -------
bool bool
``True`` if update was successful ``True`` if update was successful
""" """
logger.info("A new version is available. Updating...") logger.info("A new version is available. Updating...")
gitcmd = "git pull" success = git.pull()
with Popen(gitcmd, if not success:
shell=True, logger.info("An error occurred during update")
stdout=PIPE, return success
stderr=STDOUT,
bufsize=1,
cwd=_WORKING_DIR) as cmd:
while True:
out = cmd.stdout
output = "" if out is None else out.readline().decode(encoding, errors="replace")
if output == "" and cmd.poll() is not None:
break
if output:
logger.debug("'%s' output: '%s'", gitcmd, output.strip())
print(output.strip())
retcode = cmd.poll()
logger.debug("'%s' returncode: %s", gitcmd, retcode)
if retcode != 0:
logger.info("An error occurred during update. return code: %s", retcode)
retval = False
else:
retval = True
return retval
def _update(self) -> None: def _update(self) -> None:
""" Check for updates and clone repository """ """ Check for updates and clone repository """
logger.debug("Updating Faceswap...") logger.debug("Updating Faceswap...")
self.root.config(cursor="watch") self.root.config(cursor="watch")
encoding = locale.getpreferredencoding()
logger.debug("Encoding: %s", encoding)
success = False success = False
if self._check_for_updates(encoding): if self._check_for_updates():
success = self._do_update(encoding) success = self._do_update()
update_deps.main(is_gui=True) update_deps.main(is_gui=True)
if success: if success:
logger.info("Please restart Faceswap to complete the update.") logger.info("Please restart Faceswap to complete the update.")
@ -416,11 +397,11 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors
bool bool
``True`` if menu was successfully built otherwise ``False`` ``True`` if menu was successfully built otherwise ``False``
""" """
stdout = self._get_branches() branches = git.branches
if stdout is None: if not branches:
return False return False
branches = self._filter_branches(stdout) branches = self._filter_branches(branches)
if not branches: if not branches:
return False return False
@ -431,36 +412,13 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors
return True return True
@classmethod @classmethod
def _get_branches(cls) -> str | None: def _filter_branches(cls, branches: list[str]) -> list[str]:
""" Get the available github branches """ Filter the branches, remove any non-local branches
Returns
-------
str or ``None``
The list of branches available. If no branches were found or there was an
error then `None` is returned
"""
gitcmd = "git branch -a"
with Popen(gitcmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd=_WORKING_DIR) as cmd:
stdout, _ = cmd.communicate()
retcode = cmd.poll()
if retcode != 0:
logger.debug("Unable to list git branches. return code: %s, message: %s",
retcode,
stdout.decode(locale.getpreferredencoding(),
errors="replace").strip().replace("\n", " - "))
return None
return stdout.decode(locale.getpreferredencoding(), errors="replace")
@classmethod
def _filter_branches(cls, stdout: str) -> list[str]:
""" Filter the branches, remove duplicates and the current branch and return a sorted
list.
Parameters Parameters
---------- ----------
stdout: str branches: list[str]
The output from the git branch query converted to a string list of available git branches
Returns Returns
------- -------
@ -468,20 +426,22 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors
Unique list of available branches sorted in alphabetical order Unique list of available branches sorted in alphabetical order
""" """
current = None current = None
branches = set() unique = set()
for line in stdout.splitlines(): for line in branches:
branch = line[line.rfind("/") + 1:] if "/" in line else line.strip() branch = line.strip()
if branch.startswith("remotes"):
continue
if branch.startswith("*"): if branch.startswith("*"):
branch = branch.replace("*", "").strip() branch = branch.replace("*", "").strip()
current = branch current = branch
continue continue
branches.add(branch) unique.add(branch)
logger.debug("Found branches: %s", branches) logger.debug("Found branches: %s", unique)
if current in branches: if current in unique:
logger.debug("Removing current branch from output: %s", current) logger.debug("Removing current branch from output: %s", current)
branches.remove(current) unique.remove(current)
retval = sorted(list(branches), key=str.casefold) retval = sorted(list(unique), key=str.casefold)
logger.debug("Final branches: %s", retval) logger.debug("Final branches: %s", retval)
return retval return retval
@ -495,15 +455,8 @@ class HelpMenu(tk.Menu): # pylint:disable=too-many-ancestors
The branch to switch to The branch to switch to
""" """
logger.info("Switching branch to '%s'...", branch) logger.info("Switching branch to '%s'...", branch)
gitcmd = f"git checkout {branch}" if not git.checkout(branch):
with Popen(gitcmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd=_WORKING_DIR) as cmd: logger.error("Unable to switch branch to '%s'", branch)
stdout, _ = cmd.communicate()
retcode = cmd.poll()
if retcode != 0:
logger.error("Unable to switch branch. return code: %s, message: %s",
retcode,
stdout.decode(T.cast(str, locale.getdefaultlocale()),
errors="replace").strip().replace("\n", " - "))
return return
logger.info("Succesfully switched to '%s'. You may want to check for updates to make sure " logger.info("Succesfully switched to '%s'. You may want to check for updates to make sure "
"that you have the latest code.", branch) "that you have the latest code.", branch)

View File

@ -11,6 +11,7 @@ from subprocess import PIPE, Popen
import psutil import psutil
from lib.git import git
from lib.gpu_stats import GPUStats, GPUInfo from lib.gpu_stats import GPUStats, GPUInfo
from lib.utils import get_backend from lib.utils import get_backend
from setup import CudaCheck from setup import CudaCheck
@ -125,27 +126,13 @@ class _SysInfo(): # pylint:disable=too-few-public-methods
version = stdout.decode(self._encoding, errors="replace").splitlines() version = stdout.decode(self._encoding, errors="replace").splitlines()
return "\n".join(version) return "\n".join(version)
@property
def _git_branch(self) -> str:
""" str: The git branch that is currently being used to execute Faceswap. """
with Popen("git status", shell=True, stdout=PIPE, stderr=PIPE) as git:
stdout, stderr = git.communicate()
if stderr:
return "Not Found"
branch = stdout.decode(self._encoding,
errors="replace").splitlines()[0].replace("On branch ", "")
return branch
@property @property
def _git_commits(self) -> str: def _git_commits(self) -> str:
""" str: The last 5 git commits for the currently running Faceswap. """ """ str: The last 5 git commits for the currently running Faceswap. """
with Popen("git log --pretty=oneline --abbrev-commit -n 5", commits = git.get_commits(3)
shell=True, stdout=PIPE, stderr=PIPE) as git: if not commits:
stdout, stderr = git.communicate()
if stderr:
return "Not Found" return "Not Found"
commits = stdout.decode(self._encoding, errors="replace").splitlines() return " | ".join(commits)
return ". ".join(commits)
@property @property
def _cuda_version(self) -> str: def _cuda_version(self) -> str:
@ -210,7 +197,7 @@ class _SysInfo(): # pylint:disable=too-few-public-methods
"sys_processor": self._system["processor"], "sys_processor": self._system["processor"],
"sys_ram": self._format_ram(), "sys_ram": self._format_ram(),
"encoding": self._encoding, "encoding": self._encoding,
"git_branch": self._git_branch, "git_branch": git.branch,
"git_commits": self._git_commits, "git_commits": self._git_commits,
"gpu_cuda": self._cuda_version, "gpu_cuda": self._cuda_version,
"gpu_cudnn": self._cudnn_version, "gpu_cudnn": self._cudnn_version,