#!/usr/bin/env python3
"""
Return a requested detector/aligner pipeline

Tensorflow does not like to release GPU VRAM, so parallel plugins need to be managed to work
together.

This module sets up a pipeline for the extraction workflow, loading align and detect plugins
either in parallel or in series, giving easy access to input and output.
"""

import logging

from lib.gpu_stats import GPUStats
from lib.queue_manager import queue_manager, QueueEmpty
from lib.utils import get_backend
from plugins.plugin_loader import PluginLoader

logger = logging.getLogger(__name__)  # pylint:disable=invalid-name


class Extractor():
    """ Creates a :mod:`~plugins.extract.detect`/:mod:`~plugins.extract.align` pipeline and yields
    results frame by frame from the :attr:`detected_faces` generator

    :attr:`input_queue` is dynamically set depending on the current :attr:`phase` of extraction

    Parameters
    ----------
    detector: str
        The name of a detector plugin as exists in :mod:`plugins.extract.detect`
    aligner: str
        The name of an aligner plugin as exists in :mod:`plugins.extract.align`
    masker: str
        The name of a masker plugin, as resolved by ``PluginLoader.get_masker``
    configfile: str, optional
        The path to a custom ``extract.ini`` configfile. If ``None`` then the system
        :file:`config/extract.ini` file will be used.
    multiprocess: bool, optional
        Whether to attempt processing the plugins in parallel. This may get overridden
        internally depending on the plugin combination. Default: ``False``
    rotate_images: str, optional
        Used to set the :attr:`~plugins.extract.detect.rotation` attribute. Pass in a single number
        to use increments of that size up to 360, or pass in a ``list`` of ``ints`` to enumerate
        exactly what angles to check. Can also pass in ``'on'`` to increment at 90 degree
        intervals. Default: ``None``
    min_size: int, optional
        Used to set the :attr:`~plugins.extract.detect.min_size` attribute. Filters out faces
        detected below this size. Length, in pixels across the diagonal of the bounding box. Set
        to ``0`` for off. Default: ``20``
    normalize_method: {`None`, 'clahe', 'hist', 'mean'}, optional
        Used to set the :attr:`~plugins.extract.align.normalize_method` attribute. Normalize the
        images fed to the aligner. Default: ``None``
    input_size: int, optional
        Passed unchanged to the masker plugin as ``input_size``. Default: ``256``
    output_size: int, optional
        Passed unchanged to the masker plugin as ``output_size``. Default: ``256``
    coverage_ratio: float, optional
        Passed unchanged to the masker plugin as ``coverage_ratio``. Default: ``1.0``

    Attributes
    ----------
    phase: str
        The current phase that the pipeline is running. Used in conjunction with :attr:`passes` and
        :attr:`final_pass` to indicate to the caller which phase is being processed
    """
    def __init__(self, detector, aligner, masker, configfile=None,
                 multiprocess=False, rotate_images=None, min_size=20,
                 normalize_method=None, input_size=256, output_size=256, coverage_ratio=1.):
        logger.debug("Initializing %s: (detector: %s, aligner: %s, masker: %s, "
                     "configfile: %s, multiprocess: %s, rotate_images: %s, min_size: %s, "
                     "normalize_method: %s, input_size: %s, output_size: %s, coverage_ratio: %s)",
                     self.__class__.__name__, detector, aligner, masker, configfile,
                     multiprocess, rotate_images, min_size, normalize_method, input_size,
                     output_size, coverage_ratio)
        self.phase = "detect"
        self._queue_size = 32
        self._vram_buffer = 320  # Leave a buffer for VRAM allocation
        self._detector = self._load_detector(detector, rotate_images, min_size, configfile)
        self._aligner = self._load_aligner(aligner, configfile, normalize_method)
        self._masker = self._load_masker(masker,
                                         configfile,
                                         input_size,
                                         output_size,
                                         coverage_ratio)
        # The parallel decision must be made before batch sizes are set and queues are created,
        # as both of those steps read ``self._is_parallel``.
        self._is_parallel = self._set_parallel_processing(multiprocess)
        self._set_extractor_batchsize()
        self._queues = self._add_queues()
        logger.debug("Initialized %s", self.__class__.__name__)

    @property
    def input_queue(self):
        """ queue: Return the correct input queue depending on the current phase

        The input queue is the entry point into the extraction pipeline. A ``dict`` should
        be put to the queue in the following format(s):

        For detect/single phase operations:

        >>> {'filename': "path/to/image/file",
        >>>  'image': np.array(image)}

        For later passes (align and mask phases of a serial run), the faces returned by the
        previous pass are supplied as well:

        >>> {'filename': "path/to/image/file",
        >>>  'image': np.array(image),
        >>>  'detected_faces': [detected_face, ...]}
        """
        qname_dict = dict(detect="extract_detect_in",
                          align="extract_align_in",
                          mask="extract_mask_in")
        # When running in parallel the caller only ever feeds the head of the pipeline
        qname = "extract_detect_in" if self._is_parallel else qname_dict[self.phase]
        retval = self._queues[qname]
        logger.trace("%s: %s", qname, retval)
        return retval

    @property
    def passes(self):
        """ int: Returns the total number of passes the extractor needs to make.

        This is calculated on several factors (vram available, plugin choice,
        :attr:`multiprocess` etc.). It is useful for iterating over the pipeline
        and handling accordingly.

        Example
        -------
        >>> for phase in range(extractor.passes):
        >>>     if phase == 0:
        >>>         extractor.input_queue.put({"filename": "path/to/image/file",
        >>>                                    "image": np.array(image)})
        >>>     else:
        >>>         extractor.input_queue.put({"filename": "path/to/image/file",
        >>>                                    "image": np.array(image),
        >>>                                    "detected_faces": [detected_face, ...]})
        """
        # NOTE(review): the original body of this property was lost when the file was mangled.
        # Reconstructed from its consumers: ``_active_plugins`` treats 1 as "all plugins run
        # together" and 3 as "one plugin per phase", matching the parallel/serial split used in
        # ``launch``. Confirm against version control.
        return 1 if self._is_parallel else 3

    @property
    def final_pass(self):
        """ bool: ``True`` if the pass currently running is the last one, otherwise ``False``

        Example
        -------
        >>> for face in extractor.detected_faces():
        >>>     if extractor.final_pass:
        >>>         pass  # do final processing
        >>>     else:
        >>>         # do intermediate processing, then feed the next pass
        >>>         extractor.input_queue.put({"filename": "path/to/image/file",
        >>>                                    "image": np.array(image),
        >>>                                    "detected_faces": [detected_face, ...]})
        """
        # NOTE(review): this property was lost when the file was mangled and has been
        # reconstructed from its consumers: ``_output_queue`` reads from "extract_mask_out" on the
        # final pass, and ``detected_faces`` advances detect -> align -> mask otherwise. Any other
        # public members that originally sat between ``passes`` and ``launch`` are not recoverable
        # from this file and must be restored from version control.
        return self._is_parallel or self.phase == "mask"

    def launch(self):
        """ Launches the plugin(s)

        When running in parallel the detector, aligner and masker are all launched. Otherwise
        only the plugin belonging to the current :attr:`phase` is launched, so this should be
        called once per pass.

        Example
        -------
        >>> for phase in range(extractor.passes):
        >>>     extractor.launch()
        >>>     # do processing
        """
        # NOTE(review): the ``def`` line and the start of this docstring were lost when the file
        # was mangled; the method name is taken from the surviving docstring example.
        if self._is_parallel:
            self._launch_detector()
            self._launch_aligner()
            self._launch_masker()
        elif self.phase == "detect":
            self._launch_detector()
        elif self.phase == "align":
            self._launch_aligner()
        elif self.phase == "mask":
            self._launch_masker()

    def detected_faces(self):
        """ Generator that returns results, frame by frame from the extraction pipeline

        This is the exit point for the extraction pipeline and is used to obtain the output
        of any pipeline :attr:`phase`

        Yields
        ------
        faces: dict
            regardless of phase, the returned dictionary will contain, exclusively, ``filename``:
            the filename of the source image, ``image``: the ``numpy.array`` of the source image
            in BGR color format, ``detected_faces``: a list of
            :class:`~lib.faces_detect.Detected_Face` objects.

        Example
        -------
        >>> for face in extractor.detected_faces():
        >>>     filename = face["filename"]
        >>>     image = face["image"]
        >>>     detected_faces = face["detected_faces"]
        """
        logger.debug("Running Detection. Phase: '%s'", self.phase)
        # When not running in parallel, the output of a pass is read from the next plugin's
        # input queue. Resolve it once, before the phase can change.
        out_queue = self._output_queue
        while True:
            if self._check_and_raise_error():
                break
            try:
                # Short timeout so that plugin errors are polled regularly
                faces = out_queue.get(True, 1)
            except QueueEmpty:
                continue
            if faces == "EOF":
                break
            yield faces

        self._join_threads()
        if self.final_pass:
            # Cleanup queues
            for q_name in self._queues:
                queue_manager.del_queue(q_name)
            logger.debug("Detection Complete")
        else:
            self.phase = "align" if self.phase == "detect" else "mask"
            logger.debug("Switching to %s phase", self.phase)

    # <<< INTERNAL METHODS >>> #
    @property
    def _output_queue(self):
        """ Return the correct output queue depending on the current phase """
        qname_dict = dict(detect="extract_align_in",
                          align="extract_mask_in",
                          mask="extract_mask_out")
        qname = "extract_mask_out" if self.final_pass else qname_dict[self.phase]
        retval = self._queues[qname]
        logger.trace("%s: %s", qname, retval)
        return retval

    @property
    def _active_plugins(self):
        """ Return the plugins that are currently active based on pass

        Returns
        -------
        list
            All three plugins when a single pass is being made, the plugin for the current
            :attr:`phase` when three passes are being made, otherwise an empty list.
        """
        if self.passes == 1:
            retval = [self._detector, self._aligner, self._masker]
        elif self.passes == 3:
            by_phase = dict(detect=self._detector, align=self._aligner, mask=self._masker)
            plugin = by_phase.get(self.phase)
            # Every consumer calls methods on the returned items, so an unknown phase yields an
            # empty list rather than a list containing ``None``.
            retval = [] if plugin is None else [plugin]
        else:
            retval = []
        logger.trace("Active plugins: %s", retval)
        return retval

    def _add_queues(self):
        """ Add the required processing queues to Queue Manager

        Returns
        -------
        dict
            The queue name to queue object mapping for each stage of the pipeline
        """
        queues = dict()
        tasks = ["extract_detect_in", "extract_align_in", "extract_mask_in", "extract_mask_out"]
        for task in tasks:
            # Limit queue size to avoid stacking ram. The queues that the caller feeds directly
            # (detect always, align when running in series) get double the base size.
            caller_fed = (task == "extract_detect_in" or
                          (not self._is_parallel and task == "extract_align_in"))
            queue_size = self._queue_size * 2 if caller_fed else self._queue_size
            queue_manager.add_queue(task, maxsize=queue_size)
            queues[task] = queue_manager.get_queue(task)
        logger.debug("Queues: %s", queues)
        return queues

    def _set_parallel_processing(self, multiprocess):
        """ Set whether to run detect, align, and mask together or separately

        Parameters
        ----------
        multiprocess: bool
            Whether parallel processing has been requested

        Returns
        -------
        bool
            ``True`` if the plugins should be run in parallel, otherwise ``False``
        """
        if not multiprocess:
            logger.debug("Parallel processing disabled by cli.")
            return False

        gpu_stats = GPUStats()
        if gpu_stats.device_count == 0:
            logger.debug("No GPU detected. Enabling parallel processing.")
            return True

        if get_backend() == "amd":
            logger.debug("Parallel processing discabled by amd")
            return False

        vram_required = (self._detector.vram +
                         self._aligner.vram +
                         self._masker.vram +
                         self._vram_buffer)
        stats = gpu_stats.get_card_most_free()
        vram_free = int(stats["free"])
        logger.verbose("%s - %sMB free of %sMB",
                       stats["device"],
                       vram_free,
                       int(stats["total"]))
        if vram_free <= vram_required:
            logger.warning("Not enough free VRAM for parallel processing. "
                           "Switching to serial")
            return False
        return True

    # << INTERNAL PLUGIN HANDLING >> #
    @staticmethod
    def _load_detector(detector, rotation, min_size, configfile):
        """ Set global arguments and load detector plugin """
        detector_name = detector.replace("-", "_").lower()
        logger.debug("Loading Detector: '%s'", detector_name)
        plugin_class = PluginLoader.get_detector(detector_name)
        return plugin_class(rotation=rotation, min_size=min_size, configfile=configfile)

    @staticmethod
    def _load_aligner(aligner, configfile, normalize_method):
        """ Set global arguments and load aligner plugin """
        aligner_name = aligner.replace("-", "_").lower()
        logger.debug("Loading Aligner: '%s'", aligner_name)
        plugin_class = PluginLoader.get_aligner(aligner_name)
        return plugin_class(configfile=configfile, normalize_method=normalize_method)

    @staticmethod
    def _load_masker(masker, configfile, input_size, output_size, coverage_ratio):
        """ Set global arguments and load masker plugin """
        masker_name = masker.replace("-", "_").lower()
        logger.debug("Loading Masker: '%s'", masker_name)
        plugin_class = PluginLoader.get_masker(masker_name)
        return plugin_class(configfile=configfile,
                            input_size=input_size,
                            output_size=output_size,
                            coverage_ratio=coverage_ratio)

    def _launch_detector(self):
        """ Launch the face detector, reading from the detect queue and feeding the aligner """
        logger.debug("Launching Detector")
        self._detector.initialize(in_queue=self._queues["extract_detect_in"],
                                  out_queue=self._queues["extract_align_in"])
        self._detector.start()
        logger.debug("Launched Detector")

    def _launch_aligner(self):
        """ Launch the face aligner, reading from the align queue and feeding the masker """
        logger.debug("Launching Aligner")
        self._aligner.initialize(in_queue=self._queues["extract_align_in"],
                                 out_queue=self._queues["extract_mask_in"])
        self._aligner.start()
        logger.debug("Launched Aligner")

    def _launch_masker(self):
        """ Launch the face masker, reading from the mask queue and feeding the final output """
        logger.debug("Launching Masker")
        self._masker.initialize(in_queue=self._queues["extract_mask_in"],
                                out_queue=self._queues["extract_mask_out"])
        self._masker.start()
        logger.debug("Launched Masker")

    def _set_extractor_batchsize(self):
        """ Sets the batchsize of the requested plugins based on their vram and
        vram_per_batch_requirements if the configured batchsize requires more
        vram than is available. Nvidia only. """
        plugins = (self._detector, self._aligner, self._masker)
        # ``all`` is evaluated first, so the backend is only queried when at least one plugin
        # declares a VRAM requirement.
        if all(plugin.vram == 0 for plugin in plugins) or get_backend() != "nvidia":
            logger.debug("Either detector and aligner have no VRAM requirements or not running "
                         "on Nvidia. Not updating batchsize requirements.")
            return

        stats = GPUStats().get_card_most_free()
        vram_free = int(stats["free"])

        if self._is_parallel:
            # All plugins are resident at once, so their requirements are summed
            vram_required = sum(plugin.vram for plugin in plugins) + self._vram_buffer
            batch_required = sum(plugin.vram_per_batch * plugin.batchsize for plugin in plugins)
            plugin_required = vram_required + batch_required
            if plugin_required <= vram_free:
                logger.debug("Plugin requirements within threshold: (plugin_required: %sMB, "
                             "vram_free: %sMB)", plugin_required, vram_free)
                return
            # Hacky split across 3 plugins
            available_vram = (vram_free - vram_required) // 3
            for plugin in plugins:
                self._set_plugin_batchsize(plugin, available_vram)
            return

        # Serial processing: each plugin has the card to itself during its own pass
        for plugin in plugins:
            vram_required = plugin.vram + self._vram_buffer
            batch_required = plugin.vram_per_batch * plugin.batchsize
            plugin_required = vram_required + batch_required
            if plugin_required <= vram_free:
                logger.debug("%s requirements within threshold: (plugin_required: %sMB, "
                             "vram_free: %sMB)", plugin.name, plugin_required, vram_free)
                continue
            self._set_plugin_batchsize(plugin, vram_free - vram_required)

    @staticmethod
    def _set_plugin_batchsize(plugin, available_vram):
        """ Set the batchsize for the given plugin based on given available vram

        Parameters
        ----------
        plugin: object
            The plugin to update. Its ``vram_per_batch``, ``batchsize`` and ``name`` attributes
            are used.
        available_vram: int
            The amount of VRAM that the plugin's batches may occupy. May be negative, in which
            case the batchsize is set to the minimum of ``1``.
        """
        if plugin.vram_per_batch <= 0:
            # A plugin with no per-batch VRAM cost cannot be scaled by VRAM, and dividing by its
            # requirement would raise ZeroDivisionError. Leave its batchsize as configured.
            logger.debug("%s has no per-batch VRAM requirement. Leaving batchsize at %s",
                         plugin.name, plugin.batchsize)
            return
        plugin.batchsize = max(1, available_vram // plugin.vram_per_batch)
        logger.verbose("Reset batchsize for %s to %s", plugin.name, plugin.batchsize)

    def _join_threads(self):
        """ Join threads for current pass """
        for plugin in self._active_plugins:
            plugin.join()

    def _check_and_raise_error(self):
        """ Check all threads for errors and raise if one occurs

        Returns
        -------
        bool
            ``True`` as soon as any active plugin's check returns a truthy value, otherwise
            ``False``
        """
        return any(plugin.check_and_raise_error() for plugin in self._active_plugins)