1
0
Fork 0
mirror of https://github.com/deepfakes/faceswap synced 2025-06-08 20:13:52 -04:00
faceswap/lib/gui/wrapper.py

316 lines
11 KiB
Python

#!/usr/bin/env python3
""" Process wrapper for underlying faceswap commands for the GUI """
import os
import re
import signal
from subprocess import PIPE, Popen, TimeoutExpired
import sys
import tkinter as tk
from threading import Thread
from time import time
import psutil
from .utils import Images
class ProcessWrapper():
    """ Assembles the faceswap command line, starts and stops the
        underlying process and keeps the GUI state variables in step """

    def __init__(self, statusbar, session=None, pathscript=None, cliopts=None):
        """ Store the collaborators and create the task controller.

        statusbar: GUI status bar that receives messages and progress
        session: training session object updated while training
        pathscript: folder holding the faceswap entry scripts
        cliopts: object providing gen_cli_arguments(command) """
        self.tk_vars = self.set_tk_vars()
        self.session = session
        self.pathscript = pathscript
        self.cliopts = cliopts
        self.command = None
        self.statusbar = statusbar
        # The controller reads self.statusbar, so it is created last
        self.task = FaceswapControl(self)

    def set_tk_vars(self):
        """ Create the TK variables that ProcessWrapper toggles to tell
            the GUI which state to show. Returns them as a dict """
        def new_var(var_type, initial, callback=None):
            """ Create one variable; the trace is attached only after the
                initial value is set so that it does not fire on creation """
            variable = var_type()
            variable.set(initial)
            if callback is not None:
                variable.trace("w", callback)
            return variable

        # Entries are evaluated top to bottom, which keeps the variables
        # being created in the order: display, runningtask, action,
        # generate, consoleclear
        return {
            "display": new_var(tk.StringVar, None),
            "runningtask": new_var(tk.BooleanVar, False),
            "action": new_var(tk.StringVar, None, self.action_command),
            "generate": new_var(tk.StringVar, None, self.generate_command),
            "consoleclear": new_var(tk.BooleanVar, False)}

    def action_command(self, *args):
        """ Write-trace callback for the "action" variable: stop the
            running task, or launch the requested "category,command" """
        action_var = self.tk_vars["action"]
        requested = action_var.get()
        if not requested:
            return
        category, command = requested.split(",")
        if self.tk_vars["runningtask"].get():
            self.task.terminate()
        else:
            self.command = command
            cli_args = self.prepare(category)
            self.task.execute_script(command, cli_args)
        action_var.set(None)

    def generate_command(self, *args):
        """ Write-trace callback for the "generate" variable: print the
            command line that would be run for "category,command" """
        generate_var = self.tk_vars["generate"]
        requested = generate_var.get()
        if not requested:
            return
        category, command = requested.split(",")
        cli_args = self.build_args(category, command=command, generate=True)
        self.tk_vars["consoleclear"].set(True)
        print(" ".join(cli_args))
        generate_var.set(None)

    def prepare(self, category):
        """ Put the GUI into its running state and return the argument
            list for the currently selected command """
        for name in ("runningtask", "consoleclear"):
            self.tk_vars[name].set(True)
        print("Loading...")
        status = "".join(("Executing - ", self.command, ".py"))
        self.statusbar.status_message.set(status)
        if self.command in ("effmpeg", "train"):
            mode = "indeterminate"
        else:
            mode = "determinate"
        self.statusbar.progress_start(mode)
        cli_args = self.build_args(category)
        self.tk_vars["display"].set(self.command)
        return cli_args

    def build_args(self, category, command=None, generate=False):
        """ Return the interpreter, script, command and options as a list.

        category: name of the entry script, without the .py extension
        command: sub command to run; falls back to self.command
        generate: True when the list is only displayed, not executed """
        command = command or self.command
        script_path = os.path.join(self.pathscript, "{}.py".format(category))
        cli_args = [sys.executable]
        if not generate:
            cli_args.append("-u")
        cli_args += [script_path, command]
        # Session stats and the preview flag only apply to a real train run
        live_training = command == "train" and not generate
        for cliopt in self.cliopts.gen_cli_arguments(command):
            cli_args.extend(cliopt)
            if live_training:
                self.set_session_stats(cliopt)
        if live_training:
            cli_args.append("-gui")  # Embed the preview pane
        return cli_args

    def set_session_stats(self, cliopt):
        """ Copy the batch size (-bs) or model folder (-m) from a single
            command line option into the session """
        flag = cliopt[0]
        if flag == "-bs":
            self.session.stats["batchsize"] = int(cliopt[1])
        elif flag == "-m":
            self.session.modeldir = cliopt[1]

    def terminate(self, message):
        """ Reset the GUI once the process has exited and show message
            in the status bar """
        self.tk_vars["runningtask"].set(False)
        self.statusbar.progress_stop()
        self.statusbar.status_message.set(message)
        self.tk_vars["display"].set(None)
        Images().delete_preview()
        was_training = self.command == "train"
        if was_training:
            self.session.save_session()
        self.session.__init__()
        self.command = None
        print("Process exited.")
