blob: 43b6036c61502d2e7eac3a1c32ac521f53e6b82f [file] [log] [blame]
import argparse
import logging
from pathlib import Path
import sys
import textwrap
import os
import bpf
import kernel
import vm
import config
# Module-level logger; its level and output format are set up in main() through
# logging.basicConfig once the command line has been parsed.
logger = logging.getLogger(__name__)
def cmd_kernel_list(args):
    """Print every kernel reported by the kernel manager, one per line.

    When the manager reports nothing, an informational log message is
    emitted instead of any output on stdout.

    Args:
        args: Parsed command-line namespace; this command reads nothing
            from it.
    """
    available = kernel.KernelManager().list_kernels()
    if not available:
        logger.info("No kernels available")
        return
    for entry in available:
        print(entry)
def cmd_kernel_remove(args):
    """Remove the kernel named by ``args.kernel``.

    On success the confirmation is both logged at INFO level and printed
    to stdout.

    Args:
        args: Parsed command-line namespace; ``args.kernel`` is the kernel
            version to remove.

    Exits with status 1 when ``args.kernel`` is empty.
    """
    manager = kernel.KernelManager()
    version = args.kernel
    if not version:
        logger.error("kernel version required for remove action")
        sys.exit(1)

    manager.remove_kernel(version)

    # The same confirmation goes to the log and to stdout.
    confirmation = f"Kernel {version} removed"
    logger.info(confirmation)
    print(confirmation)
def cmd_kernel_build(args):
    """Build the kernel version given in ``args.kernel``.

    Args:
        args: Parsed command-line namespace; ``args.kernel`` is the kernel
            version handed to the kernel manager's ``build_kernel``.

    Exits with status 1 when ``args.kernel`` is empty.
    """
    manager = kernel.KernelManager()
    requested = args.kernel
    if requested:
        manager.build_kernel(version=requested)
        return
    logger.error("kernel version required for build action")
    sys.exit(1)
def cmd_bpf_compile(args):
    """Compile a BPF C source file to bytecode for the selected kernel.

    Args:
        args: Parsed command-line namespace. Reads ``args.kernel`` (kernel
            version used to resolve the kernel spec), ``args.bpf_src``
            (path to the BPF C source) and ``args.output`` (destination
            for the compiled object). If ``args.output`` is falsy the
            artifact is left at the path returned by ``compile_bpf`` and
            that path is logged.

    Exits with status 1 when the kernel cannot be resolved, when
    compilation fails, or when copying the artifact to ``args.output``
    fails. Each failure is reported with its own error message.
    """
    kmanager = kernel.KernelManager()
    try:
        kernel_spec, _ = kmanager.get_kernel_from_version(version=args.kernel)
    except Exception as e:
        logger.error(f"Failed to get kernel: {e}")
        sys.exit(1)

    # Only the compilation itself is covered by the "failed to compile"
    # handler, so a later copy error is not misreported as a compile error.
    try:
        bpf_program = bpf.BPFProgram(source_path=Path(args.bpf_src))
        output_path = bpf_program.compile_bpf(kernel_spec)
    except Exception as e:
        logger.error(f"Failed to compile BPF source: {e}")
        sys.exit(1)

    if not args.output:
        # No destination requested: tell the user where the artifact lives.
        logger.info(f"Compiled to: {output_path}")
        return

    import shutil

    output_dest = Path(args.output)
    try:
        shutil.copy2(output_path, output_dest)
    except OSError as e:
        logger.error(f"Failed to copy {output_path} to {output_dest}: {e}")
        sys.exit(1)
    logger.info(f"Copied to: {output_dest}")
