|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""WAN-DMZ-LAN Firewall with Port Forwarding |
| 3 | +
|
| 4 | +Multi-zone firewall setup with port forwarding (DNAT) to a DMZ server, |
| 5 | +and masquerading (SNAT) of WAN-bound traffic. |
| 6 | +
|
| 7 | +image::wan-dmz-lan.svg[align=center, scaledwidth=50%] |
| 8 | +
|
| 9 | +- DUT/Gateway with WAN/DMZ/LAN zones and NAT |
| 10 | +- Test host's WAN interface acts as external Internet client |
| 11 | +- Test host's DMZ interface acts as internal server (HTTP on port 80) |
| 12 | +- Test host's LAN interface acts as internal LAN client |
| 13 | +""" |
| 14 | + |
| 15 | +import time |
| 16 | +import infamy |
| 17 | +from infamy.util import until |
| 18 | + |
| 19 | + |
| 20 | +with infamy.Test() as test: |
| 21 | + with test.step("Set up topology and attach to gateway"): |
| 22 | + env = infamy.Env() |
| 23 | + gateway = env.attach("gateway", "mgmt") |
| 24 | + _, wan_if = env.ltop.xlate("gateway", "wan") |
| 25 | + _, dmz_if = env.ltop.xlate("gateway", "dmz") |
| 26 | + _, lan_if = env.ltop.xlate("gateway", "lan") |
| 27 | + _, mgmt_if = env.ltop.xlate("gateway", "mgmt") |
| 28 | + _, host_wan = env.ltop.xlate("host", "wan") |
| 29 | + _, host_dmz = env.ltop.xlate("host", "dmz") |
| 30 | + _, host_lan = env.ltop.xlate("host", "lan") |
| 31 | + |
| 32 | + WAN_NET = "203.0.113.0/24" # RFC 5737 test network |
| 33 | + WAN_ROUTER_IP = "203.0.113.1" # Gateway WAN interface |
| 34 | + WAN_CLIENT_IP = "203.0.113.100" # Host WAN interface |
| 35 | + |
| 36 | + DMZ_NET = "10.0.1.0/24" |
| 37 | + DMZ_ROUTER_IP = "10.0.1.1" # Gateway DMZ interface |
| 38 | + DMZ_SERVER_IP = "10.0.1.100" # Host DMZ interface |
| 39 | + |
| 40 | + LAN_NET = "192.168.1.0/24" |
| 41 | + LAN_ROUTER_IP = "192.168.1.1" # Gateway LAN interface |
| 42 | + LAN_CLIENT_IP = "192.168.1.100" # Host LAN interface |
| 43 | + |
| 44 | + with test.step("Configure gateway with multi-zone firewall and NAT"): |
| 45 | + gateway.put_config_dict("ietf-interfaces", { |
| 46 | + "interfaces": { |
| 47 | + "interface": [ |
| 48 | + { |
| 49 | + "name": wan_if, |
| 50 | + "enabled": True, |
| 51 | + "ipv4": { |
| 52 | + "forwarding": True, |
| 53 | + "address": [{ |
| 54 | + "ip": WAN_ROUTER_IP, |
| 55 | + "prefix-length": 24 |
| 56 | + }] |
| 57 | + } |
| 58 | + }, |
| 59 | + { |
| 60 | + "name": dmz_if, |
| 61 | + "enabled": True, |
| 62 | + "ipv4": { |
| 63 | + "forwarding": True, |
| 64 | + "address": [{ |
| 65 | + "ip": DMZ_ROUTER_IP, |
| 66 | + "prefix-length": 24 |
| 67 | + }] |
| 68 | + } |
| 69 | + }, |
| 70 | + { |
| 71 | + "name": lan_if, |
| 72 | + "enabled": True, |
| 73 | + "ipv4": { |
| 74 | + "forwarding": True, |
| 75 | + "address": [{ |
| 76 | + "ip": LAN_ROUTER_IP, |
| 77 | + "prefix-length": 24 |
| 78 | + }] |
| 79 | + } |
| 80 | + } |
| 81 | + ] |
| 82 | + } |
| 83 | + }) |
| 84 | + |
| 85 | + gateway.put_config_dict("infix-firewall", { |
| 86 | + "firewall": { |
| 87 | + "default": "wan", |
| 88 | + "logging": "all", |
| 89 | + "zone": [ |
| 90 | + { |
| 91 | + "name": "wan", |
| 92 | + "description": "External WAN interface - untrusted", |
| 93 | + "action": "drop", |
| 94 | + "interface": [wan_if], |
| 95 | + "port-forward": [{ |
| 96 | + "lower": 8080, |
| 97 | + "proto": "tcp", |
| 98 | + "to": { |
| 99 | + "addr": DMZ_SERVER_IP, |
| 100 | + "port": 80 |
| 101 | + } |
| 102 | + }] |
| 103 | + }, |
| 104 | + { |
| 105 | + "name": "dmz", |
| 106 | + "description": "DMZ network - limited trust", |
| 107 | + "action": "reject", |
| 108 | + "network": [DMZ_NET], |
| 109 | + "service": ["http"] |
| 110 | + }, |
| 111 | + { |
| 112 | + "name": "lan", |
| 113 | + "description": "Internal LAN network - trusted", |
| 114 | + "action": "accept", |
| 115 | + "interface": [lan_if, mgmt_if] |
| 116 | + } |
| 117 | + ], |
| 118 | + "policy": [ |
| 119 | + { |
| 120 | + "name": "loc-to-wan", |
| 121 | + "description": "Allow local networks to WAN with SNAT", |
| 122 | + "ingress": ["lan", "dmz"], |
| 123 | + "egress": ["wan"], |
| 124 | + "action": "accept", |
| 125 | + "masquerade": True |
| 126 | + }, { |
| 127 | + "name": "lan-to-dmz", |
| 128 | + "description": "Allow LAN access to DMZ services", |
| 129 | + "ingress": ["lan"], |
| 130 | + "egress": ["dmz"], |
| 131 | + "action": "accept", |
| 132 | + "service": ["ssh", "http"] |
| 133 | + } |
| 134 | + ] |
| 135 | + } |
| 136 | + }) |
| 137 | + |
| 138 | + # Allow for some time to propagete the configuration change |
| 139 | + time.sleep(2) |
| 140 | + |
| 141 | + # Verify firewall configuration |
| 142 | + config = gateway.get_config_dict("/infix-firewall:firewall") |
| 143 | + fw = config["firewall"] |
| 144 | + zones = {z["name"]: z for z in fw["zone"]} |
| 145 | + |
| 146 | + # Verify WAN zone with port forwarding |
| 147 | + wan_zone = zones["wan"] |
| 148 | + assert wan_zone["action"] == "drop" |
| 149 | + assert wan_if in wan_zone["interface"] |
| 150 | + assert len(wan_zone["port-forward"]) == 1 |
| 151 | + pf = next(iter(wan_zone["port-forward"])) |
| 152 | + assert pf["lower"] == 8080 |
| 153 | + assert pf["to"]["addr"] == DMZ_SERVER_IP |
| 154 | + assert pf["to"]["port"] == 80 |
| 155 | + |
| 156 | + # Verify DMZ zone |
| 157 | + dmz_zone = zones["dmz"] |
| 158 | + assert dmz_zone["action"] == "reject" |
| 159 | + assert DMZ_NET in dmz_zone["network"] |
| 160 | + assert "http" in dmz_zone["service"] |
| 161 | + |
| 162 | + # Verify LAN zone |
| 163 | + lan_zone = zones["lan"] |
| 164 | + assert lan_zone["action"] == "accept" |
| 165 | + assert lan_if in lan_zone["interface"] |
| 166 | + |
| 167 | + # Check policies |
| 168 | + policies = {p["name"]: p for p in fw["policy"]} |
| 169 | + |
| 170 | + # Verify loc-to-wan policy |
| 171 | + loc_wan_policy = policies["loc-to-wan"] |
| 172 | + assert set(loc_wan_policy["ingress"]) == {"lan", "dmz"} |
| 173 | + assert loc_wan_policy["egress"] == ["wan"] |
| 174 | + assert loc_wan_policy["masquerade"] is True |
| 175 | + |
| 176 | + # Verify lan-to-dmz policy |
| 177 | + lan_dmz_policy = policies["lan-to-dmz"] |
| 178 | + assert lan_dmz_policy["ingress"] == ["lan"] |
| 179 | + assert lan_dmz_policy["egress"] == ["dmz"] |
| 180 | + assert "ssh" in lan_dmz_policy["service"] |
| 181 | + assert "http" in lan_dmz_policy["service"] |
| 182 | + |
| 183 | + with infamy.IsolatedMacVlan(host_wan) as wan_client: |
| 184 | + wan_client.addip(WAN_CLIENT_IP) |
| 185 | + |
| 186 | + with infamy.IsolatedMacVlan(host_dmz) as dmz_server: |
| 187 | + dmz_server.addip(DMZ_SERVER_IP) |
| 188 | + dmz_server.addroute("0.0.0.0", DMZ_ROUTER_IP, prefix_length="0") |
| 189 | + |
| 190 | + with infamy.IsolatedMacVlan(host_lan) as lan_client: |
| 191 | + lan_client.addip(LAN_CLIENT_IP) |
| 192 | + lan_client.addroute("0.0.0.0", LAN_ROUTER_IP, prefix_length="0") |
| 193 | + |
| 194 | + with test.step("Verify basic connectivity within zones"): |
| 195 | + lan_client.must_reach(LAN_ROUTER_IP, timeout=3) |
| 196 | + dmz_server.must_not_reach(DMZ_ROUTER_IP, timeout=3) |
| 197 | + |
| 198 | + with test.step("Verify WAN to DMZ port forwarding (DNAT)"): |
| 199 | + firewall = infamy.Firewall(wan_client, dmz_server) |
| 200 | + |
| 201 | + # Test port forwarding: WAN:8080 → DMZ:80 |
| 202 | + ok, info = firewall.verify_dnat( |
| 203 | + WAN_ROUTER_IP, forward_port=8080, target_port=80) |
| 204 | + |
| 205 | + if not ok: |
| 206 | + print(f" ⚠ {info}") |
| 207 | + test.fail() |
| 208 | + |
| 209 | + with test.step("Verify LAN to DMZ connectivity"): |
| 210 | + lan_client.must_reach(DMZ_SERVER_IP, timeout=3) |
| 211 | + firewall = infamy.Firewall(lan_client, None) |
| 212 | + svc = [ |
| 213 | + (22, "tcp", "ssh"), |
| 214 | + (80, "tcp", "http"), |
| 215 | + ] |
| 216 | + |
| 217 | + ok, ports = firewall.verify_allowed(DMZ_SERVER_IP, svc) |
| 218 | + if not ok: |
| 219 | + print(f" ⚠ Some DMZ services filtered from LAN: {', '.join(ports)}") |
| 220 | + test.fail() |
| 221 | + |
| 222 | + with test.step("Verify DMZ to LAN blocking"): |
| 223 | + dmz_server.must_not_reach(LAN_CLIENT_IP, timeout=3) |
| 224 | + |
| 225 | + with test.step("Verify WAN isolation"): |
| 226 | + firewall = infamy.Firewall(wan_client, None) |
| 227 | + |
| 228 | + ok, ports, _ = firewall.verify_blocked(LAN_ROUTER_IP) |
| 229 | + if not ok: |
| 230 | + print(f" ⚠ WAN can access LAN ports: {', '.join(ports)}") |
| 231 | + test.fail() |
| 232 | + |
| 233 | + ok, ports, _ = firewall.verify_blocked(DMZ_ROUTER_IP) |
| 234 | + if not ok: |
| 235 | + print(f" ⚠ WAN can access DMZ ports: {', '.join(ports)}") |
| 236 | + |
| 237 | + with test.step("Verify LAN to WAN connectivity with SNAT"): |
| 238 | + firewall = infamy.Firewall(lan_client, wan_client) |
| 239 | + |
| 240 | + lan_client.must_reach(WAN_CLIENT_IP, timeout=3) |
| 241 | + |
| 242 | + ok, info = firewall.verify_snat(WAN_CLIENT_IP, WAN_ROUTER_IP) |
| 243 | + if not ok: |
| 244 | + print(f" ⚠ LAN to WAN SNAT: {info}") |
| 245 | + test.fail() |
| 246 | + |
| 247 | + with test.step("Verify DMZ to WAN connectivity with SNAT"): |
| 248 | + firewall = infamy.Firewall(dmz_server, wan_client) |
| 249 | + |
| 250 | + dmz_server.must_reach(WAN_CLIENT_IP, timeout=3) |
| 251 | + |
| 252 | + ok, info = firewall.verify_snat(WAN_CLIENT_IP, WAN_ROUTER_IP) |
| 253 | + if not ok: |
| 254 | + print(f" ⚠ DMZ to WAN SNAT: {info}") |
| 255 | + test.fail() |
| 256 | + |
| 257 | + with test.step("Verify zone default actions/services"): |
| 258 | + firewall_lan = infamy.Firewall(lan_client, None) |
| 259 | + firewall_dmz = infamy.Firewall(dmz_server, None) |
| 260 | + firewall_wan = infamy.Firewall(wan_client, None) |
| 261 | + |
| 262 | + svc = [ |
| 263 | + (22, "tcp", "ssh"), |
| 264 | + (53, "udp", "dns"), |
| 265 | + (67, "udp", "dhcp") |
| 266 | + ] |
| 267 | + ok, ports = firewall_lan.verify_allowed(LAN_ROUTER_IP, svc) |
| 268 | + if not ok: |
| 269 | + print(f" ⚠ LAN services not properly accessible: {', '.join(ports)}") |
| 270 | + |
| 271 | + svc = [(80, "tcp", "http")] |
| 272 | + ok, ports = firewall_dmz.verify_allowed(DMZ_ROUTER_IP, svc) |
| 273 | + if not ok: |
| 274 | + print(f" ⚠ DMZ HTTP service not accessible: {', '.join(ports)}") |
| 275 | + |
| 276 | + ok, ports, _ = firewall_wan.verify_blocked(WAN_ROUTER_IP) |
| 277 | + if not ok: |
| 278 | + print(f" ⚠ WAN has unexpected open ports: {', '.join(ports)}") |
| 279 | + |
| 280 | + test.succeed() |
0 commit comments