1
0
Fork 0
mirror of https://github.com/deepfakes/faceswap synced 2025-06-07 10:43:27 -04:00
faceswap/lib/logger.py
2022-07-28 23:53:31 +01:00

533 lines
19 KiB
Python

#!/usr/bin/python
""" Logging Functions for Faceswap. """
import collections
import logging
from logging.handlers import RotatingFileHandler
import os
import platform
import re
import sys
import time
import traceback
from datetime import datetime
from typing import Union
class FaceswapLogger(logging.Logger):
""" A standard :class:`logging.logger` with additional "verbose" and "trace" levels added. """
def __init__(self, name: str) -> None:
for new_level in (("VERBOSE", 15), ("TRACE", 5)):
level_name, level_num = new_level
if hasattr(logging, level_name):
continue
logging.addLevelName(level_num, level_name)
setattr(logging, level_name, level_num)
super().__init__(name)
def verbose(self, msg: str, *args, **kwargs) -> None:
# pylint:disable=wrong-spelling-in-docstring
""" Create a log message at severity level 15.
Parameters
----------
msg: str
The log message to be recorded at Verbose level
args: tuple
Standard logging arguments
kwargs: dict
Standard logging key word arguments
"""
if self.isEnabledFor(15):
self._log(15, msg, args, **kwargs)
def trace(self, msg: str, *args, **kwargs) -> None:
# pylint:disable=wrong-spelling-in-docstring
""" Create a log message at severity level 5.
Parameters
----------
msg: str
The log message to be recorded at Trace level
args: tuple
Standard logging arguments
kwargs: dict
Standard logging key word arguments
"""
if self.isEnabledFor(5):
self._log(5, msg, args, **kwargs)
class ColoredFormatter(logging.Formatter):
""" Overrides the stand :class:`logging.Formatter` to enable colored labels for message level
labels on supported platforms
Parameters
----------
fmt: str
The format string for the message as a whole
pad_newlines: bool, Optional
If ``True`` new lines will be padded to appear in line with the log message, if ``False``
they will be left aligned
kwargs: dict
Standard :class:`logging.Formatter` keyword arguments
"""
def __init__(self, fmt: str, pad_newlines: bool = False, **kwargs) -> None:
super().__init__(fmt, **kwargs)
self._use_color = platform.system().lower() in ("linux", "darwin")
self._level_colors = dict(CRITICAL="\033[31m", # red
ERROR="\033[31m", # red
WARNING="\033[33m", # yellow
INFO="\033[32m", # green
VERBOSE="\033[34m") # blue
self._default_color = "\033[0m"
self._newline_padding = self._get_newline_padding(pad_newlines, fmt)
def _get_newline_padding(self, pad_newlines: bool, fmt: str) -> int:
""" Parses the format string to obtain padding for newlines if requested
Parameters
----------
fmt: str
The format string for the message as a whole
pad_newlines: bool, Optional
If ``True`` new lines will be padded to appear in line with the log message, if
``False`` they will be left aligned
Returns
-------
int
The amount of padding to apply to the front of newlines
"""
if not pad_newlines:
return 0
msg_idx = fmt.find("%(message)") + 1
filtered = fmt[:msg_idx - 1]
spaces = filtered.count(" ")
pads = [int(pad.replace("s", "")) for pad in re.findall(r"\ds", filtered)]
if "asctime" in filtered:
pads.append(self._get_sample_time_string())
return sum(pads) + spaces
def _get_sample_time_string(self) -> int:
""" Obtain a sample time string and calculate correct padding.
This may be inaccurate wheb ticking over an integer from single to double digits, but that
shouldn't be a huge issue.
Returns
-------
int
The length of the formatted date-time string
"""
sample_time = time.time()
date_format = self.datefmt if self.datefmt else self.default_time_format
datestring = time.strftime(date_format, logging.Formatter.converter(sample_time))
if not self.datefmt and self.default_msec_format:
msecs = (sample_time - int(sample_time)) * 1000
datestring = self.default_msec_format % (datestring, msecs)
return len(datestring)
def format(self, record: logging.LogRecord) -> str:
""" Color the log message level if supported otherwise return the standard log message.
Parameters
----------
record: :class:`logging.LogRecord`
The incoming log record to be formatted for entry into the logger.
Returns
-------
str
The formatted log message
"""
formatted = super().format(record)
levelname = record.levelname
if self._use_color and levelname in self._level_colors:
formatted = re.sub(levelname,
f"{self._level_colors[levelname]}{levelname}{self._default_color}",
formatted,
1)
if self._newline_padding:
formatted = formatted.replace("\n", f"\n{' ' * self._newline_padding}")
return formatted
class FaceswapFormatter(logging.Formatter):
""" Overrides the standard :class:`logging.Formatter`.
Strip newlines from incoming log messages.
Rewrites some upstream warning messages to debug level to avoid spamming the console.
"""
def format(self, record: logging.LogRecord) -> str:
""" Strip new lines from log records and rewrite certain warning messages to debug level.
Parameters
----------
record : :class:`logging.LogRecord`
The incoming log record to be formatted for entry into the logger.
Returns
-------
str
The formatted log message
"""
record.message = record.getMessage()
record = self._rewrite_warnings(record)
record = self._lower_external(record)
# strip newlines
if record.levelno < 30 and ("\n" in record.message or "\r" in record.message):
record.message = record.message.replace("\n", "\\n").replace("\r", "\\r")
if self.usesTime():
record.asctime = self.formatTime(record, self.datefmt)
msg = self.formatMessage(record)
if record.exc_info:
# Cache the traceback text to avoid converting it multiple times
# (it's constant anyway)
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
if record.exc_text:
if msg[-1:] != "\n":
msg = msg + "\n"
msg = msg + record.exc_text
if record.stack_info:
if msg[-1:] != "\n":
msg = msg + "\n"
msg = msg + self.formatStack(record.stack_info)
return msg
@classmethod
def _rewrite_warnings(cls, record: logging.LogRecord) -> logging.LogRecord:
""" Change certain warning messages from WARNING to DEBUG to avoid passing non-important
information to output.
Parameters
----------
record: :class:`logging.LogRecord`
The log record to check for rewriting
Returns
-------
:class:`logging.LogRecord`
The log rewritten or untouched record
"""
if record.levelno == 30 and record.funcName == "warn" and record.module == "ag_logging":
# TF 2.3 in Conda is imported with the wrong gast(0.4 when 0.3.3 should be used). This
# causes warnings in autograph. They don't appear to impact performance so de-elevate
# warning to debug
record.levelno = 10
record.levelname = "DEBUG"
if record.levelno == 30 and (record.funcName == "_tfmw_add_deprecation_warning" or
record.module in ("deprecation", "deprecation_wrapper")):
# Keras Deprecations.
record.levelno = 10
record.levelname = "DEBUG"
return record
@classmethod
def _lower_external(cls, record: logging.LogRecord) -> logging.LogRecord:
""" Some external libs log at a higher level than we would really like, so lower their
log level.
Specifically: Matplotlib font properties
Parameters
----------
record: :class:`logging.LogRecord`
The log record to check for rewriting
Returns
----------
:class:`logging.LogRecord`
The log rewritten or untouched record
"""
if (record.levelno == 20 and record.funcName == "__init__"
and record.module == "font_manager"):
# Matplotlib font manager
record.levelno = 10
record.levelname = "DEBUG"
return record
class RollingBuffer(collections.deque):
"""File-like that keeps a certain number of lines of text in memory for writing out to the
crash log. """
def write(self, buffer: str) -> None:
""" Splits lines from the incoming buffer and writes them out to the rolling buffer.
Parameters
----------
buffer: str
The log messages to write to the rolling buffer
"""
for line in buffer.rstrip().splitlines():
self.append(f"{line}\n")
class TqdmHandler(logging.StreamHandler):
""" Overrides :class:`logging.StreamHandler` to use :func:`tqdm.tqdm.write` rather than writing
to :func:`sys.stderr` so that log messages do not mess up tqdm progress bars. """
def emit(self, record: logging.LogRecord) -> None:
""" Format the incoming message and pass to :func:`tqdm.tqdm.write`.
Parameters
----------
record : :class:`logging.LogRecord`
The incoming log record to be formatted for entry into the logger.
"""
# tqdm is imported here as it won't be installed when setup.py is running
from tqdm import tqdm # pylint:disable=import-outside-toplevel
msg = self.format(record)
tqdm.write(msg)
def _set_root_logger(loglevel: int = logging.INFO) -> logging.Logger:
""" Setup the root logger.
Parameters
----------
loglevel: int, optional
The log level to set the root logger to. Default :attr:`logging.INFO`
Returns
-------
:class:`logging.Logger`
The root logger for Faceswap
"""
rootlogger = logging.getLogger()
rootlogger.setLevel(loglevel)
return rootlogger
def log_setup(loglevel, log_file: str, command: str, is_gui: bool = False) -> None:
""" Set up logging for Faceswap.
Sets up the root logger, the formatting for the crash logger and the file logger, and sets up
the crash, file and stream log handlers.
Parameters
----------
loglevel: str
The requested log level that Faceswap should be run at.
log_file: str
The location of the log file to write Faceswap's log to
command: str
The Faceswap command that is being run. Used to dictate whether the log file should
have "_gui" appended to the filename or not.
is_gui: bool, optional
Whether Faceswap is running in the GUI or not. Dictates where the stream handler should
output messages to. Default: ``False``
"""
numeric_loglevel = get_loglevel(loglevel)
root_loglevel = min(logging.DEBUG, numeric_loglevel)
rootlogger = _set_root_logger(loglevel=root_loglevel)
if command == "setup":
log_format = FaceswapFormatter("%(asctime)s %(module)-16s %(funcName)-30s %(levelname)-8s "
"%(message)s", datefmt="%m/%d/%Y %H:%M:%S")
s_handler = _stream_setup_handler(numeric_loglevel)
f_handler = _file_handler(root_loglevel, log_file, log_format, command)
else:
log_format = FaceswapFormatter("%(asctime)s %(processName)-15s %(threadName)-30s "
"%(module)-15s %(funcName)-30s %(levelname)-8s %(message)s",
datefmt="%m/%d/%Y %H:%M:%S")
s_handler = _stream_handler(numeric_loglevel, is_gui)
f_handler = _file_handler(numeric_loglevel, log_file, log_format, command)
rootlogger.addHandler(f_handler)
rootlogger.addHandler(s_handler)
if command != "setup":
c_handler = _crash_handler(log_format)
rootlogger.addHandler(c_handler)
logging.info("Log level set to: %s", loglevel.upper())
def _file_handler(loglevel,
log_file: str,
log_format: FaceswapFormatter,
command: str) -> RotatingFileHandler:
""" Add a rotating file handler for the current Faceswap session. 1 backup is always kept.
Parameters
----------
loglevel: str
The requested log level that messages should be logged at.
log_file: str
The location of the log file to write Faceswap's log to
log_format: :class:`FaceswapFormatter:
The formatting to store log messages as
command: str
The Faceswap command that is being run. Used to dictate whether the log file should
have "_gui" appended to the filename or not.
Returns
-------
:class:`logging.RotatingFileHandler`
The logging file handler
"""
if log_file is not None:
filename = log_file
else:
filename = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "faceswap")
# Windows has issues sharing the log file with sub-processes, so log GUI separately
filename += "_gui.log" if command == "gui" else ".log"
should_rotate = os.path.isfile(filename)
handler = RotatingFileHandler(filename, backupCount=1, encoding="utf-8")
if should_rotate:
handler.doRollover()
handler.setFormatter(log_format)
handler.setLevel(loglevel)
return handler
def _stream_handler(loglevel: int, is_gui: bool) -> Union[logging.StreamHandler, TqdmHandler]:
""" Add a stream handler for the current Faceswap session. The stream handler will only ever
output at a maximum of VERBOSE level to avoid spamming the console.
Parameters
----------
loglevel: int
The requested log level that messages should be logged at.
is_gui: bool, optional
Whether Faceswap is running in the GUI or not. Dictates where the stream handler should
output messages to.
Returns
-------
:class:`TqdmHandler` or :class:`logging.StreamHandler`
The stream handler to use
"""
# Don't set stdout to lower than verbose
loglevel = max(loglevel, 15)
log_format = FaceswapFormatter("%(asctime)s %(levelname)-8s %(message)s",
datefmt="%m/%d/%Y %H:%M:%S")
if is_gui:
# tqdm.write inserts extra lines in the GUI, so use standard output as
# it is not needed there.
log_console = logging.StreamHandler(sys.stdout)
else:
log_console = TqdmHandler(sys.stdout)
log_console.setFormatter(log_format)
log_console.setLevel(loglevel)
return log_console
def _stream_setup_handler(loglevel: int) -> logging.StreamHandler:
""" Add a stream handler for faceswap's setup.py script
This stream handler outputs a limited set of easy to use information using colored labels
if available. It will only ever output at a minimum of INFO level
Parameters
----------
loglevel: int
The requested log level that messages should be logged at.
Returns
-------
:class:`logging.StreamHandler`
The stream handler to use
"""
loglevel = max(loglevel, 15)
log_format = ColoredFormatter("%(levelname)-8s %(message)s", pad_newlines=True)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(log_format)
handler.setLevel(loglevel)
return handler
def _crash_handler(log_format: FaceswapFormatter) -> logging.StreamHandler:
""" Add a handler that stores the last 100 debug lines to :attr:'_DEBUG_BUFFER' for use in
crash reports.
Parameters
----------
log_format: :class:`FaceswapFormatter:
The formatting to store log messages as
Returns
-------
:class:`logging.StreamHandler`
The crash log handler
"""
log_crash = logging.StreamHandler(_DEBUG_BUFFER)
log_crash.setFormatter(log_format)
log_crash.setLevel(logging.DEBUG)
return log_crash
def get_loglevel(loglevel: str) -> int:
""" Check whether a valid log level has been supplied, and return the numeric log level that
corresponds to the given string level.
Parameters
----------
loglevel: str
The loglevel that has been requested
Returns
-------
int
The numeric representation of the given loglevel
"""
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {loglevel}")
return numeric_level
def crash_log() -> str:
""" On a crash, write out the contents of :func:`_DEBUG_BUFFER` containing the last 100 lines
of debug messages to a crash report in the root Faceswap folder.
Returns
-------
str
The filename of the file that contains the crash report
"""
original_traceback = traceback.format_exc().encode("utf-8")
path = os.path.dirname(os.path.realpath(sys.argv[0]))
filename = os.path.join(path, datetime.now().strftime("crash_report.%Y.%m.%d.%H%M%S%f.log"))
freeze_log = [line.encode("utf-8") for line in _DEBUG_BUFFER]
try:
from lib.sysinfo import sysinfo # pylint:disable=import-outside-toplevel
except Exception: # pylint:disable=broad-except
sysinfo = ("\n\nThere was an error importing System Information from lib.sysinfo. This is "
f"probably a bug which should be fixed:\n{traceback.format_exc()}")
with open(filename, "wb") as outfile:
outfile.writelines(freeze_log)
outfile.write(original_traceback)
outfile.write(sysinfo.encode("utf-8"))
return filename
_OLD_FACTORY = logging.getLogRecordFactory()
def _faceswap_logrecord(*args, **kwargs) -> logging.LogRecord:
""" Add a flag to :class:`logging.LogRecord` to not strip formatting from particular
records. """
record = _OLD_FACTORY(*args, **kwargs)
record.strip_spaces = True # type:ignore
return record
logging.setLogRecordFactory(_faceswap_logrecord)
# Set logger class to custom logger
logging.setLoggerClass(FaceswapLogger)
# Stores the last 100 debug messages
_DEBUG_BUFFER = RollingBuffer(maxlen=100)