def cmd_vmtest(args):
    """Boot a kernel in a VM and run a BPF program or a command inside it.

    Exactly one of ``args.bpf_src``, ``args.bpf_obj`` or ``args.command``
    selects what is run; they are checked in that order.

    Args:
        args: Parsed command-line namespace. Reads ``args.kernel`` (kernel
            version to boot), ``args.rootfs`` (passed to the VM), and the
            three selectors listed above.

    The process always terminates through ``sys.exit``:
        * 1 if no selector is set, the kernel cannot be resolved, or the
          command cannot be prepared;
        * the ``returncode`` of ``vm.BootFailedError`` if the VM fails to
          boot;
        * otherwise the return code of the VM execution result.
    """
    # Fail early with a clear message instead of hitting an unbound
    # ``command`` further down when every selector is empty.
    if not (args.bpf_src or args.bpf_obj or args.command):
        logger.error("one of --bpf-src, --bpf-obj or --command is required")
        sys.exit(1)

    kmanager = kernel.KernelManager()
    try:
        kernel_spec, kernel_image = kmanager.get_kernel_from_version(
            version=args.kernel
        )
    except Exception as e:
        logger.error(f"Failed to get kernel: {e}")
        sys.exit(1)

    try:
        if args.bpf_src:
            command = bpf.BPFProgram.from_source(Path(args.bpf_src), kernel_spec)
        elif args.bpf_obj:
            command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj), kernel_spec)
        else:
            command = args.command
    except Exception as e:
        logger.error(f"Failed to prepare command for vmtest: {e}")
        sys.exit(1)

    virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, str(command))
    try:
        result = virtual_machine.execute()
    except vm.BootFailedError as e:
        logger.error(f"VM boot failure: {e}")
        sys.exit(e.returncode)

    if args.bpf_src or args.bpf_obj:
        if result.returncode == 0:
            print("BPF programs successfully loaded")
        elif "Failed to load BPF skeleton" in result.stdout:
            print("BPF program failed to load")
            print("Verifier logs:")
            print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), "\t"))
        else:
            # A failure without the skeleton-load marker used to produce no
            # output at all; surface the exit code and the raw VM output.
            logger.error(f"BPF run failed with exit code {result.returncode}")
            print(result.stdout)
    elif args.command:
        print(result.stdout)

    sys.exit(result.returncode)
