Skip to content

Commit 50b8b84

Browse files
authored
Merge pull request #148 from DomainTools/IDEV-2020-handle-large-response-feeds-improved
IDEV-2020 handle large response feeds improved
2 parents bb0e087 + 0bca37b commit 50b8b84

File tree

11 files changed

+617
-410
lines changed

11 files changed

+617
-410
lines changed

domaintools/api.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import datetime, timedelta, timezone
22
from hashlib import sha1, sha256, md5
33
from hmac import new as hmac
4+
45
import re
56

67
from domaintools.constants import Endpoint, ENDPOINT_TO_SOURCE_MAP, FEEDS_PRODUCTS_LIST, OutputFormat
@@ -11,6 +12,7 @@
1112
ParsedDomainRdap,
1213
Reputation,
1314
Results,
15+
FeedsResults,
1416
)
1517
from domaintools.filters import (
1618
filter_by_riskscore,
@@ -1065,7 +1067,7 @@ def iris_detect_ignored_domains(
10651067
**kwargs,
10661068
)
10671069

1068-
def nod(self, **kwargs):
1070+
def nod(self, **kwargs) -> FeedsResults:
10691071
"""Returns back list of the newly observed domains feed"""
10701072
validate_feeds_parameters(kwargs)
10711073
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
@@ -1078,10 +1080,11 @@ def nod(self, **kwargs):
10781080
f"newly-observed-domains-feed-({source.value})",
10791081
f"v1/{endpoint}/nod/",
10801082
response_path=(),
1083+
cls=FeedsResults,
10811084
**kwargs,
10821085
)
10831086

1084-
def nad(self, **kwargs):
1087+
def nad(self, **kwargs) -> FeedsResults:
10851088
"""Returns back list of the newly active domains feed"""
10861089
validate_feeds_parameters(kwargs)
10871090
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
@@ -1094,10 +1097,11 @@ def nad(self, **kwargs):
10941097
f"newly-active-domains-feed-({source})",
10951098
f"v1/{endpoint}/nad/",
10961099
response_path=(),
1100+
cls=FeedsResults,
10971101
**kwargs,
10981102
)
10991103

1100-
def domainrdap(self, **kwargs):
1104+
def domainrdap(self, **kwargs) -> FeedsResults:
11011105
"""Returns changes to global domain registration information, populated by the Registration Data Access Protocol (RDAP)"""
11021106
validate_feeds_parameters(kwargs)
11031107
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
@@ -1107,10 +1111,11 @@ def domainrdap(self, **kwargs):
11071111
f"domain-registration-data-access-protocol-feed-({source})",
11081112
f"v1/{endpoint}/domainrdap/",
11091113
response_path=(),
1114+
cls=FeedsResults,
11101115
**kwargs,
11111116
)
11121117

1113-
def domaindiscovery(self, **kwargs):
1118+
def domaindiscovery(self, **kwargs) -> FeedsResults:
11141119
"""Returns new domains as they are either discovered in domain registration information, observed by our global sensor network, or reported by trusted third parties"""
11151120
validate_feeds_parameters(kwargs)
11161121
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
@@ -1123,5 +1128,6 @@ def domaindiscovery(self, **kwargs):
11231128
f"real-time-domain-discovery-feed-({source})",
11241129
f"v1/{endpoint}/domaindiscovery/",
11251130
response_path=(),
1131+
cls=FeedsResults,
11261132
**kwargs,
11271133
)

