From 41398abdcfcf7457f6611ee5b41af0bf39d673cb Mon Sep 17 00:00:00 2001 From: Florian Schroegendorfer Date: Fri, 7 Jun 2024 00:01:57 +0200 Subject: [PATCH] added bootloader/ESP detection using efibootmgr --- ekernel.py | 143 +++++++++++++++++++++--------------------- tests/data/kernel.py | 42 ++++++++++--- tests/test_build.py | 2 +- tests/test_clean.py | 38 +++++------ tests/test_commit.py | 6 +- tests/test_install.py | 33 +++++----- tests/test_kernel.py | 7 ++- tests/test_update.py | 50 ++++++--------- 8 files changed, 161 insertions(+), 160 deletions(-) diff --git a/ekernel.py b/ekernel.py index c58eafe..50cd736 100644 --- a/ekernel.py +++ b/ekernel.py @@ -45,7 +45,10 @@ class Kernel: modules = pathlib.Path("/lib/modules") # EFI system partition - esp = pathlib.Path("/boot/EFI/Gentoo") + esp = pathlib.Path("/boot") + + # EFI bootloader (stub kernel) + boot = esp / "EFI/Gentoo/bootx64.efi" def __init__ (self, src): """Construct a Kernel based on a given source path.""" @@ -58,8 +61,7 @@ class Kernel: raise ValueError(f"illegal source: {src}") from e self.config = self.src / ".config" self.bzImage = self.src / "arch/x86_64/boot/bzImage" - self.bootx64 = self.esp / "bootx64.efi" - self.efi = self.esp / f"gentoo-{self.version.base_version}.efi" + self.bkp = self.boot.parent / f"gentoo-{self.version.base_version}.efi" self.modules = self.modules / f"{self.version.base_version}-gentoo" def __eq__ (self, other): @@ -75,14 +77,14 @@ class Kernel: f"* config = {self.config}\n" f"* bzImage = {self.bzImage}\n" f"* modules = {self.modules}\n" - f"* efi = {self.efi}\n" + f"* bkp = {self.bkp}\n" ) @classmethod def list (cls, descending=True): """Get an descending list of available kernels.""" return list(sorted( - ( Kernel(src) for src in cls.src.glob("linux-*") ), + (Kernel(src) for src in cls.src.glob("linux-*")), key=lambda k: k.version, reverse=descending )) @@ -112,11 +114,6 @@ def cli (f): @functools.wraps(f) def handler (argv=sys.argv[1:]): try: - # dirty hack: set custom esp before mounting - try: - i = argv.index("-e") - Kernel.esp = pathlib.Path(argv[i + 1]) - except ValueError: pass r = f(argv) return 0 if r is None else r except Exception as e: @@ -124,30 +121,64 @@ def cli (f): sys.exit(1) return handler -def mount (f): - """Decorator ensuring a given path is mounted.""" +def efi (f): + """Decorator locating and mounting the ESP through efivars.""" @functools.wraps(f) - def mounter (*args, **kwargs): + def locator (*args, **kwargs): mounted = False - esp = Kernel.esp.parents[-2] - if not esp.exists() or mount.force: - try: - subprocess.run( - ["mount", str(esp)], - capture_output=True, - check=True - ) - mounted = True - except subprocess.CalledProcessError as e: - msg = e.stderr.decode().strip() - if f"already mounted on {esp}" not in msg: - raise RuntimeError(e.stderr.decode().splitlines()[0]) + # ensure access to the currently running EFI bootloader / stub kernel + if not Kernel.boot.exists(): + # find current bootloader + mgr = subprocess.run( + ["efibootmgr"], + capture_output=True, + check=True + ) + lines = iter(mgr.stdout.decode().splitlines()) + bootnum = "NaN" + for l in lines: + if l.startswith("BootCurrent"): + bootnum = l.split()[1] + break + for l in lines: + if l.startswith(f"Boot{bootnum}"): + i = l.find("File") + if i < 0: + raise RuntimeError(f"error locating bootloader:\n{l}") + i += 6 + j = l.find(")", i) + loader = pathlib.Path(l[i:j].replace("\\", "/")) + break + # find mountpoint + for l in pathlib.Path("/etc/fstab").read_text().splitlines(): + if not l.startswith("#"): + for p in ["/boot", "/efi"]: + if p in l: + Kernel.esp = pathlib.Path(p) + Kernel.boot = Kernel.esp / loader + break + else: continue + break + # mount esp + if not Kernel.boot.exists(): + try: + subprocess.run( + ["mount", str(Kernel.esp)], + capture_output=True, + check=True + ) + mounted = True + except subprocess.CalledProcessError as e: + msg = e.stderr.decode().strip() + if f"already mounted on {Kernel.esp}" not in msg: + raise RuntimeError(e.stderr.decode().splitlines()[0]) + assert Kernel.boot.exists() r = f(*args, **kwargs) + # umount esp if mounted: - subprocess.run(["umount", str(esp)], check=True) + subprocess.run(["umount", str(Kernel.esp)], check=True) return r - mount.force = False - return mounter + return locator @cli def configure (argv): @@ -347,7 +378,7 @@ def build (argv): subprocess.run(["make", "modules_install"], check=True) @cli -@mount +@efi def install (argv): """ Install a kernel. @@ -359,9 +390,6 @@ def install (argv): Command Line Arguments ---------------------- - ``-e `` - EFI bootloader directory (default: ``/boot/EFI/Gentoo``) - ``-s `` kernel source directory (default: latest) @@ -388,13 +416,6 @@ def install (argv): description="Install a kernel.", formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument( - "-e", - metavar="", - dest="esp", - type=pathlib.Path, - help="EFI bootloader directory (default: /boot/EFI/Gentoo)" - ) parser.add_argument( "-s", metavar="", @@ -410,7 +431,6 @@ def install (argv): help="be quiet" ) args = parser.parse_args(argv) - if args.esp: Kernel.esp = args.esp # redundant kernel = Kernel(args.src) out.quiet = args.quiet @@ -429,19 +449,19 @@ def install (argv): ) # copy boot image - out.einfo(f"creating boot image {out.hilite(kernel.bootx64)}") - shutil.copy(kernel.bzImage, kernel.bootx64) + out.einfo(f"creating boot image {out.hilite(kernel.boot)}") + shutil.copy(kernel.bzImage, kernel.boot) # create backup - out.einfo(f"creating backup image {out.hilite(kernel.efi)}") - shutil.copy(kernel.bzImage, kernel.efi) + out.einfo(f"creating backup image {out.hilite(kernel.bkp)}") + shutil.copy(kernel.bzImage, kernel.bkp) # rebuild external modules out.einfo(f"rebuilding external kernel modules") subprocess.run(["emerge", "@module-rebuild"], check=True) @cli -@mount +@efi def clean (argv): """ Remove unused kernel leftovers. @@ -455,9 +475,6 @@ def clean (argv): Command Line Arguments ---------------------- - ``-e `` - EFI bootloader directory (default: ``/boot/EFI/Gentoo``) - ``-k `` keep the previous ```` kernels (default: 1) @@ -473,14 +490,6 @@ def clean (argv): description="Remove unused kernel leftovers.", formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument( - "-e", - metavar="", - dest="esp", - type=pathlib.Path, - default=Kernel.esp, - help="EFI bootloader directory (default: /boot/EFI/Gentoo)" - ) parser.add_argument( "-k", metavar="", @@ -502,15 +511,14 @@ def clean (argv): help="be quiet" ) args = parser.parse_args(argv) - if args.esp: Kernel.esp = args.esp # redundant out.quiet = args.quiet if args.keep < 0: - raise ValueError("invalid int value: must be greater equal zero") + raise ValueError("invalid keep value: must be greater equal zero") # kernels to remove kernels = Kernel.list() def obsolete (k): - if args.keep and k.efi.exists() and k.modules.exists(): + if args.keep and k.bkp.exists() and k.modules.exists(): args.keep -= 1 return False return True @@ -534,7 +542,7 @@ def clean (argv): out.einfo(f"removing {out.hilite(k.src.name)}") shutil.rmtree(k.src) shutil.rmtree(k.modules, ignore_errors=True) - k.efi.unlink(missing_ok=True) + k.bkp.unlink(missing_ok=True) @cli def commit (argv): @@ -762,6 +770,7 @@ def commit (argv): raise RuntimeError(e.stderr.decode()) @cli +@efi def update (argv): """Custom Gentoo EFI stub kernel updater.""" parser = argparse.ArgumentParser( @@ -776,13 +785,6 @@ def update (argv): type=int, help=f"number of parallel make jobs (default: {jobs})" ) - parser.add_argument( - "-e", - metavar="", - dest="esp", - type=pathlib.Path, - help="EFI bootloader directory (default: /boot/EFI/Gentoo)" - ) parser.add_argument( "-s", metavar="", @@ -812,7 +814,6 @@ def update (argv): ) args = parser.parse_args(argv) args.jobs = ["-j", str(args.jobs)] if args.jobs else [] - args.esp = ["-e", str(args.esp)] if args.esp else [] # redundant args.src = ["-s", str(args.src)] if args.src else [] args.keep = ["-k", str(args.keep)] if args.keep is not None else [] args.msg = ["-m", args.msg] if args.msg else [] @@ -820,6 +821,6 @@ def update (argv): configure(args.quiet + args.src) build(args.quiet + args.jobs + args.src) - install(args.quiet + args.esp + args.src) - clean(args.quiet + args.esp + args.keep) + install(args.quiet + args.src) + clean(args.quiet + args.keep) commit(args.quiet + args.msg) diff --git a/tests/data/kernel.py b/tests/data/kernel.py index a27c5d6..ec0ffaf 100644 --- a/tests/data/kernel.py +++ b/tests/data/kernel.py @@ -1,28 +1,30 @@ """Setup the kernel test environment.""" +import functools import pathlib import shutil +import subprocess import tempfile from ekernel import Kernel # create temporary directory tmpdir = tempfile.TemporaryDirectory() -root = pathlib.Path(tmpdir.name) +tmp = pathlib.Path(tmpdir.name) # kernel source directory -src = root / "usr/src" +src = tmp / "usr/src" # kernel source symlink linux = src / "linux" # kernel module directory -modules = root / "lib/modules" +modules = tmp / "lib/modules" # EFI system partition -esp = root / "boot/EFI/Gentoo" +esp = tmp.parents[-2] # boot image -bootx64 = esp / "bootx64.efi" +boot = tmp / "boot/EFI/Gentoo/bootx64.efi" # list of installed kernels kernels = [] @@ -67,21 +69,41 @@ CONFIG_D=y CONFIG_F=y """ +def efi (f): + """Decorator adding common EFI related test actions.""" + @functools.wraps(f) + def runner (t, *args, **kwargs): + if args[0][0] == "efibootmgr": + boot.touch() + return subprocess.CompletedProcess("", 0, + "BootCurrent: 0001\n" + "Timeout: 1 seconds\n" + "BootOrder: 0001,0000\n" + "Boot0000* Windows HD()/File()\n" + "Boot0001* Gentoo HD()/File(\\EFI\\gentoo\\bootx64.efi)\n" + .encode() + ) + elif args[0][0] == "mount": + Kernel.esp = esp + Kernel.boot = boot + return f(t, *args, **kwargs) + return runner + def setup (): """Setup the kernel test environment.""" # remove any existing files - for p in root.glob("*"): + for p in tmp.glob("*"): shutil.rmtree(p) - # change Kernel class' root directory + # change Kernel paths Kernel.src = src Kernel.linux = linux Kernel.modules = modules Kernel.esp = esp - Kernel.bootx64 = bootx64 + Kernel.boot = boot # create EFI system partition - esp.mkdir(parents=True) + boot.parent.mkdir(parents=True) # create Kernels for s in sources: s.mkdir(parents=True) @@ -97,7 +119,7 @@ def setup (): else: k.config.touch() k.bzImage.touch() - k.efi.touch() + k.bkp.touch() k.modules.mkdir(parents=True) # symlink to old source directory diff --git a/tests/test_build.py b/tests/test_build.py index 84fbe10..4d3d1ac 100644 --- a/tests/test_build.py +++ b/tests/test_build.py @@ -79,7 +79,7 @@ class Tests (unittest.TestCase): @capture_stderr def test_build_source_illegal (self): with self.assertRaises(SystemExit): - run("-s", str(data.root)) + run("-s", str(data.tmp)) def test_build_jobs_source (self): self.jobs = "128" diff --git a/tests/test_clean.py b/tests/test_clean.py index 8224c4b..6448949 100644 --- a/tests/test_clean.py +++ b/tests/test_clean.py @@ -23,19 +23,19 @@ class Tests (unittest.TestCase): data.linux.unlink() data.linux.symlink_to(data.latest) # initialize git repository - os.chdir(data.root) + os.chdir(data.tmp) for k in data.kernels[:-2]: if not k.config.exists(): k.config.touch() if not k.modules.exists(): k.modules.mkdir(parents=True) - if not k.efi.exists(): - k.efi.touch() - # force mounting - ekernel.mount.force = True + if not k.bkp.exists(): + k.bkp.touch() # start interceptor + @data.efi + def run (tracer, *args, **kwargs): pass self.interceptor = Interceptor() - self.interceptor.add(subprocess.run, call=True) + self.interceptor.add(subprocess.run, call=run) self.interceptor.start() def tearDown (self): @@ -45,10 +45,15 @@ class Tests (unittest.TestCase): def check_clean (self, keep=1): split = data.kernels.index(Kernel.current()) + keep + 1 trace_it = iter(self.interceptor.trace) + # efibootmgr + tracer, (args, kwargs) = next(trace_it) + self.assertEqual(tracer.name, "subprocess.run") + self.assertEqual(args, (["efibootmgr"],)) + self.assertEqual(kwargs, {"capture_output": True, "check": True}) # mount /boot tracer, (args, kwargs) = next(trace_it) self.assertEqual(tracer.name, "subprocess.run") - self.assertEqual(args, (["mount", "/tmp"],)) + self.assertEqual(args, (["mount", "/boot"],)) self.assertEqual(kwargs, {"capture_output": True, "check": True}) # emerge -cq gentoo-sources tracer, (args, kwargs) = next(trace_it) @@ -58,11 +63,11 @@ class Tests (unittest.TestCase): for k in data.kernels[:split]: self.assertTrue(k.src.exists()) self.assertTrue(k.modules.exists()) - self.assertTrue(k.efi.exists()) + self.assertTrue(k.bkp.exists()) for k in data.kernels[split:]: self.assertFalse(k.src.exists()) self.assertFalse(k.modules.exists()) - self.assertFalse(k.efi.exists()) + self.assertFalse(k.bkp.exists()) # umount /boot tracer, (args, kwargs) = next(trace_it) self.assertEqual(tracer.name, "subprocess.run") @@ -74,7 +79,7 @@ class Tests (unittest.TestCase): self.check_clean() def test_clean_missing_efi (self): - data.kernels[-1].efi.unlink() + data.kernels[-1].bkp.unlink() self.assertEqual(run("-q"), 0) self.check_clean() @@ -95,19 +100,6 @@ class Tests (unittest.TestCase): self.assertEqual(run("-q", "-k", "10"), 0) self.check_clean(10) - def test_clean_esp (self): - esp = data.root / "boot/EFI/linux" - esp.mkdir(parents=True) - for k in data.kernels: - efi = esp / k.efi.name - if k.efi.exists(): - efi.touch() - k.efi.unlink() - k.efi = efi - data.esp.rmdir() - self.assertEqual(run("-q", "-e", str(esp)), 0) - self.check_clean() - @colorless @capture_stdout def test_clean_dry_run (self): diff --git a/tests/test_commit.py b/tests/test_commit.py index 4a709e0..3131aa6 100644 --- a/tests/test_commit.py +++ b/tests/test_commit.py @@ -24,7 +24,7 @@ class Tests (unittest.TestCase): data.linux.unlink() data.linux.symlink_to(data.latest) # initialize git repository - os.chdir(data.root) + os.chdir(data.tmp) git(["init"]) git(["config", "user.email", "some@e.mail"]) git(["config", "user.name", "some body"]) @@ -51,7 +51,7 @@ class Tests (unittest.TestCase): git([ "cat-file", "-e", - f"HEAD:{self.latest.config.relative_to(data.root)}"] + f"HEAD:{self.latest.config.relative_to(data.tmp)}"] ).returncode, 0 ) @@ -141,7 +141,7 @@ class Tests (unittest.TestCase): @colorless @capture_stderr def test_commit_missing_repository (self): - shutil.rmtree(data.root / ".git") + shutil.rmtree(data.tmp / ".git") with self.assertRaises(SystemExit): self.assertEqual(run(), 1) self.assertRegex(sys.stderr.getvalue(), r"not a git repository") diff --git a/tests/test_install.py b/tests/test_install.py index 6199d66..9a62ac1 100644 --- a/tests/test_install.py +++ b/tests/test_install.py @@ -18,14 +18,13 @@ class Tests (unittest.TestCase): data.setup() self.kernel = Kernel.latest() self.kernel.bzImage.touch() - # force mounting - ekernel.mount.force = True # start interceptor - self.interceptor = Interceptor() + @data.efi def run (tracer, *args, **kwargs): if args[0][0] == "eselect": data.linux.unlink() data.linux.symlink_to(self.kernel.src.name) + self.interceptor = Interceptor() self.interceptor.add(subprocess.run, call=run) self.interceptor.start() @@ -35,10 +34,15 @@ class Tests (unittest.TestCase): def check_install (self): trace_it = iter(self.interceptor.trace) + # efibootmgr + tracer, (args, kwargs) = next(trace_it) + self.assertEqual(tracer.name, "subprocess.run") + self.assertEqual(args, (["efibootmgr"],)) + self.assertEqual(kwargs, {"capture_output": True, "check": True}) # mount /boot tracer, (args, kwargs) = next(trace_it) self.assertEqual(tracer.name, "subprocess.run") - self.assertEqual(args, (["mount", "/tmp"],)) + self.assertEqual(args, (["mount", "/boot"],)) self.assertEqual(kwargs, {"capture_output": True, "check": True}) # eselect kernel set tracer, (args, kwargs) = next(trace_it) @@ -54,26 +58,19 @@ class Tests (unittest.TestCase): self.assertEqual(tracer.name, "subprocess.run") self.assertEqual(args, (["emerge", "@module-rebuild"],)) self.assertEqual(kwargs, {"check": True}) + # umount /boot + tracer, (args, kwargs) = next(trace_it) + self.assertEqual(tracer.name, "subprocess.run") + self.assertEqual(args, (["umount", "/tmp"],)) + self.assertEqual(kwargs, {"check": True}) # check generated files - self.assertTrue(self.kernel.bootx64.exists()) - self.assertTrue(self.kernel.efi.exists()) + self.assertTrue(self.kernel.boot.exists()) + self.assertTrue(self.kernel.bkp.exists()) def test_install (self): self.assertEqual(run("-q"), 0) self.check_install() - def test_install_esp (self): - esp = data.root / "boot/EFI/linux" - esp.mkdir(parents=True) - self.kernel.efi = esp / self.kernel.efi.name - self.kernel.bootx64 = esp / self.kernel.bootx64.name - self.assertEqual(run("-q", "-e", str(esp)), 0) - self.check_install() - - def test_install_esp_missing (self): - with self.assertRaises(SystemExit): - run("-q", "-e", str(data.root / "boot/EFI/linux")) - def test_install_source (self): self.kernel = Kernel.current() self.assertEqual(run("-q", "-s", str(data.current)), 0) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 861107c..d50fe16 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -19,14 +19,17 @@ class Tests (unittest.TestCase): self.assertEqual(k.src, s) self.assertEqual(k.config, s / ".config") self.assertEqual(k.bzImage, s / "arch/x86_64/boot/bzImage") - self.assertEqual(k.efi, data.esp / f"gentoo-{v.base_version}.efi") self.assertEqual(k.modules, data.modules / f"{v.base_version}-gentoo") + self.assertEqual( + k.bkp, + data.boot.parent / f"gentoo-{v.base_version}.efi" + ) def test_paths (self): self.assertEqual(ekernel.Kernel.src, data.src) self.assertEqual(ekernel.Kernel.linux, data.linux) self.assertEqual(ekernel.Kernel.esp, data.esp) - self.assertEqual(ekernel.Kernel.bootx64, data.bootx64) + self.assertEqual(ekernel.Kernel.boot, data.boot) self.assertEqual(ekernel.Kernel.modules, data.modules) def test_version (self): diff --git a/tests/test_update.py b/tests/test_update.py index f438549..1d21ae5 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -15,8 +15,19 @@ def run (*argv): class Tests (unittest.TestCase): def setUp (self): + # setup test environment + data.setup() + self.latest = Kernel.latest() + self.oldconfig = data.latest / ".config.old" + # initialize git repository + os.chdir(data.tmp) + git(["init"]) + git(["config", "user.email", "some@e.mail"]) + git(["config", "user.name", "some body"]) + git(["add", "-f", Kernel.current().config]) + git(["commit", "-m", "initial"]) # start interceptor - self.interceptor = Interceptor() + @data.efi def run (tracer, *args, **kwargs): if args[0][0] == "make": if args[0][1] == "listnewconfig": @@ -33,38 +44,28 @@ class Tests (unittest.TestCase): data.linux.symlink_to(data.latest) elif args[0][0] == "git": return tracer.target(*args, **kwargs) + self.interceptor = Interceptor() self.interceptor.add(subprocess.run, call=run) self.interceptor.start() - # setup test environmenT - data.setup() - self.latest = Kernel.latest() - self.oldconfig = data.latest / ".config.old" - # initialize git repository - os.chdir(data.root) - git(["init"]) - git(["config", "user.email", "some@e.mail"]) - git(["config", "user.name", "some body"]) - git(["add", "-f", Kernel.current().config]) - git(["commit", "-m", "initial"]) def check_update (self): # configure self.assertTrue(self.oldconfig.exists()) self.assertTrue(self.latest.config.exists()) # install - self.assertTrue(self.latest.bootx64.exists()) - self.assertTrue(self.latest.efi.exists()) + self.assertTrue(self.latest.boot.exists()) + self.assertTrue(self.latest.bkp.exists()) # clean for k in data.kernels[2:]: self.assertFalse(k.src.exists()) self.assertFalse(k.modules.exists()) - self.assertFalse(k.efi.exists()) + self.assertFalse(k.bkp.exists()) # check if config has been commited self.assertEqual( git([ "cat-file", "-e", - f"HEAD:{self.latest.config.relative_to(data.root)}"] + f"HEAD:{self.latest.config.relative_to(data.tmp)}"] ).returncode, 0 ) @@ -81,21 +82,6 @@ class Tests (unittest.TestCase): self.assertEqual(run("-q", "-j", "8"), 0) self.check_update() - def test_update_esp (self): - esp = data.root / "boot/EFI/linux" - esp.mkdir(parents=True) - for k in data.kernels: - efi = esp / k.efi.name - if k.efi.exists(): - efi.touch() - k.efi.unlink() - k.efi = efi - data.esp.rmdir() - self.latest.efi = esp / self.latest.efi.name - self.latest.bootx64 = esp / self.latest.bootx64.name - self.assertEqual(run("-q", "-e", str(esp)), 0) - self.check_update() - def test_update_source (self): self.assertEqual(run("-q", "-s", str(data.latest)), 0) self.check_update() @@ -106,7 +92,7 @@ class Tests (unittest.TestCase): self.check_update() self.assertFalse(current.src.exists()) self.assertFalse(current.modules.exists()) - self.assertFalse(current.efi.exists()) + self.assertFalse(current.bkp.exists()) @capture_stdout def test_update_message (self):