Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,51 @@ Real-Time Threat Intelligence Feeds provide data on the different stages of the
Custom parameters aside from the common `GET` Request parameters:
- `endpoint` (choose either `download` or `feed` API endpoint - default is `feed`)
```python
api = API(USERNAME, KEY)
api = API(USERNAME, KEY, always_sign_api_key=False)
api.nod(endpoint="feed", **kwargs)
```
- `header_authentication`: by default, we're using API Header Authentication. Set this False if you want to use API Key and Secret Authentication. Apparently, you can't use API Header Authentication for `download` endpoints so you need to set this to `False` when calling `download` API endpoints.
```python
api = API(USERNAME, KEY)
api = API(USERNAME, KEY, always_sign_api_key=False)
api.nod(header_authentication=False, **kwargs)
```
- `output_format`: (choose either `csv` or `jsonl` - default is `jsonl`). Cannot be used in `domainrdap` feeds. Additionally, `csv` is not available for `download` endpoints.
```python
api = API(USERNAME, KEY)
api = API(USERNAME, KEY, always_sign_api_key=False)
api.nod(output_format="csv", **kwargs)
```

The Feed API standard access pattern is to periodically request the most recent feed data, as often as every 60 seconds. Specify the range of data you receive in one of two ways:

1. With `sessionID`: Make a call and provide a new `sessionID` parameter of your choosing. The API will return the last hour of data by default.
- Each subsequent call to the API using your `sessionID` will return all data since the last.
- Any single request returns a maximum of 10M results. Requests that exceed 10M results will return an HTTP 206 response code; repeat the same request (with the same `sessionID`) to receive the next tranche of data until receiving an HTTP 200 response code.
2. Or, specify the time range in one of two ways:
- Either an `after=-60` query parameter, where (in this example) -60 indicates the previous 60 seconds.
- Or `after` and `before` query parameters for a time range, with each parameter accepting an ISO-8601 UTC formatted timestamp (a UTC date and time of the format YYYY-MM-DDThh:mm:ssZ)

## Handling iterative response from RTUF endpoints:

Since we may be dealing with large feeds datasets, the Python wrapper uses a `generator` for efficient memory handling. Therefore, we need to iterate through the `generator` when accessing the partial results of the feeds data.

### Single request because the requested data is within the maximum result:
```python
from domaintools import API

api = API(USERNAME, KEY, always_sign_api_key=False)
results = api.nod(sessionID="my-session-id", after=-60)

for result in results.response():  # generator that holds NOD feeds data for the past 60 seconds and is expected to request only once
    ...  # do things to result
```

### Multiple requests because the requested data is more than the maximum result per request:
```python
from domaintools import API

api = API(USERNAME, KEY, always_sign_api_key=False)
results = api.nod(sessionID="my-session-id", after=-7200)

for partial_result in results.response():  # generator that holds NOD feeds data for the past 2 hours and is expected to request multiple times
    ...  # do things to partial_result
```
4 changes: 2 additions & 2 deletions domaintools/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ def setStatus(self, code, response=None):
if callable(reason):
reason = reason()

if code == 400:
if code in (400, 422):
raise BadRequestException(code, reason)
elif code == 403:
elif code in (401, 403):
raise NotAuthorizedException(code, reason)
elif code == 404:
raise NotFoundException(code, reason)
Expand Down
1 change: 1 addition & 0 deletions domaintools/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def _phisheye_termlist():
c.FEEDS_NAD: "Returns back newly active domains feed.",
c.FEEDS_NOD: "Returns back newly observed domains feed.",
c.FEEDS_DOMAINRDAP: "Returns changes to global domain registration information, populated by the Registration Data Access Protocol (RDAP).",
c.FEEDS_DOMAINDISCOVERY: "Returns new domains as they are either discovered in domain registration information, observed by our global sensor network, or reported by trusted third parties.",
}


Expand Down
8 changes: 6 additions & 2 deletions domaintools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,12 @@ def validate_feeds_parameters(params):
after = params.get("after")
before = params.get("before")
if not (sessionID or after or before):
raise ValueError("sessionID or after or before must be defined")
raise ValueError("sessionID or after or before must be provided")

