pkgcore.operations.observer — pkgcore master documentation
__all__ = ( "null_output", "formatter_output", "file_handle_output", "phase_observer", "repo_observer", "decorate_build_method", ) import threading from snakeoil import klass from snakeoil.currying import pre_curry def _convert(msg, args=(), kwds={}): # Note for interpolation, ValueError can be thrown by '%2(s' # TypeError by "%i" % "2", and KeyError via what you would expect. if args: if kwds: raise TypeError( "both position and optional args cannot be " "supplied: given msg(%r), args(%r), kwds(%r)" % (msg, args, kwds) ) try: return msg % args except (ValueError, TypeError) as e: raise TypeError( f"observer interpolation error: {e}, msg={msg!r}, args={args!r}" ) elif kwds: try: return msg % kwds except (KeyError, TypeError, ValueError) as e: raise TypeError( f"observer interpolation error: {e}, msg={msg!r}, kwds={kwds!r}" ) return msg[docs] class null_output:
[docs] def warn(self, msg, *args, **kwds): pass
[docs] def error(self, msg, *args, **kwds): pass
[docs] def info(self, msg, *args, **kwds): pass
[docs] def debug(self, msg, *args, **kwds): pass
[docs] def write(self, msg, *args, **kwds): pass
[docs] def flush(self): pass
[docs] class formatter_output(null_output): def __init__(self, out): self._out = out self.verbosity = getattr(out, "verbosity", 0)
[docs] def debug(self, msg, *args, **kwds): self._out.write(_convert("debug: " + msg, args, kwds))
[docs] def error(self, msg, *args, **kwds): prefixes = kwds.pop( "prefixes", (self._out.fg("red"), self._out.bold, " * ", self._out.reset) ) self._out.write(_convert(msg, args, kwds), prefixes=prefixes)
[docs] def info(self, msg, *args, **kwds): prefixes = kwds.pop( "prefixes", (self._out.fg("green"), self._out.bold, " * ", self._out.reset) ) self._out.write(_convert(msg, args, kwds), prefixes=prefixes)
[docs] def warn(self, msg, *args, **kwds): prefixes = kwds.pop( "prefixes", (self._out.fg("yellow"), self._out.bold, " * ", self._out.reset) ) self._out.write(_convert(msg, args, kwds), prefixes=prefixes)
[docs] def write(self, msg, *args, autoline=False, **kwds): self._out.write(_convert(msg, args, kwds), autoline=autoline)
[docs] def flush(self): self._out.flush()
[docs] class file_handle_output(formatter_output):
[docs] def debug(self, msg, *args, **kwds): self._out.write(f"debug: {_convert(msg, args, kwds)}\n")
[docs] def error(self, msg, *args, **kwds): self._out.write(f"error: {_convert(msg, args, kwds)}\n")
[docs] def info(self, msg, *args, **kwds): self._out.write(f"info: {_convert(msg, args, kwds)}\n")
[docs] def warn(self, msg, *args, **kwds): self._out.write(f"warning: {_convert(msg, args, kwds)}\n")
[docs] def write(self, msg, *args, **kwds): self._out.write(_convert(msg, args, kwds))
[docs] class phase_observer: def __init__(self, output, debug=False): self._output = output self.verbosity = getattr(output, "verbosity", 0) self._debug = debug
[docs] def phase_start(self, phase): if self._debug: self._output.write(f"starting {phase}\n")
[docs] def debug(self, msg, *args, **kwds): if self._debug: self._output.debug(msg, *args, **kwds)
info = klass.alias_attr("_output.info") warn = klass.alias_attr("_output.warn") error = klass.alias_attr("_output.error") write = klass.alias_attr("_output.write") flush = klass.alias_attr("_output.flush")
[docs] def phase_end(self, phase, status): if self._debug: self._output.write(f"finished {phase}: {status}\n")
def _reflection_func(attr, self, *args, **kwds): return self._invoke(attr, *args, **kwds) def _mk_observer_proxy(target): class foo(target): for x in set(dir(target)).difference(dir(object)): locals()[x] = pre_curry(_reflection_func, x) return foo class threadsafe_repo_observer(_mk_observer_proxy(repo_observer)): def __init__(self, observer): self._observer = observer self._lock = threading.Lock() def _invoke(self, attr, *args, **kwds): self._lock.acquire() try: return getattr(self._observer, attr)(*args, **kwds) finally: self._lock.release() def wrap_build_method(phase, method, self, *args, **kwds): disable_observer = kwds.pop("disable_observer", False) if not hasattr(self.observer, "phase_start") or disable_observer: return method(self, *args, **kwds) self.observer.phase_start(phase) ret = False try: ret = method(self, *args, **kwds) finally: self.observer.phase_end(phase, ret) return ret[docs] class repo_observer(phase_observer):
[docs] def trigger_start(self, hook, trigger): if self._debug: self._output.write(f"hook {hook}: trigger: starting {trigger!r}\n", hook)
[docs] def trigger_end(self, hook, trigger): if self._debug: self._output.write(f"hook {hook}: trigger: finished {trigger!r}\n", hook)
[docs] def installing_fs_obj(self, obj): self._output.write(f">>> {obj}\n")
[docs] def removing_fs_obj(self, obj): self._output.write(f"<<< {obj}\n")
[docs] def decorate_build_method(phase): def f(func): return pre_curry(wrap_build_method, phase, func) return f