122 lines
3.7 KiB
Python
122 lines
3.7 KiB
Python
|
import functools
|
||
|
import io
|
||
|
import subprocess
|
||
|
import sys
|
||
|
|
||
|
import portage.output
|
||
|
from pkgutil import resolve_name as resolve
|
||
|
|
||
|
import ekernel
|
||
|
|
||
|
# disable output
|
||
|
ekernel.out.quiet = True
|
||
|
|
||
|
def git (argv: list[str]):
|
||
|
return subprocess.run(["git"] + argv, capture_output=True, check=True)
|
||
|
|
||
|
def capture_stdout (f):
|
||
|
"""A decorator for capturing stdout in a io.StringIO object."""
|
||
|
@functools.wraps(f)
|
||
|
def capture (*args, **kwargs):
|
||
|
quiet = ekernel.out.quiet
|
||
|
ekernel.out.quiet = False
|
||
|
stdout = sys.stdout
|
||
|
sys.stdout = io.StringIO()
|
||
|
r = f(*args, **kwargs)
|
||
|
sys.stdout = stdout
|
||
|
ekernel.out.quiet = quiet
|
||
|
return r
|
||
|
return capture
|
||
|
|
||
|
def capture_stderr (f):
|
||
|
"""A decorator for capturing stderr in a io.StringIO object."""
|
||
|
@functools.wraps(f)
|
||
|
def capture (*args, **kwargs):
|
||
|
quiet = ekernel.out.quiet
|
||
|
ekernel.out.quiet = False
|
||
|
stderr = sys.stderr
|
||
|
sys.stderr = io.StringIO()
|
||
|
r = f(*args, **kwargs)
|
||
|
sys.stderr = stderr
|
||
|
ekernel.out.quiet = quiet
|
||
|
return r
|
||
|
return capture
|
||
|
|
||
|
def colorless (f):
|
||
|
"""A decorator for disabling portage's colorful output."""
|
||
|
@functools.wraps(f)
|
||
|
def nocolor (*args, **kwargs):
|
||
|
havecolor = portage.output.havecolor
|
||
|
portage.output.havecolor = 0
|
||
|
r = f(*args, **kwargs)
|
||
|
portage.output.havecolor = havecolor
|
||
|
return r
|
||
|
return nocolor
|
||
|
|
||
|
class Interceptor:
|
||
|
"""Dynamically intercept, trace and/or replace arbitrary function calls."""
|
||
|
|
||
|
class Tracer:
|
||
|
|
||
|
def __init__ (self, interceptor, target, log, call):
|
||
|
self.name = f"{target.__module__}.{target.__qualname__}"
|
||
|
self.parent = resolve(self.name.rsplit(".", 1)[0])
|
||
|
self.interceptor = interceptor
|
||
|
self.target = target
|
||
|
self.log = log
|
||
|
self.call = call
|
||
|
|
||
|
def start (self):
|
||
|
def call (*args, **kwargs):
|
||
|
if self.log:
|
||
|
self.interceptor.trace.append((self, (args, kwargs)))
|
||
|
if callable(self.call):
|
||
|
return self.call(self, *args, **kwargs)
|
||
|
setattr(self.parent, self.target.__name__, call)
|
||
|
|
||
|
def stop (self):
|
||
|
setattr(self.parent, self.target.__name__, self.target)
|
||
|
|
||
|
def __init__ (self):
|
||
|
self.targets = {}
|
||
|
self.trace = []
|
||
|
|
||
|
def __str__ (self):
|
||
|
s = io.StringIO()
|
||
|
for tracer, (args, kwargs) in self.trace:
|
||
|
s.write(f"{tracer.name}\n")
|
||
|
for a in args:
|
||
|
s.write(f" {a}\n")
|
||
|
for k, v in kwargs.items():
|
||
|
s.write(f" {k} = {v}\n")
|
||
|
return s.getvalue()
|
||
|
|
||
|
def add (self, target, log=True, call=None):
|
||
|
"""
|
||
|
Intercept calls to the given function.
|
||
|
|
||
|
Args:
|
||
|
target: the intercepted function object
|
||
|
log (bool): trace calls if True
|
||
|
call: function to be called instead
|
||
|
"""
|
||
|
if target in self.targets:
|
||
|
raise RuntimeError(f"{self.targets[target].name} already caught")
|
||
|
if not callable(target):
|
||
|
raise RuntimeError(f"{target.__name__} is not callable")
|
||
|
self.targets[target] = self.Tracer(self, target, log, call)
|
||
|
|
||
|
def remove (self, target):
|
||
|
"""Stop intercepting calls to the given function."""
|
||
|
if target not in self.targets:
|
||
|
raise RuntimeError(f"{target.__name__} not being caught")
|
||
|
del self.targets[target]
|
||
|
|
||
|
def start (self):
|
||
|
"""Start intercepting calls to the registered functions."""
|
||
|
for tracer in self.targets.values(): tracer.start()
|
||
|
|
||
|
def stop (self):
|
||
|
"""Stop intercepting calls to the registered functions."""
|
||
|
for tracer in self.targets.values(): tracer.stop()
|