-
Notifications
You must be signed in to change notification settings - Fork 13
requests_test.py: cleanup #498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
1d31481
Create helper ssrf_check issue
bitterpanda63 45d2c06
Make it easier to add new test cases to requests_test.py
bitterpanda63 ca4bb83
run same urls for urllib3 as for requests
bitterpanda63 ecfcf65
Update aikido_zen/sinks/tests/requests_and_urrlib3_test.py
bitterpanda63 c0e6661
fix typo in filename: urrlib -> urllib
bitterpanda63 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,187 @@ | ||
| import os | ||
| import pytest | ||
|
|
||
| from aikido_zen.context import Context, current_context | ||
| from aikido_zen.thread.thread_cache import ThreadCache, get_cache | ||
| from aikido_zen.errors import AikidoSSRF | ||
| from aikido_zen.background_process.comms import reset_comms | ||
| import aikido_zen.sinks.socket | ||
| import aikido_zen.sinks.http_client | ||
| import requests | ||
| import urllib3 | ||
|
|
||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def run_around_tests(): | ||
| get_cache().reset() | ||
| yield | ||
| # Make sure to reset context and cache after every test so it does not | ||
| # interfere with other tests | ||
| current_context.set(None) | ||
| get_cache().reset() | ||
|
|
||
|
|
||
| def set_context_and_lifecycle(url, host=None): | ||
| wsgi_request = { | ||
| "REQUEST_METHOD": "GET", | ||
| "HTTP_HEADER_1": "header 1 value", | ||
| "HTTP_HEADER_2": "Header 2 value", | ||
| "RANDOM_VALUE": "Random value", | ||
| "HTTP_COOKIE": "sessionId=abc123xyz456;", | ||
| "wsgi.url_scheme": "http", | ||
| "HTTP_HOST": "localhost:8080", | ||
| "PATH_INFO": "/hello", | ||
| "QUERY_STRING": "user=JohnDoe&age=30&age=35", | ||
| "CONTENT_TYPE": "application/json", | ||
| "REMOTE_ADDR": "198.51.100.23", | ||
| } | ||
| if host is not None: | ||
| wsgi_request["HTTP_HOST"] = host | ||
| context = Context( | ||
| req=wsgi_request, | ||
| body={ | ||
| "url": url, | ||
| }, | ||
| source="flask", | ||
| ) | ||
| context.set_as_current_context() | ||
|
|
||
|
|
||
| def ssrf_check(monkeypatch, url): | ||
| reset_comms() | ||
| set_context_and_lifecycle(url) | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get(url) | ||
| with pytest.raises(AikidoSSRF): | ||
| http = urllib3.PoolManager() | ||
| http.request("GET", url) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "url", | ||
| [ | ||
| # ssrf test | ||
| "http://ssrf-redirects.testssandbox.com/ssrf-test", | ||
| # twice ssrf test | ||
| "http://ssrf-redirects.testssandbox.com/ssrf-test-twice", | ||
| # domain test | ||
| "http://ssrf-redirects.testssandbox.com/ssrf-test-domain", | ||
| # domain twice test | ||
| "http://ssrf-redirects.testssandbox.com/ssrf-test-domain-twice", | ||
| # punycode test | ||
| "http://ssrf-rédirects.testssandbox.com/ssrf-test", | ||
| # Punycode encoded test | ||
| "http://xn--ssrf-rdirects-ghb.testssandbox.com/ssrf-test", | ||
| # Punycode domain twice test | ||
| "http://ssrf-rédirects.testssandbox.com/ssrf-test-domain-twice", | ||
| # cross domain test | ||
| "http://firewallssrfredirects-env-2.eba-7ifve22q.eu-north-1.elasticbeanstalk.com/ssrf-test", | ||
| # cross domain test twice | ||
| "http://firewallssrfredirects-env-2.eba-7ifve22q.eu-north-1.elasticbeanstalk.com/ssrf-test-domain-twice", | ||
| # loopback ipv6 | ||
| "http://[::1]:8081", | ||
| "http://[::1]:8081/", | ||
| "http://[::1]:8081/test", | ||
| # loopback ipv6 with zeroes | ||
| "http://[0000:0000:0000:0000:0000:0000:0000:0001]:8081", | ||
| "http://[0000:0000:0000:0000:0000:0000:0000:0001]:8081/", | ||
| "http://[0000:0000:0000:0000:0000:0000:0000:0001]:8081/test", | ||
| # private ips written differently | ||
| "http://2130706433:8081", | ||
| "http://0x7f000001:8081/", | ||
| "http://0177.0.0.01:8081/", | ||
| "http://0x7f.0x0.0x0.0x1:8081/", | ||
| # 127.0.0.1 ipv6 mapped | ||
| "http://[::ffff:127.0.0.1]:8081", | ||
| ], | ||
| ) | ||
| def test_ssrf_1(monkeypatch, url): | ||
| ssrf_check(monkeypatch, url) | ||
|
|
||
|
|
||
| def test_no_raises_if_diff_url(monkeypatch): | ||
| set_context_and_lifecycle( | ||
| "http://firewallssrfredirects-env-2.eba-7ifve22q.eu-north-1.elasticbeanstalk.com/ssrf-test-domain-twice" | ||
| ) | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(requests.exceptions.ConnectionError): | ||
| requests.get("http://ssrf-redirects.testssandbox.com/ssrf-test-domain-twice") | ||
|
|
||
|
|
||
| def test_no_raises_if_diff_url_urllib3(monkeypatch): | ||
| http = urllib3.PoolManager() | ||
| set_context_and_lifecycle( | ||
| "http://firewallssrfredirects-env-2.eba-7ifve22q.eu-north-1.elasticbeanstalk.com/ssrf-test-domain-twice" | ||
| ) | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(urllib3.exceptions.MaxRetryError): | ||
| http.request( | ||
| "GET", "http://ssrf-redirects.testssandbox.com/ssrf-test-domain-twice" | ||
| ) | ||
|
|
||
|
|
||
| def test_localhost_is_same_as_context(monkeypatch): | ||
| set_context_and_lifecycle("http://localhost:8080") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(requests.exceptions.ConnectionError): | ||
| requests.get("http://localhost:8080") | ||
|
|
||
|
|
||
| def test_localhost_raises_ssrf(monkeypatch): | ||
| set_context_and_lifecycle("http://localhost:8081/") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://localhost:8081") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://localhost:8081/") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://localhost:8081/test") | ||
| with pytest.raises(requests.exceptions.ConnectionError): | ||
| requests.get("http://localhost:5002/test") | ||
|
|
||
| set_context_and_lifecycle("http://localhost:8081/test") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://localhost:8081/test") | ||
| set_context_and_lifecycle("http://localhost:8081/test/2") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://localhost:8081/chicken/3") | ||
|
|
||
|
|
||
| def test_different_capitalization_raises_ssrf(monkeypatch): | ||
| set_context_and_lifecycle("http://localHost:8081") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://LOCALHOST:8081") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://Localhost:8081/") | ||
| with pytest.raises(AikidoSSRF): | ||
| requests.get("http://localHost:8081/test") | ||
|
|
||
|
|
||
| def test_srrf_with_request_to_itself_urllib3(monkeypatch): | ||
| http = urllib3.PoolManager() | ||
| reset_comms() | ||
|
|
||
| set_context_and_lifecycle("http://localhost:5000/test/1", host="localhost:5000") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(urllib3.exceptions.MaxRetryError): | ||
| http.request("GET", "http://localhost:5000/test/1") | ||
|
|
||
| # Now test with no port match | ||
| set_context_and_lifecycle("http://localhost:5000/test/2", host="localhost:4999") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(AikidoSSRF): | ||
| http.request("GET", "http://localhost:5000/test/2") | ||
|
|
||
| # Test with http app requesting to https | ||
| set_context_and_lifecycle("https://localhost/test/3", host="localhost:80") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(urllib3.exceptions.MaxRetryError): | ||
| http.request("GET", "https://localhost/test/3") | ||
|
|
||
| # Test with https app requesting to http | ||
| set_context_and_lifecycle("http://localhost/test/4", host="localhost:443") | ||
| monkeypatch.setenv("AIKIDO_BLOCK", "1") | ||
| with pytest.raises(urllib3.exceptions.MaxRetryError): | ||
| http.request("GET", "https://localhost/test/4") | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.