class FaceswapControl():
    """ Control the underlying Faceswap tasks.

    Launches the requested script as a subprocess, relays its console
    output to the GUI and stops it again on request. """

    def __init__(self, wrapper):
        """ Keep a reference to the owning wrapper and compile the
            regular expressions used to parse the console output.

        wrapper: the parent object; its statusbar and session are read
        here and its terminate(message) is called when the task ends """
        self.wrapper = wrapper
        self.statusbar = wrapper.statusbar
        self.command = None
        self.args = None
        self.process = None
        self.consoleregex = {
            "loss": re.compile(r"([a-zA-Z_]+):.*?(\d+\.\d+)"),
            "tqdm": re.compile(r"(\d+%|\d+/\d+|\d+:\d+|\d+\.\d+[a-zA-Z/]+)")}

    def execute_script(self, command, args):
        """ Execute the requested Faceswap Script.

        command: name of the faceswap command being run
        args: full argument list handed to Popen """
        self.command = command
        kwargs = {"stdout": PIPE,
                  "stderr": PIPE,
                  "bufsize": 1,
                  "universal_newlines": True}
        # stdin is piped as well: terminate() writes a newline to it on
        # Windows to request an exit
        self.process = Popen(args, **kwargs, stdin=PIPE)
        self.thread_stdout()
        self.thread_stderr()

    def _iter_output(self, stream):
        """ Yield each non-empty line read from stream until the stream
            is closed, or it is exhausted and the process has exited """
        while True:
            try:
                output = stream.readline()
            except ValueError as err:
                # The pipe was closed underneath us: treat as end of output
                if str(err).lower().startswith("i/o operation on closed file"):
                    break
                raise
            if output == "" and self.process.poll() is not None:
                break
            if output:
                yield output

    def read_stdout(self):
        """ Read stdout from the subprocess. Loss lines (when training)
            and tqdm lines (otherwise) update the status bar, everything
            else is printed. Finalises the wrapper once output ends """
        for output in self._iter_output(self.process.stdout):
            if self.command == "train":
                captured = self.capture_loss(output)
            else:
                captured = self.capture_tqdm(output)
            if not captured:
                print(output.strip())
        returncode = self.process.poll()
        message = self.set_final_status(returncode)
        self.wrapper.terminate(message)

    def read_stderr(self):
        """ Read stderr from the subprocess. When not training, tqdm
            lines update the progress bar; everything else is echoed to
            this process's stderr """
        for output in self._iter_output(self.process.stderr):
            if self.command != "train" and self.capture_tqdm(output):
                continue
            print(output.strip(), file=sys.stderr)

    def thread_stdout(self):
        """ Read the subprocess stdout in a daemon thread so that it can
            be consumed without blocking """
        thread = Thread(target=self.read_stdout)
        thread.daemon = True
        thread.start()

    def thread_stderr(self):
        """ Read the subprocess stderr in a daemon thread so that it can
            be consumed without blocking """
        thread = Thread(target=self.read_stderr)
        thread.daemon = True
        thread.start()

    def capture_loss(self, string):
        """ Capture loss values from a line of stdout.

        Returns True when the line started with "[" and held at least two
        "name: value" pairs, which are passed to the session and shown in
        the status bar. Returns False otherwise """
        if not string.startswith("["):
            return False
        loss = self.consoleregex["loss"].findall(string)
        if len(loss) < 2:
            return False
        self.wrapper.session.add_loss(loss)
        message = "".join("{}: {} ".format(name, value)
                          for name, value in loss)
        elapsed = self.wrapper.session.timestats["elapsed"]
        iterations = self.wrapper.session.stats["iterations"]
        message = "Elapsed: {} Iteration: {} {}".format(elapsed,
                                                        iterations,
                                                        message)
        self.statusbar.progress_update(message, 0, False)
        return True

    def capture_tqdm(self, string):
        """ Capture tqdm output for the progress bar.

        Returns True when the line held exactly five tqdm fields and the
        progress bar was updated. Returns False otherwise, including when
        the processed field is not a usable "current/total" pair """
        tqdm = self.consoleregex["tqdm"].findall(string)
        if len(tqdm) != 5:
            return False
        percent, processed, elapsed, remaining, rate = tqdm
        current, _, total = processed.partition("/")
        try:
            # Position is expressed in thousandths of the total
            position = int((float(current) / float(total)) * 1000)
        except (ValueError, ZeroDivisionError):
            # Not a real progress line: let the caller print it rather
            # than crash the reader thread
            return False
        processtime = "Elapsed: {} Remaining: {}".format(elapsed, remaining)
        message = "{} | {} | {} | {}".format(processtime,
                                             rate,
                                             processed,
                                             percent)
        self.statusbar.progress_update(message, position, True)
        return True

    def terminate(self):
        """ Terminate the subprocess.

        Training is asked to exit (newline on stdin for Windows, SIGINT
        elsewhere) and given 60 seconds to do so - presumably so that it
        can save its model first. Any other command has the child
        processes of this process terminated, then killed if needed """
        if self.command == "train":
            print("Sending Exit Signal", flush=True)
            try:
                if os.name == "nt":
                    self.process.communicate(input="\n", timeout=60)
                else:
                    self.process.send_signal(signal.SIGINT)
                    # Blocks without spinning until exit or timeout
                    self.process.wait(timeout=60)
            except TimeoutExpired:
                print("Timeout reached sending Exit Signal")
            except ValueError as err:
                print(err)
            return

        print("Terminating Process...")
        children = psutil.Process().children(recursive=True)
        for child in children:
            child.terminate()
        _, alive = psutil.wait_procs(children, timeout=10)
        if not alive:
            print("Terminated")
            return

        print("Termination timed out. Killing Process...")
        for child in alive:
            child.kill()
        _, alive = psutil.wait_procs(alive, timeout=10)
        if not alive:
            print("Killed")
            return
        for child in alive:
            print("Process {} survived SIGKILL. "
                  "Giving up".format(child))

    def set_final_status(self, returncode):
        """ Return the status bar text for a subprocess return code """
        # 3221225786 is 0xC000013A, the exit status Windows reports for a
        # process ended by Ctrl+C
        if returncode in (0, 3221225786):
            return "Ready"
        # Negative codes: the process was ended by that signal number
        signalled = {-15: "Terminated", -9: "Killed", -6: "Aborted"}
        if returncode in signalled:
            return "{} - {}.py".format(signalled[returncode], self.command)
        return "Failed - {}.py. Return Code: {}".format(self.command,
                                                        returncode)