blob: de7d2eb0e06630f745118fa0209b02015807b701 [file] [log] [blame]
import logging
import os
import shutil
import subprocess
from pathlib import Path
import re
import sys
from urllib.parse import urljoin
from urllib.request import urlretrieve
from typing import Optional, List
from dataclasses import dataclass
import config
import utils
logger = logging.getLogger(__name__)
@dataclass
class KernelSpec:
"""Immutable kernel specification"""
version: str
arch: str | None = None
def __post_init__(self):
if self.arch is None or self.arch == "":
self.arch = config.config.arch
self.major = self.version.split(".")[0]
def __str__(self):
return f"{self.version}-{self.arch}"
@property
def kernel_build_dir(self) -> Path:
return config.config.kernels_dir / f"linux-{self}-build"
@property
def kernel_dir(self) -> Path:
return config.config.kernels_dir / f"linux-{self}"
@property
def bzimage_path(self) -> Path:
return self.kernel_dir / f"bzImage-{self}"
@property
def bpftool_path(self) -> Path:
return self.kernel_dir / "bpftool"
@property
def libbpf_path(self) -> Path:
return self.kernel_dir / "libbpf.a"
@property
def vmlinux_path(self) -> Path:
return self.kernel_dir / "vmlinux.h"
@property
def tarball_path(self) -> Path:
return config.config.kernels_dir / f"linux-{self}.tar.xz"
class KernelImage:
"""Represents a compiled kernel image"""
def __init__(self, path: Path):
if not isinstance(path, Path):
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Kernel image not found: {path}")
self.path = path
def __str__(self):
return str(self.path)
class KernelCompiler:
"""Handles complete kernel compilation process including download and build"""
@staticmethod
def _progress_hook(block_num: int, block_size: int, total_size: int) -> None:
"""Progress hook for urlretrieve to display download progress"""
if total_size <= 0:
return
downloaded = block_num * block_size
percent = min(downloaded * 100 // total_size, 100)
bar_length = 10
filled = int(bar_length * downloaded // total_size)
bar = "#" * filled + "-" * (bar_length - filled)
if logger.getEffectiveLevel() <= logging.INFO:
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
if sys.stdout.isatty():
sys.stdout.write(
f"\rDownloading: |{bar}| {percent}% ({downloaded_mb:.2f}MB / {total_mb:.2f}MB)"
)
sys.stdout.flush()
if downloaded >= total_size:
sys.stdout.write("\n")
sys.stdout.flush()
else:
if downloaded >= total_size or downloaded % (block_size * 100) == 0:
logger.info(
f"Downloading: {percent}% ({downloaded_mb:.2f}MB / {total_mb:.2f}MB)"
)
def compile_from_version(self, spec: KernelSpec) -> KernelImage:
"""Complete compilation process from kernel version"""
if spec.bzimage_path.exists():
logger.info(f"Kernel {spec} already exists, skipping compilation")
return KernelImage(spec.bzimage_path)
try:
self._download_source(spec)
self._extract_source(spec)
self._configure_kernel(spec)
self._compile_kernel(spec)
self._copy_bzimage(spec)
logger.info(f"Successfully compiled kernel {spec}")
return KernelImage(spec.bzimage_path)
except Exception as e:
logger.error(f"Failed to compile kernel {spec}: {e}")
sys.exit(1)
finally:
# Always cleanup temporary files
self._cleanup(spec)
def _download_source(self, spec: KernelSpec) -> None:
"""Download kernel source tarball"""
if spec.tarball_path.exists():
logger.info(f"Tarball already exists: {spec.tarball_path}")
return
url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz"
url = urljoin(config.config.kernel_tarball_url, url_suffix)
logger.info(f"Downloading kernel from {url}")
spec.tarball_path.parent.mkdir(parents=True, exist_ok=True)
urlretrieve(url, spec.tarball_path, reporthook=self._progress_hook)
logger.info("Kernel source downloaded")
def _extract_source(self, spec: KernelSpec) -> None:
"""Extract kernel source tarball"""
logger.info(f"Extracting kernel source to {spec.kernel_build_dir}")
spec.kernel_build_dir.mkdir(parents=True, exist_ok=True)
utils.run_command(
[
"tar",
"-xf",
str(spec.tarball_path),
"-C",
str(spec.kernel_build_dir),
"--strip-components=1",
]
)
def _configure_kernel(self, spec: KernelSpec) -> None:
"""Configure kernel with provided config files"""
config_path = spec.kernel_build_dir / ".config"
with open(config_path, "wb") as kconfig:
for config_rel_path in config.config.kconfig_rel_paths:
config_abs_path = spec.kernel_build_dir / config_rel_path
if config_abs_path.exists():
with open(config_abs_path, "rb") as conf:
kconfig.write(conf.read())
logger.info("Updated kernel configuration")
def _compile_kernel(self, spec: KernelSpec) -> None:
"""Compile the kernel"""
logger.info(f"Compiling kernel in {spec.kernel_build_dir}")
old_cwd = os.getcwd()
try:
os.chdir(spec.kernel_build_dir)
# pahole is required for the DEBUG_INFO_BTF kernel configuration option.
pahole_path = shutil.which("pahole")
if pahole_path is None:
logger.error(
"pahole not found in PATH. BTF generation requires pahole v1.16 or later."
)
sys.exit(1)
friendly_cores = os.cpu_count() - 2
utils.run_command(["make", "olddefconfig"], stream_output=True)
logger.info("Starting kernel compilation")
utils.run_command(
["make", f"-j{friendly_cores}", "bzImage"], stream_output=True
)
logger.info("Compiling bpftool")
utils.run_command(
["make", "-C", "tools/bpf", f"-j{friendly_cores}", "bpftool"],
stream_output=True,
)
logger.info("Compiling libbpf")
utils.run_command(
["make", "-C", "tools/lib/bpf", f"-j{friendly_cores}"],
stream_output=True,
)
except subprocess.CalledProcessError as e:
logger.error(f"Kernel compilation failed: {e}")
sys.exit(1)
finally:
os.chdir(old_cwd)
def _copy_bzimage(self, spec: KernelSpec) -> None:
"""Copy compiled bzImage to final location"""
# compile the bpftool as well
src = spec.kernel_build_dir / "arch/x86/boot/bzImage"
dest = spec.bzimage_path
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dest)
logger.info(f"Stored bzImage at {dest}")
shutil.copy2(
spec.kernel_build_dir / "tools/bpf/bpftool/vmlinux.h", spec.vmlinux_path
)
logger.info(f"Stored vmlinux at {spec.vmlinux_path}")
shutil.copy2(
spec.kernel_build_dir / "tools/bpf/bpftool/bpftool", spec.bpftool_path
)
logger.info(f"Stored bpftool at {spec.bpftool_path}")
shutil.copy2(spec.kernel_build_dir / "tools/lib/bpf/libbpf.a", spec.libbpf_path)
logger.info(f"Stored libbpf at {spec.libbpf_path}")
def _cleanup(self, spec: KernelSpec) -> None:
"""Clean up temporary files"""
if spec.tarball_path.exists():
spec.tarball_path.unlink()
logger.info("Removed tarball")
if spec.kernel_build_dir.exists():
shutil.rmtree(spec.kernel_build_dir)
logger.info("Removed kernel source directory")
class KernelManager:
"""Main interface for kernel management"""
def __init__(self):
self.compiler = KernelCompiler()
def remove_kernel(self, name: str) -> None:
"""Remove compiled kernel by version"""
version, _, arch = name.partition("-")
spec = KernelSpec(version=version, arch=arch)
if spec.kernel_dir.exists():
shutil.rmtree(spec.kernel_dir)
logger.info(f"Removed kernel {spec}")
else:
logger.error(
f"Kernel {spec} does not exist, path {spec.kernel_dir} not found"
)
raise SystemExit(1)
def build_kernel(self, version: str, arch=None) -> None:
"""Build kernel from version"""
spec = KernelSpec(version=version, arch=arch)
self.compiler.compile_from_version(spec)
@staticmethod
def get_kernel_from_version(
version: str,
):
"""Get kernel image from version"""
version, _, arch = version.partition("-")
spec = KernelSpec(version=version, arch=arch)
if spec.bzimage_path.exists():
return spec, KernelImage(spec.bzimage_path)
else:
raise FileNotFoundError(
f"Kernel {spec} not found. Use 'main.py kernel build' to create it."
)
def list_kernels(self) -> List[str]:
"""List all available compiled kernels"""
if not config.config.kernels_dir.exists():
raise FileNotFoundError(
f"Kernels directory not found: {config.config.kernels_dir}"
)
kernels = []
for file in config.config.kernels_dir.glob("linux-*"):
if file.is_dir():
match = re.match(r"linux-(.*)", file.name)
if match:
kernels.append(match.group(1))
return sorted(kernels)