blob: 56d2052aba8ab85270c19d47d4a4279f96b03744 [file] [log] [blame]
import logging
from pathlib import Path
import subprocess
from typing import List
from kernel import KernelImage
logger = logging.getLogger(__name__)
class VMConfig:
"""Configuration container for VM settings"""
def __init__(
self, kernel_image: KernelImage, rootfs_path: str, command: str, **kwargs
):
self.kernel = kernel_image
self.kernel_path = str(kernel_image.path)
self.rootfs_path = rootfs_path
self.command = command
self.memory_mb = kwargs.get("memory_mb", 512)
self.cpu_count = kwargs.get("cpu_count", 1)
self.extra_args = kwargs.get("extra_args", {})
def bpf_verifier_logs(output: str) -> str:
start_tag = "--- Verifier log start ---"
end_tag = "--- Verifier log end ---"
start_idx = output.find(start_tag)
end_idx = output.find(end_tag)
if start_idx != -1 and end_idx != -1:
# Extract between the tags (excluding the markers themselves)
log_body = output[start_idx + len(start_tag) : end_idx].strip()
return log_body
else:
return "No verifier log found in the output."
class Vmtest:
"""vmtest backend implementation"""
def __init__(self):
pass
def _boot_command(self, vm_config: VMConfig):
vmtest_command = ["vmtest"]
vmtest_command.extend(["-k", vm_config.kernel_path])
vmtest_command.extend(["-r", vm_config.rootfs_path])
# If it is a compiled BPF program, use the mounted path inside the VM
if vm_config.command.endswith(".bin"):
vmtest_command.append("/mnt/vmtest/" + Path(vm_config.command).name)
else:
vmtest_command.append(vm_config.command)
return vmtest_command
def _remove_boot_log(self, full_output: str) -> str:
"""
Filters QEMU and kernel boot logs, returning only the output after the
`===> Running command` marker.
"""
marker = "===> Running command"
lines = full_output.splitlines()
try:
start_index = next(i for i, line in enumerate(lines) if marker in line)
# Return everything after that marker (excluding the marker itself)
return "\n".join(lines[start_index + 1 :]).strip()
except StopIteration:
return full_output.strip()
def run_command(self, vm_config):
vm = None
try:
logger.info(f"Booting VM with kernel: {vm_config.kernel_path}")
logger.info(f"Using rootfs: {vm_config.rootfs_path}")
vm = subprocess.run(
self._boot_command(vm_config),
check=True,
text=True,
capture_output=True,
shell=False,
cwd=Path(vm_config.command).parent,
)
vm_stdout = vm.stdout
logger.debug(vm_stdout)
return VMCommandResult(
vm.returncode, self._remove_boot_log(vm_stdout), None
)
except FileNotFoundError:
raise BootFailedError(
"vmtest command not found in PATH. Please ensure vmtest is installed and available in your system PATH."
)
except subprocess.CalledProcessError as e:
out = e.stdout
err = e.stderr
# when the command in the vm fails we consider it as a successful boot
if "===> Running command" not in out:
raise BootFailedError("Boot failed", out, err, e.returncode)
logger.debug("STDOUT: \n%s", out)
logger.debug("STDERR: \n%s", err)
return VMCommandResult(e.returncode, self._remove_boot_log(out), err)
class VMCommandResult:
def __init__(self, returncode, stdout, stderr) -> None:
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
class VirtualMachine:
"""Main VM class - simple interface for end users"""
# Registry of available hypervisors
_hypervisors = {
"vmtest": Vmtest,
}
def __init__(
self,
kernel_image: KernelImage,
rootfs_path: str,
command: str,
hypervisor_type: str = "vmtest",
**kwargs,
):
self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs)
if hypervisor_type not in self._hypervisors:
raise ValueError(f"Unsupported hypervisor: {hypervisor_type}")
self.hypervisor = self._hypervisors[hypervisor_type]()
@classmethod
def list_hypervisors(cls) -> List[str]:
"""List available hypervisors"""
return list(cls._hypervisors.keys())
def execute(self):
"""Execute command in VM"""
return self.hypervisor.run_command(self.config)
class BootFailedError(Exception):
"""Raised when VM fails to boot properly (before command execution)."""
def __init__(
self, message: str, stdout: str = "", stderr: str = "", returncode: int = -1
):
super().__init__(message)
self.stdout = stdout
self.stderr = stderr
self.returncode = returncode
def __str__(self):
base = super().__str__()
output_parts = [
base,
f"Return code: {self.returncode}",
]
optional_sections = [
("STDOUT", self.stdout),
("STDERR", self.stderr),
]
for header, content in optional_sections:
if content:
output_parts.append(f"--- {header} ---")
output_parts.append(content)
return "\n".join(output_parts)