domaintools/base_results.py

Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ def __init__(
5353
self._response = None
5454
self._items_list = None
5555
self._data = None
56-
self._limit_exceeded = None
57-
self._limit_exceeded_message = None
5856

5957
def _wait_time(self):
6058
if not self.api.rate_limit or not self.product in self.api.limits:
@@ -77,29 +75,6 @@ def _wait_time(self):
7775

7876
return wait_for
7977

80-
def _get_feeds_results_generator(self, parameters, headers):
81-
with Client(verify=self.api.verify_ssl, proxy=self.api.proxy_url, timeout=None) as session:
82-
status_code = None
83-
while status_code != 200:
84-
resp_data = session.get(url=self.url, params=parameters, headers=headers, **self.api.extra_request_params)
85-
status_code = resp_data.status_code
86-
self.setStatus(status_code, resp_data)
87-
88-
# Check limit exceeded here
89-
if "response" in resp_data.text and "limit_exceeded" in resp_data.text:
90-
self._limit_exceeded = True
91-
self._limit_exceeded_message = "limit exceeded"
92-
93-
yield resp_data
94-
95-
if self._limit_exceeded:
96-
raise ServiceException(503, "Limit Exceeded{}".format(self._limit_exceeded_message))
97-
98-
if not self.kwargs.get("sessionID"):
99-
# we'll only do iterative request for queries that has sessionID.
100-
# Otherwise, we will have an infinite request if sessionID was not provided but the required data asked is more than the maximum (1 hour of data)
101-
break
102-
10378
def _get_session_params(self):
10479
parameters = deepcopy(self.kwargs)
10580
parameters.pop("output_format", None)
@@ -118,12 +93,6 @@ def _get_session_params(self):
11893
return {"parameters": parameters, "headers": headers}
11994

12095
def _make_request(self):
121-
if self.product in FEEDS_PRODUCTS_LIST:
122-
session_params = self._get_session_params()
123-
parameters = session_params.get("parameters")
124-
headers = session_params.get("headers")
125-
126-
return self._get_feeds_results_generator(parameters=parameters, headers=headers)
12796

12897
with Client(verify=self.api.verify_ssl, proxy=self.api.proxy_url, timeout=None) as session:
12998
if self.product in [
@@ -138,15 +107,19 @@ def _make_request(self):
138107
patch_data = self.kwargs.copy()
139108
patch_data.update(self.api.extra_request_params)
140109
return session.patch(url=self.url, json=patch_data)
110+
elif self.product in FEEDS_PRODUCTS_LIST:
111+
session_params = self._get_session_params()
112+
parameters = session_params.get("parameters")
113+
headers = session_params.get("headers")
114+
return session.get(url=self.url, params=parameters, headers=headers, **self.api.extra_request_params)
141115
else:
142116
return session.get(url=self.url, params=self.kwargs, **self.api.extra_request_params)
143117

144118
def _get_results(self):
145119
wait_for = self._wait_time()
146120
if self.api.rate_limit and (wait_for is None or self.product == "account-information"):
147121
data = self._make_request()
148-
status_code = data.status_code if self.product not in FEEDS_PRODUCTS_LIST else 200
149-
if status_code == 503: # pragma: no cover
122+
if data.status_code == 503: # pragma: no cover
150123
sleeptime = 60
151124
log.info(
152125
"503 encountered for [%s] - sleeping [%s] seconds before retrying request.",
@@ -166,40 +139,27 @@ def _get_results(self):
166139
def data(self):
167140
if self._data is None:
168141
results = self._get_results()
169-
status_code = results.status_code if self.product not in FEEDS_PRODUCTS_LIST else 200
170-
self.setStatus(status_code, results)
171-
if (
172-
self.kwargs.get("format", "json") == "json"
173-
and self.product not in FEEDS_PRODUCTS_LIST # Special handling of feeds products' data to preserve the result in jsonline format
174-
):
142+
self.setStatus(results.status_code, results)
143+
if self.kwargs.get("format", "json") == "json":
175144
self._data = results.json()
176-
elif self.product in FEEDS_PRODUCTS_LIST:
177-
self._data = results # Uses generator to handle large data results from feeds endpoint
178145
else:
179146
self._data = results.text
180-
limit_exceeded, message = self.check_limit_exceeded()
181147

182-
if limit_exceeded:
183-
self._limit_exceeded = True
184-
self._limit_exceeded_message = message
148+
self.check_limit_exceeded()
185149

186-
if self._limit_exceeded is True:
187-
raise ServiceException(503, "Limit Exceeded{}".format(self._limit_exceeded_message))
188-
else:
189-
return self._data
150+
return self._data
190151

191152
def check_limit_exceeded(self):
192-
if self.product in FEEDS_PRODUCTS_LIST:
193-
# bypass here as this is handled in generator already
194-
return False, ""
195-
196-
if self.kwargs.get("format", "json") == "json" and self.product not in FEEDS_PRODUCTS_LIST:
197-
if "response" in self._data and "limit_exceeded" in self._data["response"] and self._data["response"]["limit_exceeded"] is True:
198-
return True, self._data["response"]["message"]
199-
# TODO: handle html, xml response errors better.
153+
limit_exceeded, reason = False, ""
154+
if isinstance(self._data, dict) and (
155+
"response" in self._data and "limit_exceeded" in self._data["response"] and self._data["response"]["limit_exceeded"] is True
156+
):
157+
limit_exceeded, reason = True, self._data["response"]["message"]
200158
elif "response" in self._data and "limit_exceeded" in self._data:
201-
return True, "limit exceeded"
202-
return False, ""
159+
limit_exceeded = True
160+
161+
if limit_exceeded:
162+
raise ServiceException(503, f"Limit Exceeded {reason}")
203163

204164
@property
205165
def status(self):
@@ -249,7 +209,7 @@ def response(self):
249209
return self._response
250210

251211
def items(self):
252-
return self.response().items() if isinstance(self.response(), dict) else self.response()
212+
return self.response().items()
253213

254214
def emails(self):
255215
"""Find and returns all emails mentioned in the response"""

domaintools/cli/api.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from rich.progress import Progress, SpinnerColumn, TextColumn
1010

1111
from domaintools.api import API
12-
from domaintools.constants import Endpoint, OutputFormat, FEEDS_PRODUCTS_LIST
12+
from domaintools.constants import Endpoint, FEEDS_PRODUCTS_LIST, OutputFormat
1313
from domaintools.cli.utils import get_file_extension
1414
from domaintools.exceptions import ServiceException
1515
from domaintools._version import current as version
@@ -111,8 +111,7 @@ def _get_formatted_output(cls, cmd_name: str, response, out_format: str = "json"
111111
if cmd_name in ("available_api_calls",):
112112
return "\n".join(response)
113113
if response.product in FEEDS_PRODUCTS_LIST:
114-
return "\n".join([data.text for data in response])
115-
114+
return "\n".join([data for data in response.response()])
116115
return str(getattr(response, out_format) if out_format != "list" else response.as_list())
117116

118117
@classmethod

domaintools/results.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
from ordereddict import OrderedDict
1212

1313
from itertools import zip_longest
14+
from typing import Generator
15+
1416
from domaintools_async import AsyncResults as Results
1517

1618

@@ -141,3 +143,28 @@ def flattened(self):
141143
flat[f"contact_{contact_key}_{i}"] = " | ".join(contact_value) if type(contact_value) in (list, tuple) else contact_value
142144

143145
return flat
146+
147+
148+
class FeedsResults(Results):
149+
"""Returns the generator for feeds results"""
150+
151+
def response(self) -> Generator:
152+
status_code = None
153+
while status_code != 200:
154+
resp_data = self.data()
155+
status_code = self.status
156+
yield resp_data
157+
158+
self._data = None # clear the data here
159+
if not self.kwargs.get("sessionID"):
160+
# we'll only do iterative request for queries that has sessionID.
161+
# Otherwise, we will have an infinite request if sessionID was not provided but the required data asked is more than the maximum (1 hour of data)
162+
break
163+
164+
def data(self):
165+
results = self._get_results()
166+
self.setStatus(results.status_code, results)
167+
self._data = results.text
168+
self.check_limit_exceeded()
169+
170+
return self._data

domaintools_async/__init__.py

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
import asyncio
44

5+
from copy import deepcopy
56
from httpx import AsyncClient
67

78
from domaintools.base_results import Results
8-
from domaintools.constants import FEEDS_PRODUCTS_LIST
9-
from domaintools.exceptions import ServiceUnavailableException, ServiceException
9+
from domaintools.constants import FEEDS_PRODUCTS_LIST, OutputFormat, HEADER_ACCEPT_KEY_CSV_FORMAT
10+
from domaintools.exceptions import ServiceUnavailableException
1011

1112

1213
class _AIter(object):
@@ -41,26 +42,6 @@ class AsyncResults(Results):
4142
def __await__(self):
4243
return self.__awaitable__().__await__()
4344

44-
async def _get_feeds_async_results_generator(self, session, parameters, headers):
45-
status_code = None
46-
while status_code != 200:
47-
resp_data = await session.get(url=self.url, params=parameters, headers=headers, **self.api.extra_request_params)
48-
status_code = resp_data.status_code
49-
self.setStatus(status_code, resp_data)
50-
51-
# Check limit exceeded here
52-
if "response" in resp_data.text and "limit_exceeded" in resp_data.text:
53-
self._limit_exceeded = True
54-
self._limit_exceeded_message = "limit exceeded"
55-
yield resp_data
56-
57-
if self._limit_exceeded:
58-
raise ServiceException(503, "Limit Exceeded{}".format(self._limit_exceeded_message))
59-
if not self.kwargs.get("sessionID"):
60-
# we'll only do iterative request for queries that has sessionID.
61-
# Otherwise, we will have an infinite request if sessionID was not provided but the required data asked is more than the maximum (1 hour of data)
62-
break
63-
6445
async def _make_async_request(self, session):
6546
if self.product in ["iris-investigate", "iris-enrich", "iris-detect-escalate-domains"]:
6647
post_data = self.kwargs.copy()
@@ -71,29 +52,24 @@ async def _make_async_request(self, session):
7152
patch_data.update(self.api.extra_request_params)
7253
results = await session.patch(url=self.url, json=patch_data)
7354
elif self.product in FEEDS_PRODUCTS_LIST:
74-
generator_params = self._get_session_params()
75-
parameters = generator_params.get("parameters")
76-
headers = generator_params.get("headers")
77-
results = await self._get_feeds_async_results_generator(session=session, parameters=parameters, headers=headers)
55+
session_params = self._get_session_params()
56+
parameters = session_params.get("parameters")
57+
headers = session_params.get("headers")
58+
results = await session.get(url=self.url, params=parameters, headers=headers, **self.api.extra_request_params)
7859
else:
7960
results = await session.get(url=self.url, params=self.kwargs, **self.api.extra_request_params)
8061
if results:
81-
status_code = results.status_code if self.product not in FEEDS_PRODUCTS_LIST else 200
82-
self.setStatus(status_code, results)
62+
self.setStatus(results.status_code, results)
8363
if self.kwargs.get("format", "json") == "json":
8464
self._data = results.json()
85-
elif self.product in FEEDS_PRODUCTS_LIST:
86-
self._data = results # Uses generator to handle large data results from feeds endpoint
8765
else:
8866
self._data = results.text()
89-
limit_exceeded, message = self.check_limit_exceeded()
9067

91-
if limit_exceeded:
92-
self._limit_exceeded = True
93-
self._limit_exceeded_message = message
68+
self.check_limit_exceeded()
9469

9570
async def __awaitable__(self):
9671
if self._data is None:
72+
9773
async with AsyncClient(verify=self.api.verify_ssl, proxy=self.api.proxy_url, timeout=None) as session:
9874
wait_time = self._wait_time()
9975
if wait_time is None and self.api:

0 commit comments

Comments
 (0)