blob: d9a01328bf26be54439e3e4d9c31134f9206bc2a [file] [log] [blame]
import sys
from unittest import mock
import pytest
from bpf import BPFProgram
import kernel
import main
import logging
import config
import os
logger = logging.getLogger(__name__)
@pytest.fixture
def tmp_config():
VMTEST_DIR = "/home/d3bug/.bpf-vmtest-tool"
assert VMTEST_DIR is not None, "Specify VMTEST_DIR environment varible"
config.init_config(vmtest_dir=VMTEST_DIR)
# reset config for every test
@pytest.fixture(autouse=True)
def reset_config():
config.config = None
@pytest.fixture
def openat_bpf_source(tmp_path):
openat_bpf = tmp_path / "openat_bpf.c"
openat_bpf.write_text(r"""
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
char LICENSE[] SEC("license") = "GPL";
int example_pid = 0;
SEC("tracepoint/syscalls/sys_enter_openat")
int handle_openat(struct trace_event_raw_sys_enter *ctx)
{
int pid = bpf_get_current_pid_tgid() >> 32;
char filename[256]; // filename buffer
bpf_probe_read_user(&filename, sizeof(filename), (void *)ctx->args[1]);
bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", pid,
filename);
return 0;
}
""")
return openat_bpf
@pytest.fixture
def openat_bpf_obj(openat_bpf_source, tmp_config):
def _create_openat_bpf_obj(kernel_spec):
bpf_program = BPFProgram(source_path=openat_bpf_source)
bpf_program._compile_bpf(kernel_spec)
return bpf_program.bpf_obj
return _create_openat_bpf_obj
@pytest.fixture
def invalid_memory_access_bpf_source(tmp_path):
invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c"
invalid_memory_access_bpf.write_text(r"""
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
char LICENSE[] SEC("license") = "GPL";
SEC("tracepoint/syscalls/sys_enter_openat")
int bpf_prog(struct trace_event_raw_sys_enter *ctx) {
int arr[4] = {1, 2, 3, 4};
// Invalid memory access: out-of-bounds
int val = arr[5]; // This causes the verifier to fail
return val;
}
""")
return invalid_memory_access_bpf
@pytest.fixture
def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source, tmp_config):
def _create_invalid_memory_access_bpf_obj(kernel_spec):
bpf_program = BPFProgram(source_path=invalid_memory_access_bpf_source)
bpf_program._compile_bpf(kernel_spec)
return bpf_program.bpf_obj
return _create_invalid_memory_access_bpf_obj
def run_main_with_args_and_capture_output(args, capsys):
with mock.patch.object(sys, "argv", args):
try:
main.main()
except SystemExit as e:
result = capsys.readouterr()
output = result.out.rstrip()
error = result.err.rstrip()
logger.debug("STDOUT:\n%s", output)
logger.debug("STDERR:\n%s", error)
return (e.code, output, error)
except Exception as e:
pytest.fail(f"Unknown error happend: {e}")
else:
pytest.fail("Expected main to raise SystemExit")
KERNEL_VERSION = "6.16"
kernel_cli_flags = [["--kernel", KERNEL_VERSION]]
@pytest.mark.parametrize("kernel_args", kernel_cli_flags)
class TestCLI:
def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, capsys):
args = [
"main.py",
"vmtest",
*kernel_args,
"--rootfs",
"/",
"--bpf-src",
str(openat_bpf_source),
]
code, output, _ = run_main_with_args_and_capture_output(args, capsys)
assert code == 0
assert "BPF programs successfully loaded" == output
def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, capsys):
args = [
"main.py",
"vmtest",
*kernel_args,
"--rootfs",
"/",
"--bpf-obj",
str(openat_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
]
code, output, _ = run_main_with_args_and_capture_output(args, capsys)
assert code == 0
assert "BPF programs successfully loaded" == output
def test_main_with_invalid_bpf(
self, kernel_args, invalid_memory_access_bpf_source, capsys
):
args = [
"main.py",
"vmtest",
*kernel_args,
"--rootfs",
"/",
"--bpf-src",
str(invalid_memory_access_bpf_source),
]
code, output, _ = run_main_with_args_and_capture_output(args, capsys)
output_lines = output.splitlines()
assert code == 1
assert "BPF program failed to load" == output_lines[0]
assert "Verifier logs:" == output_lines[1]
def test_main_with_invalid_bpf_obj(
self, kernel_args, invalid_memory_access_bpf_obj, capsys
):
args = [
"main.py",
"vmtest",
*kernel_args,
"--rootfs",
"/",
"--bpf-obj",
str(invalid_memory_access_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
]
code, output, _ = run_main_with_args_and_capture_output(args, capsys)
output_lines = output.splitlines()
assert code == 1
assert "BPF program failed to load" == output_lines[0]
assert "Verifier logs:" == output_lines[1]
def test_main_with_valid_command(self, kernel_args, capsys):
args = ["main.py", "vmtest", *kernel_args, "--rootfs", "/", "-c", "uname -r"]
code, output, _ = run_main_with_args_and_capture_output(args, capsys)
assert code == 0
assert f"{kernel_args[1]}.0" in output
def test_main_with_invalid_command(self, kernel_args, capsys):
args = [
"main.py",
"vmtest",
*kernel_args,
"--rootfs",
"/",
"-c",
"NotImplemented",
]
code, output, error = run_main_with_args_and_capture_output(args, capsys)
assert code != 0
assert f"Command failed with exit code: {code}" in output
def test_bpf_compile_subcommand(
self, kernel_args, openat_bpf_source, tmp_path, capsys
):
args = [
"main.py",
"bpf",
"compile",
*kernel_args,
"-o",
"",
str(openat_bpf_source),
]
code, _, _ = run_main_with_args_and_capture_output(args, capsys)
assert code == 0