format = params.get("output_format")
if params.get("endpoint") == Endpoint.DOWNLOAD.value and format == OutputFormat.CSV.value:
endpoint = params.get("endpoint")
if endpoint == Endpoint.DOWNLOAD.value and format == OutputFormat.CSV.value:
raise ValueError(f"{format} format is not available in {Endpoint.DOWNLOAD.value} API.")

if endpoint == Endpoint.DOWNLOAD.value and params.get("header_authentication", True):
raise ValueError(f"{Endpoint.DOWNLOAD.value} API does not support header authentication. Provide api_key in the parameter")
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1 +1,14 @@
"""Configuration for test environment"""

import pytest


@pytest.fixture
def test_feeds_params():
    """Baseline keyword arguments for exercising feeds parameter validation.

    Defaults intentionally combine ``output_format="csv"`` with
    ``endpoint="download"`` so validation-failure tests fire out of the box.
    """
    params = dict(
        sessionID="test-session-id",
        after=-60,
        before=-120,
        output_format="csv",
        endpoint="download",
    )
    return params
24 changes: 24 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,27 @@ def test_verify_response_is_a_generator():
results = feeds_api.domaindiscovery(after="-60", header_authenticationn=False)

assert isgenerator(results.response())


@vcr.use_cassette
def test_feeds_endpoint_should_raise_error_if_download_api_using_header_auth():
    # The download endpoint only supports key/secret auth, so the default
    # header authentication must be rejected with a ValueError.
    expected_message = "download API does not support header authentication. Provide api_key in the parameter"

    with pytest.raises(ValueError) as exc_info:
        feeds_api.domaindiscovery(after="-60", endpoint="download")

    assert str(exc_info.value) == expected_message


@vcr.use_cassette
def test_feeds_endpoint_should_raise_error_if_no_required_params():
    # Calling a feeds endpoint without sessionID/after/before must fail fast.
    expected_message = "sessionID or after or before must be provided"

    with pytest.raises(ValueError) as exc_info:
        feeds_api.domaindiscovery()

    assert str(exc_info.value) == expected_message


@vcr.use_cassette
def test_feeds_endpoint_should_raise_error_if_asked_csv_format_for_download_api():
    # csv output is unavailable on the download endpoint; validation must refuse it.
    expected_message = "csv format is not available in download API."

    with pytest.raises(ValueError) as exc_info:
        feeds_api.domaindiscovery(after="-60", output_format="csv", endpoint="download")

    assert str(exc_info.value) == expected_message
104 changes: 52 additions & 52 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import pytest

from datetime import datetime, timedelta

from tests.responses import iris_investigate_data
Expand All @@ -15,24 +17,18 @@ def test_get_domain_age():
def test_get_threat_component():
threat_components = [
{"name": "threat_profile_malware", "risk_score": 31},
{"name": "threat_profile_spam", "risk_score": 73, "threats": ["spam"]}
{"name": "threat_profile_spam", "risk_score": 73, "threats": ["spam"]},
]
result = utils.get_threat_component(threat_components, "threat_profile_malware")
assert result.get("risk_score") == 31


def test_investigate_average_risk_score():
domains = [
{"domain_risk": {"risk_score": 25}},
{"domain_risk": {"risk_score": 27}}
]
domains = [{"domain_risk": {"risk_score": 25}}, {"domain_risk": {"risk_score": 27}}]
result = utils.get_average_risk_score(domains)
assert result == 26

domains = [
{"domain_risk": {"risk_score": 25}},
{}
]
domains = [{"domain_risk": {"risk_score": 25}}, {}]
result = utils.get_average_risk_score(domains)
assert result == 25

Expand All @@ -42,17 +38,11 @@ def test_investigate_average_risk_score():


def test_detect_average_risk_score():
domains = [
{"risk_score": 25},
{"risk_score": 27}
]
domains = [{"risk_score": 25}, {"risk_score": 27}]
result = utils.get_average_risk_score(domains)
assert result == 26

domains = [
{"risk_score": 25},
{"risk_score": None}
]
domains = [{"risk_score": 25}, {"risk_score": None}]
result = utils.get_average_risk_score(domains)
assert result == 25

Expand All @@ -65,24 +55,15 @@ def test_investigate_average_age():
two_days_ago = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d")
five_days_ago = (datetime.now() - timedelta(days=5)).strftime("%Y-%m-%d")

