Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,28 @@ API_KEY
Python Version Support Policy
===================

Please see the [supported versions](https://github.com/DomainTools/python_api/raw/main/PYTHON_SUPPORT.md) document
Please see the [supported versions](https://github.com/DomainTools/python_api/raw/main/PYTHON_SUPPORT.md) document
for the DomainTools Python support policy.


Real-Time Threat Intelligence Feeds
===================

Real-Time Threat Intelligence Feeds provide data on the different stages of the domain lifecycle: from first-observed in the wild, to newly re-activated after a period of quiet. Access current feed data in real-time or retrieve historical feed data through separate APIs.

Custom parameters aside from the common `GET` Request parameters:
- `endpoint` (choose either `download` or `feed` API endpoint - default is `feed`)
```python
api = API(USERNAME, KEY)
api.nod(endpoint="download")
```
- `header_authentication`: by default, we're using API Header Authentication. Set this False if you want to use API Key and Secret Authentication
```python
api = API(USERNAME, KEY)
api.nod(header_authentication=False, **kwargs)
```
- `output_format`: (choose either `csv` or `jsonl` - default is `jsonl`). Cannot be used in `domainrdap` feeds. Additionally, `csv` is not available for `download` endpoints.
```python
api = API(USERNAME, KEY)
api.nod(output_format="csv", **kwargs)
```
48 changes: 28 additions & 20 deletions domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
filter_by_field,
DTResultFilter,
)
from domaintools.utils import validate_feeds_parameters
from domaintools.utils import get_feeds_products_list, validate_feeds_parameters


AVAILABLE_KEY_SIGN_HASHES = ["sha1", "sha256", "md5"]
Expand Down Expand Up @@ -125,14 +125,18 @@ def _results(self, product, path, cls=Results, **kwargs):
uri = "/".join((self._rest_api_url, path.lstrip("/")))
parameters = self.default_parameters.copy()
parameters["api_username"] = self.username
self.handle_api_key(path, parameters)
header_authentication = kwargs.pop("header_authentication", True) # Used only by Real-Time Threat Intelligence Feeds endpoints for now
self.handle_api_key(product, path, parameters, header_authentication)
parameters.update({key: str(value).lower() if value in (True, False) else value for key, value in kwargs.items() if value is not None})

return cls(self, product, uri, **parameters)

def handle_api_key(self, path, parameters):
def handle_api_key(self, product, path, parameters, header_authentication):
if self.https and not self.always_sign_api_key:
parameters["api_key"] = self.key
if product in get_feeds_products_list() and header_authentication:
parameters["X-Api-Key"] = self.key
else:
parameters["api_key"] = self.key
else:
if self.key_sign_hash and self.key_sign_hash in AVAILABLE_KEY_SIGN_HASHES:
signing_hash = eval(self.key_sign_hash)
Expand Down Expand Up @@ -1063,28 +1067,32 @@ def iris_detect_ignored_domains(

def nod(self, **kwargs):
"""Returns back list of the newly observed domains feed"""
sessionID = kwargs.get("sessionID")
after = kwargs.get("after")
if not (sessionID or after):
raise ValueError("sessionID or after (can be both) must be defined")
validate_feeds_parameters(kwargs)
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint)
if endpoint == Endpoint.DOWNLOAD.value or kwargs.get("output_format", OutputFormat.JSONL.value) != OutputFormat.CSV.value:
# headers param is allowed only in Feed API and CSV format
kwargs.pop("headers", None)

return self._results(
"newly-observed-domains-feed-(api)",
"v1/feed/nod/",
f"newly-observed-domains-feed-({source.value})",
f"v1/{endpoint}/nod/",
response_path=(),
**kwargs,
)

def nad(self, **kwargs):
"""Returns back list of the newly active domains feed"""
sessionID = kwargs.get("sessionID")
after = kwargs.get("after")
if not (sessionID or after):
raise ValueError("sessionID or after (can be both) must be defined")
validate_feeds_parameters(kwargs)
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint).value
if endpoint == Endpoint.DOWNLOAD.value or kwargs.get("output_format", OutputFormat.JSONL.value) != OutputFormat.CSV.value:
# headers param is allowed only in Feed API and CSV format
kwargs.pop("headers", None)

return self._results(
"newly-active-domains-feed-(api)",
"v1/feed/nad/",
f"newly-active-domains-feed-({source})",
f"v1/{endpoint}/nad/",
response_path=(),
**kwargs,
)
Expand All @@ -1093,10 +1101,10 @@ def domainrdap(self, **kwargs):
"""Returns changes to global domain registration information, populated by the Registration Data Access Protocol (RDAP)"""
validate_feeds_parameters(kwargs)
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint).value

