Skip to content

Commit 26e8703

Browse files
committed
adding basic translate-plugins for opnsense routes & network-interfaces
1 parent 6c090e1 commit 26e8703

17 files changed

+5489
-52
lines changed
100 KB
Loading
128 KB
Loading

docs/source/plugins/firewall_opnsense.rst

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
.. _plugins_fw_opnsense:
22

3+
.. |export_backup| image:: ../_static/img/plugin-opnsense-backup.png
4+
:class: wiki-img
5+
6+
.. |export_network| image:: ../_static/img/plugin-opnsense-export.png
7+
:class: wiki-img
8+
39
.. include:: ../_include/head.rst
410

511
===================
@@ -11,16 +17,29 @@ Firewall - OPNsense
1117
Config Export
1218
#############
1319

14-
1. `Download a Config-Backup <https://docs.opnsense.org/manual/backups.html>`_
20+
1. `Download a Config-Backup <https://docs.opnsense.org/manual/backups.html>`_ (referenced as :code:`config.xml`)
21+
22+
|export_backup|
1523

16-
2. `Supply the runtime routes manually <https://docs.opnsense.org/manual/routes.html#status>`_ or `query them via API <https://docs.opnsense.org/development/api/core/diagnostics.html#id6>`_
24+
2. Download the current network status via the WebUI: :code:`Interfaces - Overview - Download Buttom` (referenced as :code:`network.json`)
25+
26+
|export_network|
1727

1828
----
1929

2030
Run
2131
###
2232

23-
.. include:: ../_include/warn_wip.rst
33+
Here is an example on how to run supply the exported config:
34+
35+
.. code-block:: bash
36+
37+
ftf-cli --firewall-system 'linux_netfilter' \
38+
--file-ruleset 'config.xml' \
39+
--file-interfaces 'network.json' \
40+
--file-routes 'network.json' \
41+
--src-ip 172.17.11.5 \
42+
--dst-ip 1.1.1.1
2443
2544
----
2645

src/firewall_test/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
DEFAULT_ROUTE_IP4 = ip_network('0.0.0.0/0')
1111
DEFAULT_ROUTE_IP6 = ip_network('::/0')
1212
DEFAULT_ROUTES = [DEFAULT_ROUTE_IP4, DEFAULT_ROUTE_IP6]
13+
LINK_LOCAL_IP6 = ip_network('fe80::/64')
1314

1415

1516
class Proto(ABC):

