Skip to content

Commit 979cf1b

Browse files
committed
tests: add test to validate seccomp filters
Add a test to validate that a seccomp filter works as defined in the JSON description. To do this we use a simple C program that just loads a given seccomp filter and calls a syscall also given in the arguments. Signed-off-by: Pablo Barbáchano <[email protected]>
1 parent 2027594 commit 979cf1b

File tree

2 files changed

+215
-0
lines changed

2 files changed

+215
-0
lines changed

tests/host_tools/test_syscalls.c

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// This is used by `test_seccomp_validate.py`
5+
6+
#include <linux/types.h>
7+
#include <linux/filter.h>
8+
#include <linux/seccomp.h>
9+
#include <sys/prctl.h>
10+
#include <sys/stat.h>
11+
12+
#include <unistd.h>
13+
#include <stdlib.h>
14+
#include <stdio.h>
15+
#include <string.h>
16+
#include <fcntl.h>
17+
18+
19+
void install_bpf_filter(char *bpf_file) {
20+
int fd = open(bpf_file, O_RDONLY);
21+
if (fd == -1) {
22+
perror("open");
23+
exit(EXIT_FAILURE);
24+
}
25+
struct stat sb;
26+
if (fstat(fd, &sb) == -1) {
27+
perror("stat");
28+
exit(EXIT_FAILURE);
29+
}
30+
size_t size = sb.st_size;
31+
size_t insn_len = size / sizeof(struct sock_filter);
32+
struct sock_filter *filterbuf = (struct sock_filter*)malloc(size);
33+
if (read(fd, filterbuf, size) == -1) {
34+
perror("read");
35+
exit(EXIT_FAILURE);
36+
}
37+
38+
/* Install seccomp filter */
39+
struct sock_fprog prog = {
40+
.len = (unsigned short)(insn_len),
41+
.filter = filterbuf,
42+
};
43+
if (prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0)) {
44+
perror("prctl(NO_NEW_PRIVS)");
45+
exit(EXIT_FAILURE);
46+
}
47+
if (prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, &prog)) {
48+
perror("prctl(SECCOMP)");
49+
exit(EXIT_FAILURE);
50+
}
51+
}
52+
53+
54+
int main(int argc, char **argv) {
55+
/* parse arguments */
56+
if (argc < 3) {
57+
fprintf(stderr, "Usage: %s BPF_FILE ARG0..\n", argv[0]);
58+
exit(EXIT_FAILURE);
59+
}
60+
char *bpf_file = argv[1];
61+
long syscall_id = atoi(argv[2]);
62+
long arg0, arg1, arg2, arg3;
63+
arg0 = arg1 = arg2 = arg3 = 0;
64+
if (argc > 3) arg0 = atoi(argv[3]);
65+
if (argc > 4) arg1 = atoi(argv[4]);
66+
if (argc > 5) arg2 = atoi(argv[5]);
67+
if (argc > 6) arg3 = atoi(argv[6]);
68+
69+
/* read seccomp filter from file */
70+
if (strcmp(bpf_file, "/dev/null") != 0) {
71+
install_bpf_filter(bpf_file);
72+
}
73+
74+
long res = syscall(syscall_id, arg0, arg1, arg2, arg3);
75+
printf("%ld\n", res);
76+
return EXIT_SUCCESS;
77+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Test that validates that seccompiler filters work as expected"""
5+
6+
import json
7+
import platform
8+
import resource
9+
import struct
10+
from pathlib import Path
11+
12+
import pytest
13+
import seccomp
14+
15+
from framework import utils
16+
from host_tools import cargo_build
17+
18+
ARCH = platform.machine()
19+
20+
21+
@pytest.fixture(scope="session")
22+
def bin_test_syscall(test_fc_session_root_path):
23+
"""Build the test_syscall binary."""
24+
test_syscall_bin = Path(test_fc_session_root_path) / "test_syscall"
25+
cargo_build.gcc_compile("host_tools/test_syscalls.c", test_syscall_bin)
26+
assert test_syscall_bin.exists()
27+
yield test_syscall_bin
28+
29+
30+
class BpfMapReader:
31+
"""
32+
Simple reader for the files that seccompiler-bin produces
33+
34+
The files are serialized with bincode[1] in format that is easy to parse.
35+
36+
sock_filter = <ushort uchar uchar uint>
37+
BpfProgram = Vec<sock_filter>
38+
BpfMap = BTreeMap(str, BpfProgram)
39+
str = Vec<uchar>
40+
41+
[1] https://github.com/bincode-org/bincode/blob/trunk/docs/spec.md
42+
"""
43+
44+
INSN_FMT = "<HBBI"
45+
INSN_SIZEOF = struct.calcsize(INSN_FMT)
46+
47+
def __init__(self, buf):
48+
self.buf = buf
49+
self.offset = 0
50+
51+
@classmethod
52+
def from_file(cls, file):
53+
"""Initialize a buffer from a file"""
54+
return cls(Path(file).read_bytes())
55+
56+
def read_format(self, fmt):
57+
"""Read a struct format string from the buffer"""
58+
val = struct.unpack_from(fmt, self.buf, offset=self.offset)
59+
self.offset += struct.calcsize(fmt)
60+
if len(val) == 1:
61+
return val[0]
62+
return val
63+
64+
def is_eof(self):
65+
"""Are we at the end of the buffer?"""
66+
return self.offset == len(self.buf)
67+
68+
def lookahead(self, size):
69+
"""Look ahead `size` bytes"""
70+
return self.buf[self.offset : self.offset + size]
71+
72+
def split(self):
73+
"""Return separate filters"""
74+
threads = {}
75+
# how many entries in the map
76+
map_len = self.read_format("<Q")
77+
for _ in range(map_len):
78+
# read key
79+
key_str_len = self.read_format("<Q")
80+
key_str = self.read_format(f"{key_str_len}s")
81+
# read value: vec of instructions
82+
insn_len = self.read_format("<Q")
83+
data = self.lookahead(insn_len * self.INSN_SIZEOF)
84+
threads[key_str.decode("ascii")] = data
85+
self.offset += len(data)
86+
87+
assert self.is_eof()
88+
return threads
89+
90+
91+
def test_validate_filter(seccompiler, bin_test_syscall, monkeypatch, tmp_path):
92+
"""Assert that the seccomp filter matches the JSON description."""
93+
94+
fc_filter_path = Path(f"../resources/seccomp/{ARCH}-unknown-linux-musl.json")
95+
fc_filter = json.loads(fc_filter_path.read_text(encoding="ascii"))
96+
97+
# cd to a tmp dir because we may generate a bunch of intermediate files
98+
monkeypatch.chdir(tmp_path)
99+
# prevent coredumps
100+
resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
101+
102+
bpf_path = seccompiler.compile(fc_filter)
103+
filters = BpfMapReader.from_file(bpf_path).split()
104+
arch = seccomp.Arch.X86_64 if ARCH == "x86_64" else seccomp.Arch.AARCH64
105+
for thread, filter_data in fc_filter.items():
106+
filter_path = Path(f"{thread}.bpf")
107+
filter_path.write_bytes(filters[thread])
108+
# for each rule, run the helper program and execute a syscall
109+
for rule in filter_data["filter"]:
110+
print(filter_path, rule)
111+
syscall = rule["syscall"]
112+
# this one cannot be called directly
113+
if syscall in ["rt_sigreturn"]:
114+
continue
115+
syscall_id = seccomp.resolve_syscall(arch, syscall)
116+
cmd = f"{bin_test_syscall} {filter_path} {syscall_id}"
117+
if "args" not in rule:
118+
# syscall should be allowed with any arguments and exit 0
119+
assert utils.run_cmd(cmd).returncode == 0
120+
else:
121+
allowed_args = [0] * 4
122+
# if we call it with allowed args, it should exit 0
123+
for arg in rule["args"]:
124+
allowed_args[arg["index"]] = arg["val"]
125+
allowed_str = " ".join(str(x) for x in allowed_args)
126+
assert utils.run_cmd(f"{cmd} {allowed_str}").returncode == 0
127+
# for each allowed arg try a different number
128+
for arg in rule["args"]:
129+
# We just add 1000000 to the allowed arg and assume it is
130+
# not something we allow in another rule. While not perfect
131+
# it works in practice.
132+
bad_args = allowed_args.copy()
133+
bad_args[arg["index"]] = str(arg["val"] + 1_000_000)
134+
unallowed_str = " ".join(str(x) for x in bad_args)
135+
outcome = utils.run_cmd(f"{cmd} {unallowed_str}")
136+
# if we call it with unallowed args, it should exit 159
137+
# 159 = 128 (abnormal termination) + 31 (SIGSYS)
138+
assert outcome.returncode == 159

0 commit comments

Comments
 (0)