Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
python-version: ["3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.0
2.2.0
2 changes: 1 addition & 1 deletion domaintools/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@

"""

current = "2.1.0"
current = "2.2.0"
148 changes: 43 additions & 105 deletions domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import re

from domaintools._version import current as version
from domaintools.results import GroupedIterable, ParsedWhois, Reputation, Results
from domaintools.results import (
GroupedIterable,
ParsedWhois,
ParsedDomainRdap,
Reputation,
Results,
)
from domaintools.filters import (
filter_by_riskscore,
filter_by_expire_date,
Expand Down Expand Up @@ -77,16 +83,12 @@ def __init__(
self._build_api_url(api_url, api_port)

if not https:
raise Exception(
"The DomainTools API endpoints no longer support http traffic. Please make sure https=True."
)
raise Exception("The DomainTools API endpoints no longer support http traffic. Please make sure https=True.")
if proxy_url:
if isinstance(proxy_url, str):
self.proxy_url = {"http://": proxy_url, "https://": proxy_url}
else:
raise Exception(
"Proxy URL must be a string. For example: '127.0.0.1:8888'"
)
raise Exception("Proxy URL must be a string. For example: '127.0.0.1:8888'")

def _build_api_url(self, api_url=None, api_port=None):
"""Build the API url based on the given url and port. Defaults to `https://api.domaintools.com`"""
Expand All @@ -110,31 +112,18 @@ def _rate_limit(self):
hours = limit_hours and 3600 / float(limit_hours)
minutes = limit_minutes and 60 / float(limit_minutes)

self.limits[product["id"]] = {
"interval": timedelta(seconds=minutes or hours or default)
}
self.limits[product["id"]] = {"interval": timedelta(seconds=minutes or hours or default)}

def _results(self, product, path, cls=Results, **kwargs):
"""Returns _results for the specified API path with the specified **kwargs parameters"""
if (
product != "account-information"
and self.rate_limit
and not self.limits_set
and not self.limits
):
if product != "account-information" and self.rate_limit and not self.limits_set and not self.limits:
self._rate_limit()

uri = "/".join((self._rest_api_url, path.lstrip("/")))
parameters = self.default_parameters.copy()
parameters["api_username"] = self.username
self.handle_api_key(path, parameters)
parameters.update(
{
key: str(value).lower() if value in (True, False) else value
for key, value in kwargs.items()
if value is not None
}
)
parameters.update({key: str(value).lower() if value in (True, False) else value for key, value in kwargs.items() if value is not None})

return cls(self, product, uri, **parameters)

Expand All @@ -147,14 +136,10 @@ def handle_api_key(self, path, parameters):
else:
raise ValueError(
"Invalid value '{0}' for 'key_sign_hash'. "
"Values available are {1}".format(
self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES)
)
"Values available are {1}".format(self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES))
)

parameters["timestamp"] = datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
parameters["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
parameters["signature"] = hmac(
self.key.encode("utf8"),
"".join([self.username, parameters["timestamp"], path]).encode("utf8"),
Expand All @@ -163,9 +148,7 @@ def handle_api_key(self, path, parameters):

def account_information(self, **kwargs):
"""Provides a snapshot of your accounts current API usage"""
return self._results(
"account-information", "/v1/account", items_path=("products",), **kwargs
)
return self._results("account-information", "/v1/account", items_path=("products",), **kwargs)

def available_api_calls(self):
"""Provides a list of api calls that you can use based on your account information."""
Expand All @@ -180,25 +163,10 @@ def snakecase(string):
string[1:],
)

api_calls = tuple(
(
api_call
for api_call in dir(API)
if not api_call.startswith("_")
and callable(getattr(API, api_call, None))
)
)
return sorted(
[
snakecase(p["id"])
for p in self.account_information()["products"]
if snakecase(p["id"]) in api_calls
]
)
api_calls = tuple((api_call for api_call in dir(API) if not api_call.startswith("_") and callable(getattr(API, api_call, None))))
return sorted([snakecase(p["id"]) for p in self.account_information()["products"] if snakecase(p["id"]) in api_calls])

def brand_monitor(
self, query, exclude=None, domain_status=None, days_back=None, **kwargs
):
def brand_monitor(self, query, exclude=None, domain_status=None, days_back=None, **kwargs):
"""Pass in one or more terms as a list or separated by the pipe character ( | )"""
if exclude is None:
exclude = []
Expand Down Expand Up @@ -324,9 +292,16 @@ def parsed_whois(self, query, **kwargs):
**kwargs,
)

def registrant_monitor(
self, query, exclude=None, days_back=0, limit=None, **kwargs
):
def parsed_domain_rdap(self, query, **kwargs):
"""Pass in a domain name to see the most recent Domain-RDAP registration record"""
return self._results(
"parsed-domain-rdap",
"/v1/{0}/rdap/parsed/".format(query),
cls=ParsedDomainRdap,
**kwargs,
)

def registrant_monitor(self, query, exclude=None, days_back=0, limit=None, **kwargs):
"""One or more terms as a Python list or separated by the pipe character ( | )."""
if exclude is None:
exclude = []
Expand Down Expand Up @@ -354,15 +329,11 @@ def reputation(self, query, include_reasons=False, **kwargs):

def reverse_ip(self, domain=None, limit=None, **kwargs):
"""Pass in a domain name."""
return self._results(
"reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs
)
return self._results("reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs)

def host_domains(self, ip=None, limit=None, **kwargs):
"""Pass in an IP address."""
return self._results(
"reverse-ip", "/v1/{0}/host-domains".format(ip), limit=limit, **kwargs
)
return self._results("reverse-ip", "/v1/{0}/host-domains".format(ip), limit=limit, **kwargs)

def reverse_ip_whois(
self,
Expand Down Expand Up @@ -401,9 +372,7 @@ def reverse_name_server(self, query, limit=None, **kwargs):
**kwargs,
)

def reverse_whois(
self, query, exclude=None, scope="current", mode="purchase", **kwargs
):
def reverse_whois(self, query, exclude=None, scope="current", mode="purchase", **kwargs):
"""List of one or more terms to search for in the Whois record,
as a Python list or separated with the pipe character ( | ).
"""
Expand All @@ -423,9 +392,7 @@ def whois(self, query, **kwargs):
"""Pass in a domain name or an IP address to perform a whois lookup."""
return self._results("whois", "/v1/{0}/whois".format(query), **kwargs)

def whois_history(
self, query, mode=None, sort=None, offset=None, limit=None, **kwargs
):
def whois_history(self, query, mode=None, sort=None, offset=None, limit=None, **kwargs):
"""Pass in a domain name."""
return self._results(
"whois-history",
Expand Down Expand Up @@ -484,16 +451,7 @@ def iris(
"""Performs a search for the provided search terms ANDed together,
returning the pivot engine row data for the resulting domains.
"""
if (
not domain
and not ip
and not email
and not nameserver
and not registrar
and not registrant
and not registrant_org
and not kwargs
):
if not domain and not ip and not email and not nameserver and not registrar and not registrant and not registrant_org and not kwargs:
raise ValueError("At least one search term must be specified")

return self._results(
Expand Down Expand Up @@ -568,25 +526,17 @@ def iris_enrich(self, *domains, **kwargs):
younger_than_date = kwargs.pop("younger_than_date", {}) or None
older_than_date = kwargs.pop("older_than_date", {}) or None
updated_after = kwargs.pop("updated_after", {}) or None
include_domains_with_missing_field = (
kwargs.pop("include_domains_with_missing_field", {}) or None
)
exclude_domains_with_missing_field = (
kwargs.pop("exclude_domains_with_missing_field", {}) or None
)
include_domains_with_missing_field = kwargs.pop("include_domains_with_missing_field", {}) or None
exclude_domains_with_missing_field = kwargs.pop("exclude_domains_with_missing_field", {}) or None

filtered_results = DTResultFilter(result_set=results).by(
[
filter_by_riskscore(threshold=risk_score),
filter_by_expire_date(date=younger_than_date, lookup_type="before"),
filter_by_expire_date(date=older_than_date, lookup_type="after"),
filter_by_date_updated_after(date=updated_after),
filter_by_field(
field=include_domains_with_missing_field, filter_type="include"
),
filter_by_field(
field=exclude_domains_with_missing_field, filter_type="exclude"
),
filter_by_field(field=include_domains_with_missing_field, filter_type="include"),
filter_by_field(field=exclude_domains_with_missing_field, filter_type="exclude"),
]
)

Expand Down Expand Up @@ -691,9 +641,7 @@ def iris_investigate(
kwargs["search_hash"] = search_hash

if not (kwargs or domains):
raise ValueError(
"Need to define investigation using kwarg filters or domains"
)
raise ValueError("Need to define investigation using kwarg filters or domains")

if isinstance(domains, (list, tuple)):
domains = ",".join(domains)
Expand Down Expand Up @@ -723,12 +671,8 @@ def iris_investigate(
filter_by_expire_date(date=younger_than_date, lookup_type="before"),
filter_by_expire_date(date=older_than_date, lookup_type="after"),
filter_by_date_updated_after(date=updated_after),
filter_by_field(
field=include_domains_with_missing_field, filter_type="include"
),
filter_by_field(
field=exclude_domains_with_missing_field, filter_type="exclude"
),
filter_by_field(field=include_domains_with_missing_field, filter_type="include"),
filter_by_field(field=exclude_domains_with_missing_field, filter_type="exclude"),
]
)

Expand Down Expand Up @@ -768,9 +712,7 @@ def iris_detect_monitors(

if include_counts:
if not datetime_counts_since:
raise ValueError(
"Need to define datetime_counts_since when include_counts is True"
)
raise ValueError("Need to define datetime_counts_since when include_counts is True")
if isinstance(datetime_counts_since, datetime):
datetime_counts_since = str(datetime_counts_since.astimezone())
elif isinstance(datetime_counts_since, str):
Expand Down Expand Up @@ -978,9 +920,7 @@ def iris_detect_watched_domains(
**kwargs,
)

def iris_detect_manage_watchlist_domains(
self, watchlist_domain_ids, state, **kwargs
):
def iris_detect_manage_watchlist_domains(self, watchlist_domain_ids, state, **kwargs):
"""Changes the watch state of a list of domains by their Iris Detect domain ID.

watchlist_domain_ids: List[str]: required. List of Iris Detect domain IDs to manage.
Expand All @@ -999,9 +939,7 @@ def iris_detect_manage_watchlist_domains(
**kwargs,
)

def iris_detect_escalate_domains(
self, watchlist_domain_ids, escalation_type, **kwargs
):
def iris_detect_escalate_domains(self, watchlist_domain_ids, escalation_type, **kwargs):
"""Changes the escalation type of a list of domains by their Iris Detect domain ID.

watchlist_domain_ids: List[str]: required. List of Iris Detect domain IDs to escalate.
Expand Down
Loading