src/firewall_test/plugins/translate/abstract.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,12 @@ def validate(self):
3232
class StaticRoute(TranslateOutput):
3333
# pylint: disable=W0622
3434
def __init__(
35-
self, table: str, net: str, scope: str, type: str, gw: str = None, src_pref: str = None,
35+
self, table: str, net: str, scope: str, gw: str = None, src_pref: str = None,
3636
ni: str = None, metric: int = None,
3737
):
3838
self.table = table
3939
self.net = net
4040
self.scope = scope
41-
self.type = type
4241
self.gw = gw
4342
self.src_pref = src_pref
4443
self.ni = ni
@@ -48,7 +47,7 @@ def __init__(
4847
self.src_pref = ip_address(self.src_pref)
4948

5049
def __repr__(self) -> str:
51-
return f'ROUTE: Network {self.net} in Table {self.table} via {self.ni} {self.gw} metric {self.metric} type {self.type}'
50+
return f'ROUTE: Network {self.net} in Table {self.table} via {self.ni} {self.gw} metric {self.metric}'
5251

5352
def dump(self) -> dict:
5453
gw, net = None, None
@@ -66,7 +65,6 @@ def dump(self) -> dict:
6665
'table': self.table,
6766
'net': net,
6867
'scope': self.scope,
69-
'type': self.type,
7068
'gw': gw,
7169
'src_pref': self.src_pref,
7270
'ni': self.ni,
@@ -84,8 +82,7 @@ def validate(self):
8482
assert isinstance(r['src_pref'], (IPv4Address, IPv6Address))
8583

8684
assert r['table'] in ['default', 'main', 'local', 'test']
87-
assert r['type'] in ['default', 'local', 'broadcast', 'multicast']
88-
assert r['scope'] in ['link', 'host', 'global', 'remote']
85+
assert r['scope'] in ['link', 'local', 'global']
8986

9087
def ip_count(self) -> int:
9188
cidr = int(str(self.net).split('/')[1])

src/firewall_test/plugins/translate/linux.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
from ipaddress import ip_address, ip_network, IPv4Address
21
from json import loads as json_loads
2+
from ipaddress import ip_address, ip_network, IPv4Address
33

44
from config import DEFAULT_ROUTE_IP4, DEFAULT_ROUTE_IP6, ProtoL3IP4, ProtoL3IP6
55
from plugins.translate.abstract import TranslatePluginStaticRoutes, TranslatePluginStaticRouteRules, \
66
StaticRoute, StaticRouteRule, TranslatePluginNetworkInterfaces, NetworkInterface
77

8+
# pylint: disable=R0801
9+
810

911
class LinuxRouteRules(TranslatePluginStaticRouteRules):
1012
def __init__(self, raw: str):
@@ -48,18 +50,22 @@ def get(self) -> list[StaticRoute]:
4850
return [
4951
StaticRoute(**self._parse_route(r))
5052
for r in self.raw
53+
if r.get('type', '') not in ['broadcast', 'multicast']
5154
]
5255

5356
@staticmethod
5457
def _parse_route(raw: dict) -> dict:
58+
scope = raw.get('scope', 'global')
59+
if scope == 'host':
60+
scope = 'local'
61+
5562
r = {
56-
'scope': raw.get('scope', 'remote'),
63+
'scope':scope,
5764
'ni': raw.get('dev', None),
5865
'metric': raw.get('metric', None),
5966
'src_pref': raw.get('prefsrc', None),
6067
'gw': raw.get('gateway', None),
6168
'table': raw.get('table', 'default'),
62-
'type': raw.get('type', 'default'),
6369
}
6470

6571
if raw.get('dst') == 'default':
@@ -92,14 +98,14 @@ def get(self) -> list[NetworkInterface]:
9298
@staticmethod
9399
def _parse_ni(raw: dict) -> dict:
94100
r = {
95-
'name': raw.get('ifname'),
96-
'mac': raw.get('address'),
101+
'name': raw['ifname'],
102+
'mac': raw['address'],
97103
ProtoL3IP4.N: [],
98104
ProtoL3IP6.N: [],
99105
'net4': [],
100106
'net6': [],
107+
'up': raw.get('operstate') == 'UP' or raw['ifname'] == 'lo',
101108
}
102-
r['up'] = raw.get('operstate') == 'UP' or r['name'] == 'lo'
103109

104110
for info in raw.get('addr_info'):
105111
ip = ip_address(info['local'])

src/firewall_test/plugins/translate/linux_ni_test.py renamed to src/firewall_test/plugins/translate/linux_interfaces_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
def test_linux_nis():
88
from plugins.translate.linux import LinuxNetworkInterfaces
99

10-
r = LinuxNetworkInterfaces(TESTDATA_NICS)
11-
o = r.get()
10+
nis = LinuxNetworkInterfaces(TESTDATA_NICS).get()
1211

13-
for ni in o:
12+
assert len(nis) == 4
13+
14+
for ni in nis:
1415
ni.validate()
1516

1617
if ni.name == 'lo':

src/firewall_test/plugins/translate/linux_route_test.py

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from ipaddress import ip_network
2+
3+
from testdata_test import TESTDATA_FILE_ROUTES, TESTDATA_FILE_ROUTE_RULES
4+
5+
from config import DEFAULT_ROUTE_IP4, DEFAULT_ROUTE_IP6, LINK_LOCAL_IP6
6+
7+
with open(TESTDATA_FILE_ROUTES, 'r', encoding='utf-8') as f:
8+
TESTDATA_ROUTES = f.read()
9+
10+
with open(TESTDATA_FILE_ROUTE_RULES, 'r', encoding='utf-8') as f:
11+
TESTDATA_RULES = f.read()
12+
13+
14+
def test_linux_rules():
15+
from plugins.translate.linux import LinuxRouteRules
16+
17+
r = LinuxRouteRules(TESTDATA_RULES).get()
18+
19+
assert len(r) == 4
20+
21+
for rule in r:
22+
rule.validate()
23+
24+
if rule.table == 'test':
25+
assert rule.priority == 32765
26+
assert len(rule.src) == 1
27+
assert rule.src[0] == ip_network('172.18.0.0/16')
28+
29+
30+
def test_linux_routes():
31+
from plugins.translate.linux import LinuxRoutes
32+
33+
r = LinuxRoutes(TESTDATA_ROUTES).get()
34+
35+
assert len(r) == 13
36+
37+
for route in r:
38+
route.validate()
39+
40+
if route.net == DEFAULT_ROUTE_IP4:
41+
assert str(route.gw) == '10.255.255.254'
42+
assert route.scope == 'global'
43+
44+
elif route.net == DEFAULT_ROUTE_IP6:
45+
assert str(route.gw) == 'fe80::7474:ceff:feb1:5347'
46+
assert route.scope == 'global'
47+
48+
elif str(route.net) == '10.255.255.0/24':
49+
assert route.gw is None
50+
assert route.scope == 'link'
51+
assert str(route.src_pref) == '10.255.255.48'
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from json import loads as json_loads
2+
from ipaddress import ip_address, ip_network, IPv4Address
3+
4+
from config import ProtoL3IP4, ProtoL3IP6
5+
from plugins.translate.abstract import NetworkInterface, TranslatePluginNetworkInterfaces
6+
7+
# pylint: disable=R0801
8+
9+
10+
class OPNsenseNetworkInterfaces(TranslatePluginNetworkInterfaces):
11+
def __init__(self, raw: str):
12+
super().__init__(json_loads(raw))
13+
14+
def get(self) -> list[NetworkInterface]:
15+
return [
16+
NetworkInterface(**self._parse_ni(r))
17+
for r in self.raw
18+
if r['status'] != 'down'
19+
]
20+
21+
@staticmethod
22+
def _parse_ni(raw: dict) -> dict:
23+
r = {
24+
'name': raw.get('identifier'),
25+
'mac': raw.get('macaddr'),
26+
ProtoL3IP4.N: [],
27+
ProtoL3IP6.N: [],
28+
'net4': [],
29+
'net6': [],
30+
'up': raw['status'] == 'up',
31+
}
32+
33+
ips = raw.get('ipv4', [])
34+
ips.extend(raw.get('ipv6', []))
35+
for ip_cnf in ips:
36+
ip_cidr = ip_cnf['ipaddr']
37+
ip, cidr = ip_cidr.split('/', 1)
38+
ip = ip_address(ip)
39+
if cidr in ['32', '128']:
40+
net = None
41+
42+
else:
43+
net = ip_network(ip_cidr, strict=False)
44+
45+
if isinstance(ip, IPv4Address):
46+
r[ProtoL3IP4.N].append(ip)
47+
if net is not None:
48+
r['net4'].append(net)
49+
50+
else:
51+
r[ProtoL3IP6.N].append(ip)
52+
if net is not None:
53+
r['net6'].append(net)
54+
55+
return r

0 commit comments

Comments
 (0)