domains = [
{"create_date": {"value": two_days_ago}},
{"create_date": {"value": five_days_ago}}
]
domains = [{"create_date": {"value": two_days_ago}}, {"create_date": {"value": five_days_ago}}]
result = utils.get_average_age(domains)
assert result == 3

domains = [
{"create_date": {"value": two_days_ago}},
{}
]
domains = [{"create_date": {"value": two_days_ago}}, {}]
result = utils.get_average_age(domains)
assert result == 2

domains = [
{"create_date": {"value": two_days_ago}},
{"create_date": {"value": ""}}
]
domains = [{"create_date": {"value": two_days_ago}}, {"create_date": {"value": ""}}]
result = utils.get_average_age(domains)
assert result == 2

Expand All @@ -94,17 +75,11 @@ def test_investigate_average_age():
def test_detect_average_age():
two_days_ago = int((datetime.now() - timedelta(days=2)).strftime("%Y%m%d"))
five_days_ago = int((datetime.now() - timedelta(days=5)).strftime("%Y%m%d"))
domains = [
{"create_date": two_days_ago},
{"create_date": five_days_ago}
]
domains = [{"create_date": two_days_ago}, {"create_date": five_days_ago}]
result = utils.get_average_age(domains)
assert result == 3

domains = [
{"create_date": two_days_ago},
{"create_date": None}
]
domains = [{"create_date": two_days_ago}, {"create_date": None}]
result = utils.get_average_age(domains)
assert result == 2

Expand All @@ -121,26 +96,51 @@ def test_data_prune():

def test_find_emails():
emails = utils.find_emails(json.dumps(iris_investigate_data.domaintools()))
assert emails == {'[email protected]', '[email protected]'}
assert emails == {"[email protected]", "[email protected]"}


def test_find_ips():
ips = utils.find_ips(json.dumps(iris_investigate_data.domaintools()))
assert ips == {'142.250.115.26',
'142.250.141.27',
'198.51.44.4',
'198.51.44.68',
'198.51.45.4',
'198.51.45.68',
'199.30.228.112',
'64.233.171.26',
'74.125.142.26'}
assert ips == {
"142.250.115.26",
"142.250.141.27",
"198.51.44.4",
"198.51.44.68",
"198.51.45.4",
"198.51.45.68",
"199.30.228.112",
"64.233.171.26",
"74.125.142.26",
}


def test_get_pivots():
pivots = utils.get_pivots(iris_investigate_data.domaintools().get("results"), "")
assert pivots == [
['IP ADDRESS', ('199.30.228.112', 4)],
['IP ASN', (17318, 111)],
['IP ISP', ('DomainTools LLC', 222)]
]
assert pivots == [["IP ADDRESS", ("199.30.228.112", 4)], ["IP ASN", (17318, 111)], ["IP ISP", ("DomainTools LLC", 222)]]


def test_validate_feeds_parameters_should_raise_error_if_download_api_using_header_auth(test_feeds_params):
    # Use jsonl so the csv-format check passes and the header-auth check is what fires.
    test_feeds_params["output_format"] = "jsonl"
    expected_message = "download API does not support header authentication. Provide api_key in the parameter"

    with pytest.raises(ValueError) as exc_info:
        utils.validate_feeds_parameters(test_feeds_params)

    assert str(exc_info.value) == expected_message


def test_validate_feeds_parameters_should_raise_error_if_no_required_params(test_feeds_params):
    # Strip every time-window selector; validation must then refuse the call.
    for key in ("sessionID", "after", "before"):
        test_feeds_params.pop(key, None)

    with pytest.raises(ValueError) as exc_info:
        utils.validate_feeds_parameters(test_feeds_params)

    assert str(exc_info.value) == "sessionID or after or before must be provided"


def test_validate_feeds_parameters_should_raise_error_if_asked_csv_format_for_download_api(test_feeds_params):
    # Fixture defaults already pair csv output with the download endpoint,
    # which is exactly the combination validation must reject.
    with pytest.raises(ValueError) as exc_info:
        utils.validate_feeds_parameters(test_feeds_params)

    assert str(exc_info.value) == "csv format is not available in download API."
Loading