Skip to content

Commit 9b15435

Browse files
committed
tests: add test to validate seccomp filters
Add a test to validate that a seccomp filter works as defined in the JSON description. To do this we use a simple C program that just loads a given seccomp filter and calls a syscall also given in the arguments. Signed-off-by: Pablo Barbáchano <[email protected]>
1 parent 5104188 commit 9b15435

File tree

2 files changed

+211
-0
lines changed

2 files changed

+211
-0
lines changed

tests/host_tools/test_syscalls.c

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// This is used by `test_seccomp_validate.py`
5+
6+
#include <linux/types.h>
7+
#include <linux/filter.h>
8+
#include <linux/seccomp.h>
9+
#include <sys/prctl.h>
10+
#include <sys/stat.h>
11+
12+
#include <unistd.h>
13+
#include <stdlib.h>
14+
#include <stdio.h>
15+
#include <string.h>
16+
#include <fcntl.h>
17+
18+
19+
void install_bpf_filter(char *bpf_file) {
20+
int fd = open(bpf_file, O_RDONLY);
21+
if (fd == -1) {
22+
perror("open");
23+
exit(EXIT_FAILURE);
24+
}
25+
struct stat sb;
26+
if (fstat(fd, &sb) == -1) {
27+
perror("stat");
28+
exit(EXIT_FAILURE);
29+
}
30+
size_t size = sb.st_size;
31+
size_t insn_len = size / sizeof(struct sock_filter);
32+
struct sock_filter *filterbuf = (struct sock_filter*)malloc(size);
33+
if (read(fd, filterbuf, size) == -1) {
34+
perror("read");
35+
exit(EXIT_FAILURE);
36+
}
37+
38+
/* Install seccomp filter */
39+
struct sock_fprog prog = {
40+
.len = (unsigned short)(insn_len),
41+
.filter = filterbuf,
42+
};
43+
if (prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0)) {
44+
perror("prctl(NO_NEW_PRIVS)");
45+
exit(EXIT_FAILURE);
46+
}
47+
if (prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, &prog)) {
48+
perror("prctl(SECCOMP)");
49+
exit(EXIT_FAILURE);
50+
}
51+
}
52+
53+
54+
int main(int argc, char **argv) {
55+
/* parse arguments */
56+
if (argc < 3) {
57+
fprintf(stderr, "Usage: %s BPF_FILE ARG0..\n", argv[0]);
58+
exit(EXIT_FAILURE);
59+
}
60+
char *bpf_file = argv[1];
61+
long syscall_id = atoi(argv[2]);
62+
long arg0, arg1, arg2, arg3;
63+
arg0 = arg1 = arg2 = arg3 = 0;
64+
if (argc > 3) arg0 = atoi(argv[3]);
65+
if (argc > 4) arg1 = atoi(argv[4]);
66+
if (argc > 5) arg2 = atoi(argv[5]);
67+
if (argc > 6) arg3 = atoi(argv[6]);
68+
69+
/* read seccomp filter from file */
70+
if (strcmp(bpf_file, "/dev/null") != 0) {
71+
install_bpf_filter(bpf_file);
72+
}
73+
74+
long res = syscall(syscall_id, arg0, arg1, arg2, arg3);
75+
printf("%ld\n", res);
76+
return EXIT_SUCCESS;
77+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Test that validates that seccompiler filters work as expected"""
5+
6+
import json
7+
import platform
8+
import struct
9+
from pathlib import Path
10+
11+
import pytest
12+
import seccomp
13+
14+
from framework import utils
15+
from host_tools import cargo_build
16+
17+
ARCH = platform.machine()
18+
19+
20+
@pytest.fixture(scope="session")
21+
def bin_test_syscall(test_fc_session_root_path):
22+
"""Build a simple vsock client/server application."""
23+
test_syscall_bin = Path(test_fc_session_root_path) / "test_syscall"
24+
cargo_build.gcc_compile("host_tools/test_syscalls.c", test_syscall_bin)
25+
assert test_syscall_bin.exists()
26+
yield test_syscall_bin
27+
28+
29+
class BpfMapReader:
30+
"""
31+
Simple reader for the files that seccompiler-bin produces
32+
33+
The files are serialized with bincode[1] in format that is easy to parse.
34+
35+
sock_filter = <ushort uchar uchar uint>
36+
BpfProgram = Vec<sock_filter>
37+
BpfMap = BTreeMap(str, BpfProgram)
38+
str = Vec<uchar>
39+
40+
[1] https://github.com/bincode-org/bincode/blob/trunk/docs/spec.md
41+
"""
42+
43+
INSN_FMT = "<HBBI"
44+
INSN_SIZEOF = struct.calcsize(INSN_FMT)
45+
46+
def __init__(self, buf):
47+
self.buf = buf
48+
self.offset = 0
49+
50+
@classmethod
51+
def from_file(cls, file):
52+
"""Initialize a buffer from a file"""
53+
return cls(Path(file).read_bytes())
54+
55+
def read_format(self, fmt):
56+
"""Read a struct format string from the buffer"""
57+
val = struct.unpack_from(fmt, self.buf, offset=self.offset)
58+
self.offset += struct.calcsize(fmt)
59+
if len(val) == 1:
60+
return val[0]
61+
return val
62+
63+
def is_eof(self):
64+
"""Are we at the end of the buffer?"""
65+
return self.offset == len(self.buf)
66+
67+
def lookahead(self, size):
68+
"""Look ahead `size` bytes"""
69+
return self.buf[self.offset : self.offset + size]
70+
71+
def split(self):
72+
"""Return separate filters"""
73+
threads = {}
74+
# how many entries in the map
75+
map_len = self.read_format("<Q")
76+
for _ in range(map_len):
77+
# read key
78+
key_str_len = self.read_format("<Q")
79+
key_str = self.read_format(f"{key_str_len}s")
80+
# read value: vec of instructions
81+
insn_len = self.read_format("<Q")
82+
data = self.lookahead(insn_len * self.INSN_SIZEOF)
83+
threads[key_str.decode("ascii")] = data
84+
self.offset += len(data)
85+
86+
assert self.is_eof()
87+
return threads
88+
89+
90+
def test_validate_filter(seccompiler, bin_test_syscall, monkeypatch, tmp_path):
91+
"""Assert that the seccomp filter matches the JSON description."""
92+
93+
fc_filter_path = Path(f"../resources/seccomp/{ARCH}-unknown-linux-musl.json")
94+
fc_filter = json.loads(fc_filter_path.read_text(encoding="ascii"))
95+
96+
# cd to a tmp dir because we are going to generate lots of coredumps
97+
monkeypatch.chdir(tmp_path)
98+
bpf_path = seccompiler.compile(fc_filter)
99+
filters = BpfMapReader.from_file(bpf_path).split()
100+
arch = seccomp.Arch.X86_64 if ARCH == "x86_64" else seccomp.Arch.AARCH64
101+
for thread, filter_data in fc_filter.items():
102+
filter_path = Path(f"{thread}.bpf")
103+
filter_path.write_bytes(filters[thread])
104+
# for each rule, run the helper program and execute a syscall
105+
for rule in filter_data["filter"]:
106+
print(filter_path, rule)
107+
syscall = rule["syscall"]
108+
# this one cannot be called directly
109+
if syscall in ["rt_sigreturn"]:
110+
continue
111+
syscall_id = seccomp.resolve_syscall(arch, syscall)
112+
cmd = f"{bin_test_syscall} {filter_path} {syscall_id}"
113+
if "args" not in rule:
114+
# syscall should be allowed with any arguments and exit 0
115+
assert utils.run_cmd(cmd).returncode == 0
116+
else:
117+
allowed_args = [0] * 4
118+
# if we call it with allowed args, it should exit 0
119+
for arg in rule["args"]:
120+
allowed_args[arg["index"]] = arg["val"]
121+
allowed_str = " ".join(str(x) for x in allowed_args)
122+
assert utils.run_cmd(f"{cmd} {allowed_str}").returncode == 0
123+
# for each allowed arg try a different number
124+
for arg in rule["args"]:
125+
# We just add 1000000 to the allowed arg and assume it is
126+
# not something we allow in another rule. While not perfect
127+
# it works in practice.
128+
bad_args = allowed_args.copy()
129+
bad_args[arg["index"]] = str(arg["val"] + 1_000_000)
130+
unallowed_str = " ".join(str(x) for x in bad_args)
131+
outcome = utils.run_cmd(f"{cmd} {unallowed_str}")
132+
# if we call it with unallowed args, it should exit 159
133+
# 159 = 128 (abnormal termination) + 31 (SIGSYS)
134+
assert outcome.returncode == 159

0 commit comments

Comments
 (0)