def _build_parser():
    """Build the argument parser with the bpf, vmtest and kernel subcommands."""
    parser = argparse.ArgumentParser(
        description="BPF vmtest tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""
            Examples:
              # Compile BPF source to bytecode
              %(prog)s bpf compile my_prog.bpf.c -k 6.15-x86_64 -o my_prog.bpf.o
              # Run BPF program in VM
              %(prog)s vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c
              # List available kernels
              %(prog)s kernel list
        """),
    )
    parser.add_argument(
        "-v",
        "--log-level",
        help="Log level",
        metavar="DEBUG|INFO|WARNING|ERROR",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="INFO",
    )
    parser.add_argument(
        "--vmtest-dir",
        help="Directory for vmtest artifacts (or set VMTEST_DIR env variable)",
        metavar="DIR",
        type=str,
        default=os.getenv("VMTEST_DIR"),
    )
    subparsers = parser.add_subparsers(dest="subcommand", help="Available commands")

    # BPF subcommand
    bpf_subparser = subparsers.add_parser("bpf", help="BPF program management")
    bpf_subparsers = bpf_subparser.add_subparsers(dest="bpf_action", help="BPF actions")

    # bpf compile subcommand
    compile_parser = bpf_subparsers.add_parser(
        "compile", help="Compile BPF source to bytecode (.bpf.o)"
    )
    compile_parser.add_argument(
        "bpf_src",
        help="Path to BPF C source file",
        type=str,
    )
    compile_parser.add_argument(
        "-o",
        "--output",
        # The option is mandatory, so the help must not describe it as optional.
        help="Output path for compiled bytecode",
        metavar="PATH",
        type=str,
        required=True,
    )
    compile_parser.add_argument(
        "-k",
        "--kernel",
        help="Kernel version to use for compilation",
        metavar="VERSION",
        type=str,
        required=True,
    )
    compile_parser.set_defaults(func=cmd_bpf_compile)

    # VMtest subcommand
    vmtest_parser = subparsers.add_parser("vmtest", help="Run VM tests")
    vmtest_parser.set_defaults(func=cmd_vmtest)
    vmtest_parser.add_argument(
        "-k",
        "--kernel",
        help="Kernel version to boot in the vm",
        metavar="VERSION",
        type=str,
        required=True,
    )
    vmtest_parser.add_argument(
        "-r", "--rootfs", help="rootfs to mount in the vm", default="/", metavar="PATH"
    )
    # Exactly one of these selects what runs inside the VM.
    command_group = vmtest_parser.add_mutually_exclusive_group(required=True)
    command_group.add_argument(
        "--bpf-src",
        help="Path to BPF C source file",
        metavar="PATH",
        type=str,
    )
    command_group.add_argument(
        "--bpf-obj",
        help="Path to bpf bytecode object",
        metavar="PATH",
        type=str,
    )
    command_group.add_argument(
        "-c", "--command", help="command to run in the vm", metavar="COMMAND"
    )

    # Kernel subcommand with nested subcommands
    kernel_subparser = subparsers.add_parser("kernel", help="Kernel management")
    kernel_subparsers = kernel_subparser.add_subparsers(
        dest="kernel_action", help="Kernel actions"
    )

    # kernel list subcommand
    list_parser = kernel_subparsers.add_parser(
        "list", help="List all available kernels"
    )
    list_parser.set_defaults(func=cmd_kernel_list)

    # kernel remove subcommand
    remove_parser = kernel_subparsers.add_parser("remove", help="Remove a kernel")
    remove_parser.add_argument(
        "kernel", help="Kernel version to remove (e.g., 6.15-x86_64)"
    )
    remove_parser.set_defaults(func=cmd_kernel_remove)

    # kernel build subcommand
    build_parser = kernel_subparsers.add_parser("build", help="Build a kernel")
    build_parser.add_argument(
        "kernel", help="Kernel version to build (e.g. 6.15-x86_64)"
    )
    build_parser.set_defaults(func=cmd_kernel_build)

    return parser


def main():
    """Parse the command line, initialise configuration and run the subcommand.

    The vmtest directory comes from ``--vmtest-dir`` or, by default, the
    ``VMTEST_DIR`` environment variable; it must name an existing directory.

    The process always terminates through ``sys.exit``:
        * 1 (after printing help) when no runnable subcommand was selected;
        * 1 when the vmtest directory is missing, does not exist, is not a
          directory, or ``config.init_config`` raises ``ValueError``;
        * 0 after the selected handler returns, unless the handler itself
          exits first.
    """
    parser = _build_parser()
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, format="%(levelname)s: %(message)s")

    # An incomplete invocation (e.g. no subcommand, or "kernel" without an
    # action) only needs the usage text, so answer it before demanding a
    # valid VMTEST_DIR.
    if not hasattr(args, "func"):
        parser.print_help()
        sys.exit(1)

    if not args.vmtest_dir:
        logger.error(
            "VMTEST_DIR not specified. Use --vmtest-dir=DIR or set VMTEST_DIR environment variable"
        )
        sys.exit(1)

    vmtest_path = Path(args.vmtest_dir)
    if not vmtest_path.exists():
        logger.error(f"VMTEST_DIR does not exist: {vmtest_path}")
        sys.exit(1)
    if not vmtest_path.is_dir():
        logger.error(f"VMTEST_DIR is not a directory: {vmtest_path}")
        sys.exit(1)

    try:
        config.init_config(vmtest_dir=args.vmtest_dir)
    except ValueError as e:
        logger.error(str(e))
        sys.exit(1)
    logger.debug(f"VMTEST_DIR set to: {args.vmtest_dir}")

    args.func(args)
    sys.exit(0)
# Script entry point: run main() and turn anything it does not handle itself
# into a logged error with exit status 1. SystemExit raised by main() is not
# caught here and propagates unchanged.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logger.error("Operation cancelled by user")
        sys.exit(1)
    except Exception as e:
        # Keep the one-line summary for normal runs, but attach the traceback
        # when DEBUG logging is enabled so unexpected failures can be diagnosed.
        logger.error(
            f"Unknown error: {e}", exc_info=logger.isEnabledFor(logging.DEBUG)
        )
        sys.exit(1)