Skip to content

Commit 71aba7d

Browse files
committed
added tests for firewall chain-filtering
1 parent cdab53a commit 71aba7d

File tree

4 files changed

+300
-33
lines changed

4 files changed

+300
-33
lines changed

src/firewall_test/simulator/04_firewall_tables_test.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,168 @@ def test_firewall_inherit_table_priority_chain(prio_table, prio_chain, result):
218218

219219
fw._run_tables._inherit_table_priority_to_chain(table=table, chain=chain)
220220
assert chain.priority == result
221+
222+
223+
def test_firewall_chain_filter_pre_routing():
224+
from simulator.firewall import Firewall
225+
from plugins.system.system_linux_netfilter import SystemLinuxNetfilter
226+
from plugins.translate.netfilter.ruleset import NetfilterRuleset, NetfilterChainOutput as Chain
227+
228+
ruleset = NetfilterRuleset(TESTDATA_RULESET).get()
229+
fw = Firewall(
230+
system=SystemLinuxNetfilter,
231+
ruleset=ruleset,
232+
)
233+
234+
dnat = SystemLinuxNetfilter.FIREWALL_NAT[FlowForward]['dnat']
235+
236+
chains = [
237+
Chain(name='ingress', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='ingress'),
238+
Chain(name='prerouting-early', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='prerouting', priority=dnat['priority'] - 1),
239+
Chain(name='prerouting-dnat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='prerouting', priority=dnat['priority']),
240+
Chain(name='output-early', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='output', priority=dnat['priority'] - 1),
241+
Chain(name='output-dnat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='output', priority=dnat['priority']),
242+
Chain(name='prerouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='prerouting', priority=0),
243+
Chain(name='input', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='input'),
244+
Chain(name='forward', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='forward'),
245+
Chain(name='postrouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='postrouting'),
246+
]
247+
248+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_pre_routing(chain=c, flow=FlowForward)]
249+
assert len(filtered_chains) == 3
250+
assert filtered_chains[0].name == 'ingress'
251+
assert filtered_chains[1].name == 'prerouting-early'
252+
assert filtered_chains[2].name == 'prerouting-dnat'
253+
254+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_pre_routing(chain=c, flow=FlowInput)]
255+
assert len(filtered_chains) == 3
256+
assert filtered_chains[0].name == 'ingress'
257+
assert filtered_chains[1].name == 'prerouting-early'
258+
assert filtered_chains[2].name == 'prerouting-dnat'
259+
260+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_pre_routing(chain=c, flow=FlowOutput)]
261+
assert len(filtered_chains) == 2
262+
assert filtered_chains[0].name == 'output-early'
263+
assert filtered_chains[1].name == 'output-dnat'
264+
265+
266+
def test_firewall_chain_filter_dnat():
267+
from simulator.firewall import Firewall
268+
from plugins.system.system_linux_netfilter import SystemLinuxNetfilter
269+
from plugins.translate.netfilter.ruleset import NetfilterRuleset, NetfilterChainOutput as Chain
270+
271+
ruleset = NetfilterRuleset(TESTDATA_RULESET).get()
272+
fw = Firewall(
273+
system=SystemLinuxNetfilter,
274+
ruleset=ruleset,
275+
)
276+
277+
fwd_dnat = SystemLinuxNetfilter.FIREWALL_NAT[FlowForward]['dnat']
278+
out_dnat = SystemLinuxNetfilter.FIREWALL_NAT[FlowOutput]['dnat']
279+
280+
chains = [
281+
Chain(name='ingress', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='ingress'),
282+
Chain(name='prerouting-early', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=fwd_dnat['hook'], priority=fwd_dnat['priority'] - 1),
283+
Chain(name='prerouting-filter', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=fwd_dnat['hook'], priority=fwd_dnat['priority']),
284+
Chain(name='prerouting-dnat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=fwd_dnat['hook'], priority=fwd_dnat['priority'], type=Chain.TYPE_NAT),
285+
Chain(name='prerouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=fwd_dnat['hook']),
286+
Chain(name='output', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=out_dnat['hook']),
287+
Chain(name='output-dnat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=out_dnat['hook'], priority=out_dnat['priority'], type=Chain.TYPE_NAT),
288+
Chain(name='input', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='input'),
289+
Chain(name='forward', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='forward'),
290+
Chain(name='postrouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='postrouting'),
291+
]
292+
293+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_dnat(chain=c, flow=FlowForward)]
294+
assert len(filtered_chains) == 1
295+
assert filtered_chains[0].name == 'prerouting-dnat'
296+
297+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_dnat(chain=c, flow=FlowOutput)]
298+
assert len(filtered_chains) == 1
299+
assert filtered_chains[0].name == 'output-dnat'
300+
301+
302+
def test_firewall_chain_filter_main():
303+
from simulator.firewall import Firewall
304+
from plugins.system.system_linux_netfilter import SystemLinuxNetfilter
305+
from plugins.translate.netfilter.ruleset import NetfilterRuleset, NetfilterChainOutput as Chain
306+
307+
ruleset = NetfilterRuleset(TESTDATA_RULESET).get()
308+
fw = Firewall(
309+
system=SystemLinuxNetfilter,
310+
ruleset=ruleset,
311+
)
312+
313+
dnat = SystemLinuxNetfilter.FIREWALL_NAT[FlowForward]['dnat']
314+
snat = SystemLinuxNetfilter.FIREWALL_NAT[FlowForward]['snat']
315+
316+
chains = [
317+
Chain(name='ingress', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='ingress'),
318+
Chain(name='prerouting-dnat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=dnat['hook'], priority=dnat['priority'], type=Chain.TYPE_NAT),
319+
Chain(name='prerouting-late', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=dnat['hook'], priority=dnat['priority'] + 1),
320+
Chain(name='prerouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=dnat['hook']),
321+
Chain(name='input', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='input'),
322+
Chain(name='output', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='output'),
323+
Chain(name='forward', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='forward'),
324+
Chain(name='postrouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook']),
325+
Chain(name='postrouting-filter', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook'], priority=snat['priority']),
326+
Chain(name='postrouting-snat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook'], priority=snat['priority'], type=Chain.TYPE_NAT),
327+
Chain(name='postrouting-late', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook'], priority=snat['priority'] + 1),
328+
]
329+
330+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_main(chain=c, flow=FlowForward)]
331+
assert len(filtered_chains) == 5
332+
assert filtered_chains[0].name == 'prerouting-late'
333+
assert filtered_chains[1].name == 'prerouting'
334+
assert filtered_chains[2].name == 'forward'
335+
assert filtered_chains[3].name == 'postrouting'
336+
assert filtered_chains[4].name == 'postrouting-filter'
337+
338+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_main(chain=c, flow=FlowInput)]
339+
assert len(filtered_chains) == 3
340+
assert filtered_chains[0].name == 'prerouting-late'
341+
assert filtered_chains[1].name == 'prerouting'
342+
assert filtered_chains[2].name == 'input'
343+
344+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_main(chain=c, flow=FlowOutput)]
345+
assert len(filtered_chains) == 3
346+
assert filtered_chains[0].name == 'output'
347+
assert filtered_chains[1].name == 'postrouting'
348+
assert filtered_chains[2].name == 'postrouting-filter'
349+
350+
351+
def test_firewall_chain_filter_snat():
352+
from simulator.firewall import Firewall
353+
from plugins.system.system_linux_netfilter import SystemLinuxNetfilter
354+
from plugins.translate.netfilter.ruleset import NetfilterRuleset, NetfilterChainOutput as Chain
355+
356+
ruleset = NetfilterRuleset(TESTDATA_RULESET).get()
357+
fw = Firewall(
358+
system=SystemLinuxNetfilter,
359+
ruleset=ruleset,
360+
)
361+
362+
snat = SystemLinuxNetfilter.FIREWALL_NAT[FlowForward]['snat']
363+
364+
chains = [
365+
Chain(name='ingress', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='ingress'),
366+
Chain(name='prerouting-dnat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='prerouting', priority=-100, type=Chain.TYPE_NAT),
367+
Chain(name='prerouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='prerouting', priority=0),
368+
Chain(name='input', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='input'),
369+
Chain(name='forward', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook='forward'),
370+
Chain(name='postrouting', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook']),
371+
Chain(name='postrouting-filter', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook'], priority=snat['priority']),
372+
Chain(name='postrouting-snat', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook'], priority=snat['priority'], type=Chain.TYPE_NAT),
373+
Chain(name='postrouting-late', rules=[], family=ProtoL3IP4, policy=Chain.POLICY_ACCEPT, hook=snat['hook'], priority=snat['priority'] + 1),
374+
]
375+
376+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_snat(chain=c, flow=FlowForward)]
377+
assert len(filtered_chains) == 1
378+
assert filtered_chains[0].name == 'postrouting-snat'
379+
380+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_snat(chain=c, flow=FlowInput)]
381+
assert len(filtered_chains) == 0
382+
383+
filtered_chains = [c for c in chains if fw._run_tables._chain_filter_snat(chain=c, flow=FlowOutput)]
384+
assert len(filtered_chains) == 1
385+
assert filtered_chains[0].name == 'postrouting-snat'

src/firewall_test/simulator/06_firewall_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,4 @@ def test_firewall_basic(src, dst, ni_in, ni_out, result_pre, result_dnat, result
9797
# lazy action
9898
# lazy action mixed with 'quick' action
9999
# chain default-policy
100+
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import pytest
2+
3+
from testdata_test import TESTDATA_FILE_NF_RULESET, TEST_DST_IP6_DROP, TEST_DST_IP4_ACCEPT, TEST_DST_IP6_ACCEPT, \
4+
TEST_DST_IP4_REJECT, TEST_DST_IP6_REJECT, TEST_DST_IP4_DROP
5+
from config import RuleActionAccept, RuleActionDrop, RuleActionReject, RuleActionSNAT
6+
7+
with open(TESTDATA_FILE_NF_RULESET, 'r', encoding='utf-8') as f:
8+
TESTDATA_RULESET = f.read()
9+
10+
11+
@pytest.mark.parametrize(
12+
'src,dst,ni_in,ni_out,result_pre,result_dnat,result_main,result_snat,result_eg',
13+
[
14+
# default policy drop of forward chain
15+
('192.168.0.10', '1.1.1.1', 'wan', 'wan', RuleActionAccept, None, RuleActionDrop, None, RuleActionAccept),
16+
# explicit drop in forward chain
17+
('192.168.0.10', TEST_DST_IP4_DROP, 'wan', 'wan', RuleActionAccept, None, RuleActionDrop, None, RuleActionAccept),
18+
# explicit reject in forward chain
19+
('192.168.0.10', TEST_DST_IP4_REJECT, 'wan', 'wan', RuleActionAccept, None, RuleActionReject, None, RuleActionAccept),
20+
# explicit accept in forward chain
21+
('192.168.0.10', TEST_DST_IP4_ACCEPT, 'wan', 'wan', RuleActionAccept, None, RuleActionAccept, None, RuleActionAccept),
22+
# DOCKER-FORWARD accept
23+
('172.17.11.5', '1.1.1.1', 'docker0', 'wan', RuleActionAccept, None, RuleActionAccept, RuleActionSNAT, RuleActionAccept),
24+
# explicit drop in forward chain
25+
('172.17.11.5', TEST_DST_IP4_DROP, 'docker0', 'wan', RuleActionAccept, None, RuleActionDrop, RuleActionSNAT, RuleActionAccept),
26+
# explicit accept in forward chain
27+
('2003:1::1', TEST_DST_IP6_ACCEPT, 'wan', 'wan', RuleActionAccept, None, RuleActionAccept, None, RuleActionAccept),
28+
# explicit drop in forward chain
29+
('2003:1::1', TEST_DST_IP6_DROP, 'wan', 'wan', RuleActionAccept, None, RuleActionDrop, None, RuleActionAccept),
30+
# explicit reject in forward chain
31+
('2003:1::1', TEST_DST_IP6_REJECT, 'wan', 'wan', RuleActionAccept, None, RuleActionReject, None, RuleActionAccept),
32+
]
33+
)
34+
def test_firewall_basic(src, dst, ni_in, ni_out, result_pre, result_dnat, result_main, result_snat, result_eg):
35+
from config import FlowForward
36+
from plugins.system.system_linux_netfilter import SystemLinuxNetfilter
37+
from plugins.translate.netfilter.ruleset import NetfilterRuleset
38+
from simulator.packet import PacketIP
39+
from simulator.firewall import Firewall
40+
41+
ruleset = NetfilterRuleset(TESTDATA_RULESET).get()
42+
fw = Firewall(
43+
system=SystemLinuxNetfilter,
44+
ruleset=ruleset,
45+
)
46+
packet = PacketIP(src=src, dst=dst)
47+
# NOTE: network-interface discovery is done by main simulator.. will impact rule-matching
48+
packet.ni_in = ni_in
49+
packet.ni_out = ni_out
50+
51+
passed, rule = fw.process_pre_routing(packet=packet, flow=FlowForward)
52+
assert passed == (result_pre == RuleActionAccept)
53+
if rule is not None:
54+
assert rule.action == result_pre
55+
56+
has_nat, rule = fw.process_dnat(packet=packet, flow=FlowForward)
57+
if result_dnat is None:
58+
assert not has_nat
59+
assert rule is None
60+
61+
else:
62+
assert has_nat
63+
assert rule.action == result_dnat
64+
65+
passed, rule = fw.process_main(packet=packet, flow=FlowForward)
66+
if passed != (result_main == RuleActionAccept):
67+
raise ValueError(rule)
68+
69+
assert passed == (result_main == RuleActionAccept)
70+
if rule is not None:
71+
assert rule.action == result_main
72+
73+
has_nat, rule = fw.process_snat(packet=packet, flow=FlowForward)
74+
if result_snat is None:
75+
assert not has_nat
76+
assert rule is None
77+
78+
else:
79+
assert has_nat
80+
if rule is not None:
81+
assert rule.action == result_snat
82+
83+
passed, rule = fw.process_egress(packet=packet, flow=FlowForward)
84+
assert passed == (result_eg == RuleActionAccept)
85+
if rule is not None:
86+
assert rule.action == result_eg

0 commit comments

Comments
 (0)