|
4 | 4 |
|
5 | 5 | from sshuttle.methods import get_method
|
6 | 6 | from sshuttle.helpers import Fatal
|
7 |
| -from sshuttle.methods.pf import FreeBsd, Darwin |
| 7 | +from sshuttle.methods.pf import FreeBsd, Darwin, OpenBsd |
8 | 8 |
|
9 | 9 |
|
10 | 10 | def test_get_supported_features():
|
@@ -131,6 +131,29 @@ def test_firewall_command_freebsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
|
131 | 131 | ]
|
132 | 132 |
|
133 | 133 |
|
| 134 | +@patch('sshuttle.methods.pf.pf', OpenBsd()) |
| 135 | +@patch('sshuttle.methods.pf.sys.stdout') |
| 136 | +@patch('sshuttle.methods.pf.ioctl') |
| 137 | +@patch('sshuttle.methods.pf.pf_get_dev') |
| 138 | +def test_firewall_command_openbsd(mock_pf_get_dev, mock_ioctl, mock_stdout): |
| 139 | + method = get_method('pf') |
| 140 | + assert not method.firewall_command("somthing") |
| 141 | + |
| 142 | + command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % ( |
| 143 | + socket.AF_INET, socket.IPPROTO_TCP, |
| 144 | + "127.0.0.1", 1025, "127.0.0.2", 1024) |
| 145 | + assert method.firewall_command(command) |
| 146 | + |
| 147 | + assert mock_pf_get_dev.mock_calls == [call()] |
| 148 | + assert mock_ioctl.mock_calls == [ |
| 149 | + call(mock_pf_get_dev(), 0xc0504417, ANY), |
| 150 | + ] |
| 151 | + assert mock_stdout.mock_calls == [ |
| 152 | + call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'), |
| 153 | + call.flush(), |
| 154 | + ] |
| 155 | + |
| 156 | + |
134 | 157 | def pfctl(args, stdin=None):
|
135 | 158 | if args == '-s all':
|
136 | 159 | return (b'INFO:\nStatus: Disabled\nanother mary had a little lamb\n',
|
@@ -301,3 +324,80 @@ def test_setup_firewall_freebsd(mock_pf_get_dev, mock_ioctl, mock_pfctl):
|
301 | 324 | mock_pf_get_dev.reset_mock()
|
302 | 325 | mock_pfctl.reset_mock()
|
303 | 326 | mock_ioctl.reset_mock()
|
| 327 | + |
| 328 | + |
| 329 | +@patch('sshuttle.helpers.verbose', new=3) |
| 330 | +@patch('sshuttle.methods.pf.pf', OpenBsd()) |
| 331 | +@patch('sshuttle.methods.pf.pfctl') |
| 332 | +@patch('sshuttle.methods.pf.ioctl') |
| 333 | +@patch('sshuttle.methods.pf.pf_get_dev') |
| 334 | +def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl): |
| 335 | + mock_pfctl.side_effect = pfctl |
| 336 | + |
| 337 | + method = get_method('pf') |
| 338 | + assert method.name == 'pf' |
| 339 | + |
| 340 | + with pytest.raises(Exception) as excinfo: |
| 341 | + method.setup_firewall( |
| 342 | + 1024, 1026, |
| 343 | + [(10, u'2404:6800:4004:80c::33')], |
| 344 | + 10, |
| 345 | + [(10, 64, False, u'2404:6800:4004:80c::'), |
| 346 | + (10, 128, True, u'2404:6800:4004:80c::101f')], |
| 347 | + True) |
| 348 | + assert str(excinfo.value) \ |
| 349 | + == 'Address family "AF_INET6" unsupported by pf method_name' |
| 350 | + assert mock_pf_get_dev.mock_calls == [] |
| 351 | + assert mock_ioctl.mock_calls == [] |
| 352 | + assert mock_pfctl.mock_calls == [] |
| 353 | + |
| 354 | + with pytest.raises(Exception) as excinfo: |
| 355 | + method.setup_firewall( |
| 356 | + 1025, 1027, |
| 357 | + [(2, u'1.2.3.33')], |
| 358 | + 2, |
| 359 | + [(2, 24, False, u'1.2.3.0'), (2, 32, True, u'1.2.3.66')], |
| 360 | + True) |
| 361 | + assert str(excinfo.value) == 'UDP not supported by pf method_name' |
| 362 | + assert mock_pf_get_dev.mock_calls == [] |
| 363 | + assert mock_ioctl.mock_calls == [] |
| 364 | + assert mock_pfctl.mock_calls == [] |
| 365 | + |
| 366 | + method.setup_firewall( |
| 367 | + 1025, 1027, |
| 368 | + [(2, u'1.2.3.33')], |
| 369 | + 2, |
| 370 | + [(2, 24, False, u'1.2.3.0'), (2, 32, True, u'1.2.3.66')], |
| 371 | + False) |
| 372 | + assert mock_ioctl.mock_calls == [ |
| 373 | + call(mock_pf_get_dev(), 0xcd48441a, ANY), |
| 374 | + call(mock_pf_get_dev(), 0xcd48441a, ANY), |
| 375 | + ] |
| 376 | + assert mock_pfctl.mock_calls == [ |
| 377 | + call('-f /dev/stdin', b'match on lo\n'), |
| 378 | + call('-s all'), |
| 379 | + call('-a sshuttle -f /dev/stdin', |
| 380 | + b'table <forward_subnets> {!1.2.3.66/32,1.2.3.0/24}\n' |
| 381 | + b'table <dns_servers> {1.2.3.33}\n' |
| 382 | + b'pass in on lo0 inet proto tcp divert-to 127.0.0.1 port 1025\n' |
| 383 | + b'pass in on lo0 inet proto udp to ' |
| 384 | + b'<dns_servers>port 53 rdr-to 127.0.0.1 port 1027\n' |
| 385 | + b'pass out inet proto tcp to ' |
| 386 | + b'<forward_subnets> route-to lo0 keep state\n' |
| 387 | + b'pass out inet proto udp to ' |
| 388 | + b'<dns_servers> port 53 route-to lo0 keep state\n'), |
| 389 | + call('-e'), |
| 390 | + ] |
| 391 | + mock_pf_get_dev.reset_mock() |
| 392 | + mock_ioctl.reset_mock() |
| 393 | + mock_pfctl.reset_mock() |
| 394 | + |
| 395 | + method.restore_firewall(1025, 2, False) |
| 396 | + assert mock_ioctl.mock_calls == [] |
| 397 | + assert mock_pfctl.mock_calls == [ |
| 398 | + call('-a sshuttle -F all'), |
| 399 | + call("-d"), |
| 400 | + ] |
| 401 | + mock_pf_get_dev.reset_mock() |
| 402 | + mock_pfctl.reset_mock() |
| 403 | + mock_ioctl.reset_mock() |
0 commit comments