return self._results(
f"domain-registration-data-access-protocol-feed-({source.value})",
f"domain-registration-data-access-protocol-feed-({source})",
f"v1/{endpoint}/domainrdap/",
response_path=(),
**kwargs,
Expand All @@ -1106,13 +1114,13 @@ def domaindiscovery(self, **kwargs):
"""Returns new domains as they are either discovered in domain registration information, observed by our global sensor network, or reported by trusted third parties"""
validate_feeds_parameters(kwargs)
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint).value
if endpoint == Endpoint.DOWNLOAD.value or kwargs.get("output_format", OutputFormat.JSONL.value) != OutputFormat.CSV.value:
# headers param is allowed only in Feed API and CSV format
kwargs.pop("headers", None)

return self._results(
f"real-time-domain-discovery-feed-({source.value})",
f"real-time-domain-discovery-feed-({source})",
f"v1/{endpoint}/domaindiscovery/",
response_path=(),
**kwargs,
Expand Down
4 changes: 4 additions & 0 deletions domaintools/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ def _make_request(self):
parameters["headers"] = int(bool(self.kwargs.get("headers", False)))
headers["accept"] = HEADER_ACCEPT_KEY_CSV_FORMAT

header_api_key = parameters.pop("X-Api-Key", None)
if header_api_key:
headers["X-Api-Key"] = header_api_key

return session.get(url=self.url, params=parameters, headers=headers, **self.api.extra_request_params)
else:
return session.get(url=self.url, params=self.kwargs, **self.api.extra_request_params)
Expand Down
7 changes: 4 additions & 3 deletions domaintools/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
from typing import Optional, Dict, Tuple
from rich.progress import Progress, SpinnerColumn, TextColumn

from domaintools.constants import Endpoint, OutputFormat
from domaintools.api import API
from domaintools.exceptions import ServiceException
from domaintools.constants import Endpoint, OutputFormat
from domaintools.cli.utils import get_file_extension
from domaintools.exceptions import ServiceException
from domaintools._version import current as version


class DTCLICommand:
API_SUCCESS_STATUS = 200
APP_PARTNER_NAME = "python_wrapper_cli_2.0.0"
APP_PARTNER_NAME = f"python_wrapper_cli_{version}"

@staticmethod
def print_api_version(value: bool):
Expand Down
105 changes: 74 additions & 31 deletions domaintools/cli/commands/feeds.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import sys
import typer


Expand All @@ -23,20 +22,6 @@ def feeds_nad(
"--credfile",
help="Optional file with API username and API key, one per line.",
),
rate_limit: bool = typer.Option(
False,
"-l",
"--rate-limit",
help="Rate limit API calls against the API based on per minute limits.",
),
format: str = typer.Option(
"json",
"-f",
"--format",
help="Output format in {'list', 'json', 'xml', 'html'}",
callback=DTCLICommand.validate_format_input,
),
out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"),
no_verify_ssl: bool = typer.Option(
False,
"--no-verify-ssl",
Expand All @@ -47,6 +32,25 @@ def feeds_nad(
"--no-sign-api-key",
help="Skip signing of api key",
),
header_authentication: bool = typer.Option(
True,
"--no-header-auth",
help="Don't use header authentication",
),
output_format: str = typer.Option(
"jsonl",
"-f",
"--format",
help=f"Output format in [{OutputFormat.JSONL.value}, {OutputFormat.CSV.value}]",
callback=DTCLICommand.validate_feeds_format_input,
),
endpoint: str = typer.Option(
Endpoint.FEED.value,
"-e",
"--endpoint",
help=f"Valid endpoints: [{Endpoint.FEED.value}, {Endpoint.DOWNLOAD.value}]",
callback=DTCLICommand.validate_endpoint_input,
),
sessionID: str = typer.Option(
None,
"--session-id",
Expand All @@ -56,17 +60,29 @@ def feeds_nad(
None,
"--after",
help="Start of the time window, relative to the current time in seconds, for which data will be provided",
callback=DTCLICommand.validate_after_or_before_input,
),
before: str = typer.Option(
None,
"--before",
help="The end of the query window in seconds, relative to the current time, inclusive",
callback=DTCLICommand.validate_after_or_before_input,
),
domain: str = typer.Option(
None,
"-d",
"--domain",
help="A string value used to filter feed results",
),
headers: bool = typer.Option(
False,
"--headers",
help="Adds a header to the first line of response when text/csv is set in header parameters",
),
top: str = typer.Option(
None,
"--top",
help="Number of results to return in the response payload",
help="Number of results to return in the response payload. This is ignored in download endpoint",
),
):
DTCLICommand.run(name=c.FEEDS_NAD, params=ctx.params)
Expand All @@ -86,20 +102,6 @@ def feeds_nod(
"--credfile",
help="Optional file with API username and API key, one per line.",
),
rate_limit: bool = typer.Option(
False,
"-l",
"--rate-limit",
help="Rate limit API calls against the API based on per minute limits.",
),
format: str = typer.Option(
"json",
"-f",
"--format",
help="Output format in {'list', 'json', 'xml', 'html'}",
callback=DTCLICommand.validate_format_input,
),
out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"),
no_verify_ssl: bool = typer.Option(
False,
"--no-verify-ssl",
Expand All @@ -110,6 +112,25 @@ def feeds_nod(
"--no-sign-api-key",
help="Skip signing of api key",
),
header_authentication: bool = typer.Option(
True,
"--no-header-auth",
help="Don't use header authentication",
),
output_format: str = typer.Option(
"jsonl",
"-f",
"--format",
help=f"Output format in [{OutputFormat.JSONL.value}, {OutputFormat.CSV.value}]",
callback=DTCLICommand.validate_feeds_format_input,
),
endpoint: str = typer.Option(
Endpoint.FEED.value,
"-e",
"--endpoint",
help=f"Valid endpoints: [{Endpoint.FEED.value}, {Endpoint.DOWNLOAD.value}]",
callback=DTCLICommand.validate_endpoint_input,
),
sessionID: str = typer.Option(
None,
"--session-id",
Expand All @@ -119,17 +140,29 @@ def feeds_nod(
None,
"--after",
help="Start of the time window, relative to the current time in seconds, for which data will be provided",
callback=DTCLICommand.validate_after_or_before_input,
),
before: str = typer.Option(
None,
"--before",
help="The end of the query window in seconds, relative to the current time, inclusive",
callback=DTCLICommand.validate_after_or_before_input,
),
domain: str = typer.Option(
None,
"-d",
"--domain",
help="A string value used to filter feed results",
),
headers: bool = typer.Option(
False,
"--headers",
help="Adds a header to the first line of response when text/csv is set in header parameters",
),
top: str = typer.Option(
None,
"--top",
help="Number of results to return in the response payload",
help="Number of results to return in the response payload. This is ignored in download endpoint",
),
):
DTCLICommand.run(name=c.FEEDS_NOD, params=ctx.params)
Expand Down Expand Up @@ -159,6 +192,11 @@ def feeds_domainrdap(
"--no-sign-api-key",
help="Skip signing of api key",
),
header_authentication: bool = typer.Option(
True,
"--no-header-auth",
help="Don't use header authentication",
),
endpoint: str = typer.Option(
Endpoint.FEED.value,
"-e",
Expand Down Expand Up @@ -222,6 +260,11 @@ def feeds_domaindiscovery(
"--no-sign-api-key",
help="Skip signing of api key",
),
header_authentication: bool = typer.Option(
True,
"--no-header-auth",
help="Don't use header authentication",
),
output_format: str = typer.Option(
"jsonl",
"-f",
Expand Down
2 changes: 2 additions & 0 deletions domaintools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ def convert_str_to_dateobj(string_date: str, date_format: Optional[str] = "%Y-%m
def get_feeds_products_list():
return [
"newly-active-domains-feed-(api)",
"newly-active-domains-feed-(s3)",
"newly-observed-domains-feed-(api)",
"newly-observed-domains-feed-(s3)",
"domain-registration-data-access-protocol-feed-(api)",
"domain-registration-data-access-protocol-feed-(s3)",
"real-time-domain-discovery-feed-(api)",
Expand Down
5 changes: 5 additions & 0 deletions domaintools_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ async def _make_async_request(self, session):
if self.kwargs.get("output_format", OutputFormat.JSONL.value) == OutputFormat.CSV.value:
parameters["headers"] = int(bool(self.kwargs.get("headers", False)))
headers["accept"] = HEADER_ACCEPT_KEY_CSV_FORMAT

header_api_key = parameters.pop("X-Api-Key", None)
if header_api_key:
headers["X-Api-Key"] = header_api_key

results = await session.get(url=self.url, params=parameters, headers=headers, **self.api.extra_request_params)
else:
results = await session.get(url=self.url, params=self.kwargs, **self.api.extra_request_params)
Expand Down
Loading