mirror of
https://github.com/zebrajr/faceswap.git
synced 2025-12-06 00:20:09 +01:00
setup.py - cleanup installers
This commit is contained in:
parent
76dbc4c7d0
commit
1919366d18
541
setup.py
541
setup.py
|
|
@ -13,7 +13,7 @@ import re
|
|||
import sys
|
||||
from shutil import which
|
||||
from subprocess import list2cmdline, PIPE, Popen, run, STDOUT
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Type
|
||||
|
||||
from pkg_resources import parse_requirements, Requirement
|
||||
|
||||
|
|
@ -54,8 +54,6 @@ class Environment():
|
|||
self.updater = updater
|
||||
# Flag that setup is being run by installer so steps can be skipped
|
||||
self.is_installer: bool = False
|
||||
self.cuda_version: str = ""
|
||||
self.cudnn_version: str = ""
|
||||
self.enable_amd: bool = False
|
||||
self.enable_apple_silicon: bool = False
|
||||
self.enable_docker: bool = False
|
||||
|
|
@ -63,6 +61,7 @@ class Environment():
|
|||
self.required_packages: List[Tuple[str, List[Tuple[str, str]]]] = []
|
||||
self.missing_packages: List[Tuple[str, List[Tuple[str, str]]]] = []
|
||||
self.conda_missing_packages: List[Tuple[str, ...]] = []
|
||||
self.cuda_cudnn = ["", ""]
|
||||
|
||||
self._process_arguments()
|
||||
self._check_permission()
|
||||
|
|
@ -106,6 +105,16 @@ class Environment():
|
|||
retval = ctypes.windll.shell32.IsUserAnAdmin() != 0 # type: ignore
|
||||
return retval
|
||||
|
||||
@property
|
||||
def cuda_version(self) -> str:
|
||||
""" str: The detected globally installed Cuda Version """
|
||||
return self.cuda_cudnn[0]
|
||||
|
||||
@property
|
||||
def cudnn_version(self) -> str:
|
||||
""" str: The detected globally installed cuDNN Version """
|
||||
return self.cuda_cudnn[1]
|
||||
|
||||
@property
|
||||
def is_virtualenv(self) -> bool:
|
||||
""" Check whether this is a virtual environment """
|
||||
|
|
@ -509,7 +518,7 @@ class Checks(): # pylint:disable=too-few-public-methods
|
|||
global _INSTALL_FAILED # pylint:disable=global-statement
|
||||
check = CudaCheck()
|
||||
if check.cuda_version:
|
||||
self._env.cuda_version = check.cuda_version
|
||||
self._env.cuda_cudnn[0] = check.cuda_version
|
||||
logger.info("CUDA version: %s", self._env.cuda_version)
|
||||
else:
|
||||
logger.error("CUDA not found. Install and try again.\n"
|
||||
|
|
@ -520,7 +529,7 @@ class Checks(): # pylint:disable=too-few-public-methods
|
|||
return
|
||||
|
||||
if check.cudnn_version:
|
||||
self._env.cudnn_version = ".".join(check.cudnn_version.split(".")[:2])
|
||||
self._env.cuda_cudnn[1] = ".".join(check.cudnn_version.split(".")[:2])
|
||||
logger.info("cuDNN version: %s", self._env.cudnn_version)
|
||||
else:
|
||||
logger.error("cuDNN not found. See "
|
||||
|
|
@ -532,7 +541,7 @@ class Checks(): # pylint:disable=too-few-public-methods
|
|||
# If we get here we're on MacOS
|
||||
self._tips.macos()
|
||||
logger.warning("Cannot find CUDA on macOS")
|
||||
self._env.cuda_version = input("Manually specify CUDA version: ")
|
||||
self._env.cuda_cudnn[0] = input("Manually specify CUDA version: ")
|
||||
|
||||
|
||||
class CudaCheck(): # pylint:disable=too-few-public-methods
|
||||
|
|
@ -692,9 +701,9 @@ class Install(): # pylint:disable=too-few-public-methods
|
|||
self._env = environment
|
||||
self._is_gui = is_gui
|
||||
if self._env.os_version[0] == "Windows":
|
||||
self._installer = self._pywinpty_installer
|
||||
self._installer: Type[Installer] = WinPTYInstaller
|
||||
else:
|
||||
self._installer = self._pexpect_installer
|
||||
self._installer = PexpectInstaller
|
||||
|
||||
if not self._env.is_installer and not self._env.updater:
|
||||
self._ask_continue()
|
||||
|
|
@ -815,7 +824,8 @@ class Install(): # pylint:disable=too-few-public-methods
|
|||
cmd.append(pkg_str)
|
||||
|
||||
clean_pkg = pkg_str.replace("\"", "")
|
||||
if self._subproc_installer(cmd, clean_pkg) != 0:
|
||||
installer = SubProcInstaller(self._env, clean_pkg, cmd, self._is_gui)
|
||||
if installer() != 0:
|
||||
logger.error("Unable to install package: %s. Process aborted", clean_pkg)
|
||||
sys.exit(1)
|
||||
|
||||
|
|
@ -910,7 +920,8 @@ class Install(): # pylint:disable=too-few-public-methods
|
|||
condaexe.append(package)
|
||||
|
||||
clean_pkg = package.replace("\"", "")
|
||||
retcode = self._installer(condaexe, clean_pkg)
|
||||
installer = self._installer(self._env, clean_pkg, condaexe, self._is_gui)
|
||||
retcode = installer()
|
||||
|
||||
if retcode != 0 and not conda_only:
|
||||
logger.info("%s not available in Conda. Installing with pip", package)
|
||||
|
|
@ -934,200 +945,331 @@ class Install(): # pylint:disable=too-few-public-methods
|
|||
pipexe.append("--user")
|
||||
pipexe.append(package)
|
||||
|
||||
if self._installer(pipexe, package) != 0:
|
||||
installer = self._installer(self._env, package, pipexe, self._is_gui)
|
||||
if installer() != 0:
|
||||
logger.warning("Couldn't install %s with pip. Please install this package manually",
|
||||
package)
|
||||
global _INSTALL_FAILED # pylint:disable=global-statement
|
||||
_INSTALL_FAILED = True
|
||||
|
||||
def _pexpect_installer(self, command: List[str], package: str) -> int:
|
||||
""" Run an install command using pexpect and log output.
|
||||
|
||||
Pexpect is used so we can get unbuffered output to display updates
|
||||
class Installer():
|
||||
""" Parent class for package installers.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
command: list
|
||||
The command to run
|
||||
package: str
|
||||
The package name that is being installed
|
||||
PyWinPty is used for Windows, Pexpect is used for Linux, as these can provide us with realtime
|
||||
output.
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code from the subprocess
|
||||
"""
|
||||
try:
|
||||
import pexpect # pylint:disable=import-outside-toplevel,import-error
|
||||
logger.info("Installing %s", package)
|
||||
logger.debug("argv: %s", command)
|
||||
Subprocess is used as a fallback if any of the above fail, but this caches output, so it can
|
||||
look like the process has hung to the end user
|
||||
|
||||
proc = pexpect.spawn(" ".join(command),
|
||||
encoding=self._env.encoding,
|
||||
codec_errors="replace",
|
||||
timeout=None)
|
||||
last_line_cr = False
|
||||
while True:
|
||||
try:
|
||||
idx = proc.expect(["\r\n", "\r"])
|
||||
line = proc.before.rstrip()
|
||||
if line and idx == 0:
|
||||
if last_line_cr:
|
||||
last_line_cr = False
|
||||
# Output last line of progress bar and go to next line
|
||||
if not self._is_gui:
|
||||
print(line)
|
||||
logger.verbose(line) # type:ignore
|
||||
elif line and idx == 1:
|
||||
last_line_cr = True
|
||||
logger.debug(line)
|
||||
if not self._is_gui:
|
||||
print(line, end="\r")
|
||||
except pexpect.EOF:
|
||||
break
|
||||
proc.close()
|
||||
returncode = proc.exitstatus
|
||||
logger.debug("Package: %s, returncode: %s", package, returncode)
|
||||
return returncode
|
||||
except Exception as err: # pylint:disable=broad-except
|
||||
logger.debug("Failed to install with pexpect. Falling back to subprocess. Error: %s",
|
||||
str(err))
|
||||
return self._subproc_installer(command, package)
|
||||
|
||||
def _pywinpty_installer(self, command: List[str], package: str) -> int:
|
||||
""" Run an install command using pywinpty and log output.
|
||||
|
||||
pywinpty is used so we can get unbuffered output to display updates
|
||||
|
||||
Parameters
|
||||
----------
|
||||
command: list
|
||||
The command to run
|
||||
package: str
|
||||
The package name that is being installed
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code from the subprocess
|
||||
"""
|
||||
try:
|
||||
import winpty # pylint:disable=import-outside-toplevel,import-error
|
||||
logger.info("Installing %s", package)
|
||||
cmd = which(command[0], path=os.environ.get('PATH', os.defpath))
|
||||
# For some reason with WinPTY we need to pass in the full command. Probably a bug
|
||||
cmdline = list2cmdline(command)
|
||||
logger.debug("argv: %s, cmd: '%s', cmdline: '%s'", command, cmd, cmdline)
|
||||
|
||||
proc = winpty.PTY(
|
||||
80 if self._env.is_installer else 100,
|
||||
24,
|
||||
backend=winpty.enums.Backend.WinPTY, # ConPTY hangs and has lots of Ansi Escapes
|
||||
agent_config=winpty.enums.AgentConfig.WINPTY_FLAG_PLAIN_OUTPUT) # Strip all Ansi
|
||||
|
||||
if not proc.spawn(cmd, cmdline=cmdline):
|
||||
del proc
|
||||
raise RuntimeError("Failed to spawn winpty")
|
||||
|
||||
pbar = re.compile(r"(?:eta\s[\d\W]+)|(?:\s+\|\s+\d+%)\Z")
|
||||
lines = []
|
||||
out = ""
|
||||
eof = False
|
||||
last_line_cr = False
|
||||
num_bytes = 1024
|
||||
seen_lines = set()
|
||||
while True:
|
||||
try:
|
||||
from_pty = proc.read(num_bytes)
|
||||
except winpty.WinptyError:
|
||||
# TODO Reinsert this check
|
||||
# The error message "pipe has been ended" is language specific so this check
|
||||
# fails on non english systems. For now we just swallow all errors until no
|
||||
# bytes are left to read and then check the return code
|
||||
# if any(val in str(err) for val in ["EOF", "pipe has been ended"]):
|
||||
# # Get remaining bytes. On a comms error, the buffer remains unread so keep
|
||||
# # halving buffer amount until down to 1 when we know we have everything
|
||||
# if num_bytes == 1:
|
||||
# eof = True
|
||||
# from_pty = ""
|
||||
# num_bytes //= 2
|
||||
# else:
|
||||
# raise
|
||||
|
||||
# Get remaining bytes. On a comms error, the buffer remains unread so keep
|
||||
# halving buffer amount until down to 1 when we know we have everything
|
||||
if num_bytes == 1:
|
||||
eof = True
|
||||
from_pty = ""
|
||||
num_bytes //= 2
|
||||
out += from_pty
|
||||
if "\n" in out:
|
||||
lines.extend(out.split("\n"))
|
||||
if out.endswith("\n") or eof: # Ends on newline or is EOF
|
||||
out = ""
|
||||
else: # roll over semi-consumed line to next read
|
||||
out = lines[-1]
|
||||
lines = lines[:-1]
|
||||
|
||||
for line in lines: # Dump the output to log
|
||||
line = line.rstrip()
|
||||
is_cr = bool(pbar.search(line))
|
||||
if line and not is_cr:
|
||||
if last_line_cr:
|
||||
last_line_cr = False
|
||||
if not self._is_gui and not self._env.is_installer:
|
||||
# Go to next line
|
||||
print("")
|
||||
if line not in seen_lines:
|
||||
# Supress repeat "waiting" lines from spamming the logfile
|
||||
logger.verbose(line) # type:ignore
|
||||
seen_lines.add(line)
|
||||
elif line:
|
||||
last_line_cr = True
|
||||
logger.debug(line)
|
||||
if not self._is_gui:
|
||||
# NSIS only updates on line endings, so force new line for installer
|
||||
print(line,
|
||||
end=None if self._env.is_installer else "\r")
|
||||
lines = []
|
||||
if eof:
|
||||
returncode = proc.get_exitstatus()
|
||||
break
|
||||
|
||||
del proc
|
||||
logger.debug("Package: %s, returncode: %s", package, returncode)
|
||||
return returncode
|
||||
except Exception as err: # pylint:disable=broad-except
|
||||
logger.debug("Failed to install with winpty. Falling back to subprocess. Error: %s",
|
||||
str(err))
|
||||
return self._subproc_installer(command, package)
|
||||
|
||||
def _subproc_installer(self, command: List[str], package: str) -> int:
|
||||
""" Run an install command using subprocess Popen.
|
||||
|
||||
pexpect uses pty which is not useable in Windows. The pexpect popen_spawn module does not
|
||||
give easy access to the return code, and also dumps stdout to console so we use subprocess
|
||||
for Windows. The downside of this is that we cannot do unbuffered reads, so the process can
|
||||
look like it hangs.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
command: list
|
||||
The command to run
|
||||
package: str
|
||||
The package name that is being installed
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code from the subprocess
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
environment: :class:`Environment`
|
||||
Environment class holding information about the running system
|
||||
package: str
|
||||
The package name that is being installed
|
||||
command: list
|
||||
The command to run
|
||||
is_gui: bool
|
||||
``True if the process is being called from the Faceswap GUI
|
||||
"""
|
||||
def __init__(self,
|
||||
environment: Environment,
|
||||
package: str,
|
||||
command: List[str],
|
||||
is_gui: bool) -> None:
|
||||
logger.info("Installing %s", package)
|
||||
shell = self._env.os_version[0] == "Windows" and command[0] == "conda"
|
||||
logger.debug("argv: %s", command)
|
||||
self._env = environment
|
||||
self._package = package
|
||||
self._command = command
|
||||
self._is_gui = is_gui
|
||||
self._last_line_cr = False
|
||||
self._seen_lines: Set[str] = set()
|
||||
|
||||
with Popen(command, bufsize=0, stdout=PIPE, stderr=STDOUT, shell=shell) as proc:
|
||||
last_line_cr = False
|
||||
def __call__(self) -> int:
|
||||
""" Call the subclassed call function
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code of the package install process
|
||||
"""
|
||||
try:
|
||||
returncode = self.call()
|
||||
except Exception as err: # pylint:disable=broad-except
|
||||
logger.debug("Failed to install with %s. Falling back to subprocess. Error: %s",
|
||||
self.__class__.__name__, str(err))
|
||||
returncode = SubProcInstaller(self._env, self._package, self._command, self._is_gui)()
|
||||
|
||||
logger.debug("Package: %s, returncode: %s", self._package, returncode)
|
||||
return returncode
|
||||
|
||||
def call(self) -> int:
|
||||
""" Override for package installer specific logic.
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code of the package install process
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _non_gui_print(self, text: str, end: Optional[str] = None) -> None:
|
||||
""" Print output to console if not running in the GUI
|
||||
|
||||
Parameters
|
||||
----------
|
||||
text: str
|
||||
The text to print
|
||||
end: str, optional
|
||||
The line ending to use. Default: ``None`` (new line)
|
||||
"""
|
||||
if self._is_gui:
|
||||
return
|
||||
print(text, end=end)
|
||||
|
||||
def _seen_line_log(self, text: str) -> None:
|
||||
""" Output gets spammed to the log file when conda is waiting/processing. Only log each
|
||||
unique line once.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
text: str
|
||||
The text to log
|
||||
"""
|
||||
if text not in self._seen_lines:
|
||||
return
|
||||
logger.verbose(text) # type:ignore
|
||||
self._seen_lines.add(text)
|
||||
|
||||
|
||||
class PexpectInstaller(Installer): # pylint: disable=too-few-public-methods
|
||||
""" Package installer for Linux/macOS using Pexpect
|
||||
|
||||
Uses Pexpect for installing packages allowing access to realtime feedback
|
||||
|
||||
Parameters
|
||||
----------
|
||||
environment: :class:`Environment`
|
||||
Environment class holding information about the running system
|
||||
package: str
|
||||
The package name that is being installed
|
||||
command: list
|
||||
The command to run
|
||||
is_gui: bool
|
||||
``True if the process is being called from the Faceswap GUI
|
||||
"""
|
||||
def call(self) -> int:
|
||||
""" Install a package using the Pexpect module
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code of the package install process
|
||||
"""
|
||||
import pexpect # pylint:disable=import-outside-toplevel,import-error
|
||||
proc = pexpect.spawn(" ".join(self._command),
|
||||
encoding=self._env.encoding, codec_errors="replace", timeout=None)
|
||||
while True:
|
||||
try:
|
||||
idx = proc.expect(["\r\n", "\r"])
|
||||
line = proc.before.rstrip()
|
||||
if line and idx == 0:
|
||||
if self._last_line_cr:
|
||||
self._last_line_cr = False
|
||||
# Output last line of progress bar and go to next line
|
||||
self._non_gui_print(line)
|
||||
self._seen_line_log(line)
|
||||
elif line and idx == 1:
|
||||
self._last_line_cr = True
|
||||
logger.debug(line)
|
||||
self._non_gui_print(line, end="\r")
|
||||
except pexpect.EOF:
|
||||
break
|
||||
proc.close()
|
||||
return proc.exitstatus
|
||||
|
||||
|
||||
class WinPTYInstaller(Installer): # pylint: disable=too-few-public-methods
|
||||
""" Package installer for Windows using WinPTY
|
||||
|
||||
Spawns a pseudo PTY for installing packages allowing access to realtime feedback
|
||||
|
||||
Parameters
|
||||
----------
|
||||
environment: :class:`Environment`
|
||||
Environment class holding information about the running system
|
||||
package: str
|
||||
The package name that is being installed
|
||||
command: list
|
||||
The command to run
|
||||
is_gui: bool
|
||||
``True if the process is being called from the Faceswap GUI
|
||||
"""
|
||||
def __init__(self,
|
||||
environment: Environment,
|
||||
package: str,
|
||||
command: List[str],
|
||||
is_gui: bool) -> None:
|
||||
super().__init__(environment, package, command, is_gui)
|
||||
self._cmd = which(command[0], path=os.environ.get('PATH', os.defpath))
|
||||
self._cmdline = list2cmdline(command)
|
||||
logger.debug("cmd: '%s', cmdline: '%s'", self._cmd, self._cmdline)
|
||||
|
||||
self._pbar = re.compile(r"(?:eta\s[\d\W]+)|(?:\s+\|\s+\d+%)\Z")
|
||||
self._eof = False
|
||||
self._read_bytes = 1024
|
||||
|
||||
self._lines: List[str] = []
|
||||
self._out = ""
|
||||
|
||||
def _read_from_pty(self, proc: Any, winpty_error: Any) -> None:
|
||||
""" Read :attr:`_num_bytes` from WinPTY. If there is an error reading, recursively halve
|
||||
the number of bytes read until we get a succesful read. If we get down to 1 byte without a
|
||||
succesful read, assume we are at EOF.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
proc: :class:`winpty.PTY`
|
||||
The WinPTY process
|
||||
winpty_error: :class:`winpty.WinptyError`
|
||||
The winpty error exception. Passed in as WinPTY is not in global scope
|
||||
"""
|
||||
try:
|
||||
from_pty = proc.read(self._read_bytes)
|
||||
except winpty_error:
|
||||
# TODO Reinsert this check
|
||||
# The error message "pipe has been ended" is language specific so this check
|
||||
# fails on non english systems. For now we just swallow all errors until no
|
||||
# bytes are left to read and then check the return code
|
||||
# if any(val in str(err) for val in ["EOF", "pipe has been ended"]):
|
||||
# # Get remaining bytes. On a comms error, the buffer remains unread so keep
|
||||
# # halving buffer amount until down to 1 when we know we have everything
|
||||
# if self._read_bytes == 1:
|
||||
# self._eof = True
|
||||
# from_pty = ""
|
||||
# self._read_bytes //= 2
|
||||
# else:
|
||||
# raise
|
||||
|
||||
# Get remaining bytes. On a comms error, the buffer remains unread so keep
|
||||
# halving buffer amount until down to 1 when we know we have everything
|
||||
if self._read_bytes == 1:
|
||||
self._eof = True
|
||||
from_pty = ""
|
||||
self._read_bytes //= 2
|
||||
|
||||
self._out += from_pty
|
||||
|
||||
def _out_to_lines(self) -> None:
|
||||
""" Process the winpty output into separate lines. Roll over any semi-consumed lines to the
|
||||
next proc call. """
|
||||
if "\n" not in self._out:
|
||||
return
|
||||
|
||||
self._lines.extend(self._out.split("\n"))
|
||||
|
||||
if self._out.endswith("\n") or self._eof: # Ends on newline or is EOF
|
||||
self._out = ""
|
||||
else: # roll over semi-consumed line to next read
|
||||
self._out = self._lines[-1]
|
||||
self._lines = self._lines[:-1]
|
||||
|
||||
def _parse_lines(self) -> None:
|
||||
""" Process the latest batch of lines that have been received from winPTY. """
|
||||
for line in self._lines: # Dump the output to log
|
||||
line = line.rstrip()
|
||||
is_cr = bool(self._pbar.search(line))
|
||||
if line and not is_cr:
|
||||
if self._last_line_cr:
|
||||
self._last_line_cr = False
|
||||
if not self._env.is_installer:
|
||||
# Go to next line
|
||||
self._non_gui_print("")
|
||||
self._seen_line_log(line)
|
||||
elif line:
|
||||
self._last_line_cr = True
|
||||
logger.debug(line)
|
||||
# NSIS only updates on line endings, so force new line for installer
|
||||
self._non_gui_print(line, end=None if self._env.is_installer else "\r")
|
||||
self._lines = []
|
||||
|
||||
def call(self) -> int:
|
||||
""" Install a package using the PyWinPTY module
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code of the package install process
|
||||
"""
|
||||
import winpty # pylint:disable=import-outside-toplevel,import-error
|
||||
# For some reason with WinPTY we need to pass in the full command. Probably a bug
|
||||
proc = winpty.PTY(
|
||||
80 if self._env.is_installer else 100,
|
||||
24,
|
||||
backend=winpty.enums.Backend.WinPTY, # ConPTY hangs and has lots of Ansi Escapes
|
||||
agent_config=winpty.enums.AgentConfig.WINPTY_FLAG_PLAIN_OUTPUT) # Strip all Ansi
|
||||
|
||||
if not proc.spawn(self._cmd, cmdline=self._cmdline):
|
||||
del proc
|
||||
raise RuntimeError("Failed to spawn winpty")
|
||||
|
||||
while True:
|
||||
self._read_from_pty(proc, winpty.WinptyError)
|
||||
self._out_to_lines()
|
||||
self._parse_lines()
|
||||
|
||||
if self._eof:
|
||||
returncode = proc.get_exitstatus()
|
||||
break
|
||||
|
||||
del proc
|
||||
logger.debug("Package: %s, returncode: %s", self._package, returncode)
|
||||
return returncode
|
||||
|
||||
|
||||
class SubProcInstaller(Installer):
|
||||
""" The fallback package installer if either of the OS specific installers fail.
|
||||
|
||||
Uses the python Subprocess module to install packages. Feedback does not return in realtime
|
||||
so the process can look like it has hung to the end user
|
||||
|
||||
Parameters
|
||||
----------
|
||||
environment: :class:`Environment`
|
||||
Environment class holding information about the running system
|
||||
package: str
|
||||
The package name that is being installed
|
||||
command: list
|
||||
The command to run
|
||||
is_gui: bool
|
||||
``True if the process is being called from the Faceswap GUI
|
||||
"""
|
||||
def __init__(self,
|
||||
environment: Environment,
|
||||
package: str,
|
||||
command: List[str],
|
||||
is_gui: bool) -> None:
|
||||
super().__init__(environment, package, command, is_gui)
|
||||
self._shell = self._env.os_version[0] == "Windows" and command[0] == "conda"
|
||||
|
||||
def __call__(self) -> int:
|
||||
""" Override default call function so we don't recursively call ourselves on failure. """
|
||||
returncode = self.call()
|
||||
logger.debug("Package: %s, returncode: %s", self._package, returncode)
|
||||
return returncode
|
||||
|
||||
def call(self) -> int:
|
||||
""" Install a package using the Subprocess module
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The return code of the package install process
|
||||
"""
|
||||
with Popen(self._command,
|
||||
bufsize=0, stdout=PIPE, stderr=STDOUT, shell=self._shell) as proc:
|
||||
while True:
|
||||
if proc.stdout is not None:
|
||||
line = proc.stdout.readline().decode(self._env.encoding, errors="replace")
|
||||
|
|
@ -1139,18 +1281,15 @@ class Install(): # pylint:disable=too-few-public-methods
|
|||
line = line.rstrip()
|
||||
|
||||
if line and not is_cr:
|
||||
if last_line_cr:
|
||||
last_line_cr = False
|
||||
if self._last_line_cr:
|
||||
self._last_line_cr = False
|
||||
# Go to next line
|
||||
if not self._is_gui:
|
||||
print("")
|
||||
logger.verbose(line) # type:ignore
|
||||
self._non_gui_print("")
|
||||
self._seen_line_log(line)
|
||||
elif line:
|
||||
last_line_cr = True
|
||||
self._last_line_cr = True
|
||||
logger.debug(line)
|
||||
if not self._is_gui:
|
||||
print(line, end="\r")
|
||||
logger.debug("Package: %s, returncode: %s", package, returncode)
|
||||
self._non_gui_print("", end="\r")
|
||||
return returncode
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user