diff --git a/README.md b/README.md index d9e335d..01a0bba 100644 --- a/README.md +++ b/README.md @@ -223,16 +223,51 @@ Real-Time Threat Intelligence Feeds provide data on the different stages of the Custom parameters aside from the common `GET` Request parameters: - `endpoint` (choose either `download` or `feed` API endpoint - default is `feed`) ```python - api = API(USERNAME, KEY) + api = API(USERNAME, KEY, always_sign_api_key=False) api.nod(endpoint="feed", **kwargs) ``` - `header_authentication`: by default, we're using API Header Authentication. Set this False if you want to use API Key and Secret Authentication. Apparently, you can't use API Header Authentication for `download` endpoints so you need to set this to `False` when calling `download` API endpoints. ```python - api = API(USERNAME, KEY) + api = API(USERNAME, KEY, always_sign_api_key=False) api.nod(header_authentication=False, **kwargs) ``` - `output_format`: (choose either `csv` or `jsonl` - default is `jsonl`). Cannot be used in `domainrdap` feeds. Additionally, `csv` is not available for `download` endpoints. ```python - api = API(USERNAME, KEY) + api = API(USERNAME, KEY, always_sign_api_key=False) api.nod(output_format="csv", **kwargs) ``` + +The Feed API standard access pattern is to periodically request the most recent feed data, as often as every 60 seconds. Specify the range of data you receive in one of two ways: + +1. With `sessionID`: Make a call and provide a new `sessionID` parameter of your choosing. The API will return the last hour of data by default. + - Each subsequent call to the API using your `sessionID` will return all data since the last. + - Any single request returns a maximum of 10M results. Requests that exceed 10M results will return a HTTP 206 response code; repeat the same request (with the same `sessionID`) to receive the next tranche of data until receiving a HTTP 200 response code. +2. Or, specify the time range in one of two ways: + - Either an `after=-60` query parameter, where (in this example) -60 indicates the previous 60 seconds. + - Or `after` and `before` query parameters for a time range, with each parameter accepting an ISO-8601 UTC formatted timestamp (a UTC date and time of the format YYYY-MM-DDThh:mm:ssZ) + +## Handling iterative response from RTUF endpoints: + +Since we may dealing with large feeds datasets, the python wrapper uses `generator` for efficient memory handling. Therefore, we need to iterate through the `generator` if we're accessing the partial results of the feeds data. + +### Single request because the requested data is within the maximum result: +```python +from domaintools import API + +api = API(USERNAME, KEY, always_sign_api_key=False) +results = api.nod(sessionID="my-session-id", after=-60) + +for result in results.response() # generator that holds NOD feeds data for the past 60 seconds and is expected to request only once + # do things to result +``` + +## Multiple requests because the requested data is more than the maximum result per request: +```python +from domaintools import API + +api = API(USERNAME, KEY, always_sign_api_key=False) +results = api.nod(sessionID="my-session-id", after=-7200) + +for partial_result in results.response() # generator that holds NOD feeds data for the past 2 hours and is expected to request multiple times + # do things to partial_result +``` \ No newline at end of file diff --git a/domaintools/base_results.py b/domaintools/base_results.py index 70da9d9..2e8e424 100644 --- a/domaintools/base_results.py +++ b/domaintools/base_results.py @@ -182,9 +182,9 @@ def setStatus(self, code, response=None): if callable(reason): reason = reason() - if code == 400: + if code in (400, 422): raise BadRequestException(code, reason) - elif code == 403: + elif code in (401, 403): raise NotAuthorizedException(code, reason) elif code == 404: raise NotFoundException(code, reason) diff --git a/domaintools/cli/utils.py b/domaintools/cli/utils.py index 79f3691..91a71ee 100644 --- a/domaintools/cli/utils.py +++ b/domaintools/cli/utils.py @@ -85,6 +85,7 @@ def _phisheye_termlist(): c.FEEDS_NAD: "Returns back newly active domains feed.", c.FEEDS_NOD: "Returns back newly observed domains feed.", c.FEEDS_DOMAINRDAP: "Returns changes to global domain registration information, populated by the Registration Data Access Protocol (RDAP).", + c.FEEDS_DOMAINDISCOVERY: "Returns new domains as they are either discovered in domain registration information, observed by our global sensor network, or reported by trusted third parties.", } diff --git a/domaintools/utils.py b/domaintools/utils.py index 242efaf..5ba61ce 100644 --- a/domaintools/utils.py +++ b/domaintools/utils.py @@ -177,8 +177,12 @@ def validate_feeds_parameters(params): after = params.get("after") before = params.get("before") if not (sessionID or after or before): - raise ValueError("sessionID or after or before must be defined") + raise ValueError("sessionID or after or before must be provided") format = params.get("output_format") - if params.get("endpoint") == Endpoint.DOWNLOAD.value and format == OutputFormat.CSV.value: + endpoint = params.get("endpoint") + if endpoint == Endpoint.DOWNLOAD.value and format == OutputFormat.CSV.value: raise ValueError(f"{format} format is not available in {Endpoint.DOWNLOAD.value} API.") + + if endpoint == Endpoint.DOWNLOAD.value and params.get("header_authentication", True): + raise ValueError(f"{Endpoint.DOWNLOAD.value} API does not support header authentication. Provide api_key in the parameter") diff --git a/tests/conftest.py b/tests/conftest.py index 5e432f4..b22ca46 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1 +1,14 @@ """Configuration for test environment""" + +import pytest + + +@pytest.fixture +def test_feeds_params(): + return { + "sessionID": "test-session-id", + "after": -60, + "before": -120, + "output_format": "csv", + "endpoint": "download", + } diff --git a/tests/test_api.py b/tests/test_api.py index 10f0502..88d4d98 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -643,3 +643,27 @@ def test_verify_response_is_a_generator(): results = feeds_api.domaindiscovery(after="-60", header_authenticationn=False) assert isgenerator(results.response()) + + +@vcr.use_cassette +def test_feeds_endpoint_should_raise_error_if_download_api_using_header_auth(): + with pytest.raises(ValueError) as excinfo: + feeds_api.domaindiscovery(after="-60", endpoint="download") + + assert str(excinfo.value) == "download API does not support header authentication. Provide api_key in the parameter" + + +@vcr.use_cassette +def test_feeds_endpoint_should_raise_error_if_no_required_params(): + with pytest.raises(ValueError) as excinfo: + feeds_api.domaindiscovery() + + assert str(excinfo.value) == "sessionID or after or before must be provided" + + +@vcr.use_cassette +def test_feeds_endpoint_should_raise_error_if_asked_csv_format_for_download_api(): + with pytest.raises(ValueError) as excinfo: + feeds_api.domaindiscovery(after="-60", output_format="csv", endpoint="download") + + assert str(excinfo.value) == "csv format is not available in download API." diff --git a/tests/test_utils.py b/tests/test_utils.py index d8c7f6a..4ab92e4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,6 @@ import json +import pytest + from datetime import datetime, timedelta from tests.responses import iris_investigate_data @@ -15,24 +17,18 @@ def test_get_domain_age(): def test_get_threat_component(): threat_components = [ {"name": "threat_profile_malware", "risk_score": 31}, - {"name": "threat_profile_spam", "risk_score": 73, "threats": ["spam"]} + {"name": "threat_profile_spam", "risk_score": 73, "threats": ["spam"]}, ] result = utils.get_threat_component(threat_components, "threat_profile_malware") assert result.get("risk_score") == 31 def test_investigate_average_risk_score(): - domains = [ - {"domain_risk": {"risk_score": 25}}, - {"domain_risk": {"risk_score": 27}} - ] + domains = [{"domain_risk": {"risk_score": 25}}, {"domain_risk": {"risk_score": 27}}] result = utils.get_average_risk_score(domains) assert result == 26 - domains = [ - {"domain_risk": {"risk_score": 25}}, - {} - ] + domains = [{"domain_risk": {"risk_score": 25}}, {}] result = utils.get_average_risk_score(domains) assert result == 25 @@ -42,17 +38,11 @@ def test_investigate_average_risk_score(): def test_detect_average_risk_score(): - domains = [ - {"risk_score": 25}, - {"risk_score": 27} - ] + domains = [{"risk_score": 25}, {"risk_score": 27}] result = utils.get_average_risk_score(domains) assert result == 26 - domains = [ - {"risk_score": 25}, - {"risk_score": None} - ] + domains = [{"risk_score": 25}, {"risk_score": None}] result = utils.get_average_risk_score(domains) assert result == 25 @@ -65,24 +55,15 @@ def test_investigate_average_age(): two_days_ago = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d") five_days_ago = (datetime.now() - timedelta(days=5)).strftime("%Y-%m-%d") - domains = [ - {"create_date": {"value": two_days_ago}}, - {"create_date": {"value": five_days_ago}} - ] + domains = [{"create_date": {"value": two_days_ago}}, {"create_date": {"value": five_days_ago}}] result = utils.get_average_age(domains) assert result == 3 - domains = [ - {"create_date": {"value": two_days_ago}}, - {} - ] + domains = [{"create_date": {"value": two_days_ago}}, {}] result = utils.get_average_age(domains) assert result == 2 - domains = [ - {"create_date": {"value": two_days_ago}}, - {"create_date": {"value": ""}} - ] + domains = [{"create_date": {"value": two_days_ago}}, {"create_date": {"value": ""}}] result = utils.get_average_age(domains) assert result == 2 @@ -94,17 +75,11 @@ def test_investigate_average_age(): def test_detect_average_age(): two_days_ago = int((datetime.now() - timedelta(days=2)).strftime("%Y%m%d")) five_days_ago = int((datetime.now() - timedelta(days=5)).strftime("%Y%m%d")) - domains = [ - {"create_date": two_days_ago}, - {"create_date": five_days_ago} - ] + domains = [{"create_date": two_days_ago}, {"create_date": five_days_ago}] result = utils.get_average_age(domains) assert result == 3 - domains = [ - {"create_date": two_days_ago}, - {"create_date": None} - ] + domains = [{"create_date": two_days_ago}, {"create_date": None}] result = utils.get_average_age(domains) assert result == 2 @@ -121,26 +96,51 @@ def test_data_prune(): def test_find_emails(): emails = utils.find_emails(json.dumps(iris_investigate_data.domaintools())) - assert emails == {'abuse@enom.com', 'hostmaster@nsone.net'} + assert emails == {"abuse@enom.com", "hostmaster@nsone.net"} def test_find_ips(): ips = utils.find_ips(json.dumps(iris_investigate_data.domaintools())) - assert ips == {'142.250.115.26', - '142.250.141.27', - '198.51.44.4', - '198.51.44.68', - '198.51.45.4', - '198.51.45.68', - '199.30.228.112', - '64.233.171.26', - '74.125.142.26'} + assert ips == { + "142.250.115.26", + "142.250.141.27", + "198.51.44.4", + "198.51.44.68", + "198.51.45.4", + "198.51.45.68", + "199.30.228.112", + "64.233.171.26", + "74.125.142.26", + } + def test_get_pivots(): pivots = utils.get_pivots(iris_investigate_data.domaintools().get("results"), "") - assert pivots == [ - ['IP ADDRESS', ('199.30.228.112', 4)], - ['IP ASN', (17318, 111)], - ['IP ISP', ('DomainTools LLC', 222)] - ] + assert pivots == [["IP ADDRESS", ("199.30.228.112", 4)], ["IP ASN", (17318, 111)], ["IP ISP", ("DomainTools LLC", 222)]] + + +def test_validate_feeds_parameters_should_raise_error_if_download_api_using_header_auth(test_feeds_params): + test_feeds_params["output_format"] = "jsonl" + + with pytest.raises(ValueError) as excinfo: + utils.validate_feeds_parameters(test_feeds_params) + + assert str(excinfo.value) == "download API does not support header authentication. Provide api_key in the parameter" + + +def test_validate_feeds_parameters_should_raise_error_if_no_required_params(test_feeds_params): + test_feeds_params.pop("sessionID", None) + test_feeds_params.pop("after", None) + test_feeds_params.pop("before", None) + + with pytest.raises(ValueError) as excinfo: + utils.validate_feeds_parameters(test_feeds_params) + + assert str(excinfo.value) == "sessionID or after or before must be provided" + + +def test_validate_feeds_parameters_should_raise_error_if_asked_csv_format_for_download_api(test_feeds_params): + with pytest.raises(ValueError) as excinfo: + utils.validate_feeds_parameters(test_feeds_params) + assert str(excinfo.value) == "csv format is not available in download API."