ekernel/test/__init__.py
Florian Schroegendorfer 28b33eaf1c
renamed test directory
2024-06-13 00:11:16 +02:00

122 lines
3.7 KiB
Python

import functools
import io
import subprocess
import sys
import portage.output
from pkgutil import resolve_name as resolve
import ekernel
# disable output
ekernel.out.quiet = True
def git (argv: list[str]):
return subprocess.run(["git"] + argv, capture_output=True, check=True)
def capture_stdout (f):
"""A decorator for capturing stdout in a io.StringIO object."""
@functools.wraps(f)
def capture (*args, **kwargs):
quiet = ekernel.out.quiet
ekernel.out.quiet = False
stdout = sys.stdout
sys.stdout = io.StringIO()
r = f(*args, **kwargs)
sys.stdout = stdout
ekernel.out.quiet = quiet
return r
return capture
def capture_stderr (f):
"""A decorator for capturing stderr in a io.StringIO object."""
@functools.wraps(f)
def capture (*args, **kwargs):
quiet = ekernel.out.quiet
ekernel.out.quiet = False
stderr = sys.stderr
sys.stderr = io.StringIO()
r = f(*args, **kwargs)
sys.stderr = stderr
ekernel.out.quiet = quiet
return r
return capture
def colorless (f):
"""A decorator for disabling portage's colorful output."""
@functools.wraps(f)
def nocolor (*args, **kwargs):
havecolor = portage.output.havecolor
portage.output.havecolor = 0
r = f(*args, **kwargs)
portage.output.havecolor = havecolor
return r
return nocolor
class Interceptor:
"""Dynamically intercept, trace and/or replace arbitrary function calls."""
class Tracer:
def __init__ (self, interceptor, target, log, call):
self.name = f"{target.__module__}.{target.__qualname__}"
self.parent = resolve(self.name.rsplit(".", 1)[0])
self.interceptor = interceptor
self.target = target
self.log = log
self.call = call
def start (self):
def call (*args, **kwargs):
if self.log:
self.interceptor.trace.append((self, (args, kwargs)))
if callable(self.call):
return self.call(self, *args, **kwargs)
setattr(self.parent, self.target.__name__, call)
def stop (self):
setattr(self.parent, self.target.__name__, self.target)
def __init__ (self):
self.targets = {}
self.trace = []
def __str__ (self):
s = io.StringIO()
for tracer, (args, kwargs) in self.trace:
s.write(f"{tracer.name}\n")
for a in args:
s.write(f" {a}\n")
for k, v in kwargs.items():
s.write(f" {k} = {v}\n")
return s.getvalue()
def add (self, target, log=True, call=None):
"""
Intercept calls to the given function.
Args:
target: the intercepted function object
log (bool): trace calls if True
call: function to be called instead
"""
if target in self.targets:
raise RuntimeError(f"{self.targets[target].name} already caught")
if not callable(target):
raise RuntimeError(f"{target.__name__} is not callable")
self.targets[target] = self.Tracer(self, target, log, call)
def remove (self, target):
"""Stop intercepting calls to the given function."""
if target not in self.targets:
raise RuntimeError(f"{target.__name__} not being caught")
del self.targets[target]
def start (self):
"""Start intercepting calls to the registered functions."""
for tracer in self.targets.values(): tracer.start()
def stop (self):
"""Stop intercepting calls to the registered functions."""
for tracer in self.targets.values(): tracer.stop()