Skip to content

Commit 762d7be

Browse files
authored
Fix domain resolver error, add resolve status and add tests (#808)
* Fix domain resolver error, add resolve status and add tests * Remove unused parameter in domain resolver * Update changelog
1 parent 42bc9b2 commit 762d7be

File tree

3 files changed

+106
-26
lines changed

3 files changed

+106
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- Fixed typo and broken link in documentation
2121
- Fixed assign_callback error in confluentkafka input
2222
- Fixed error logging in `_get_configuration`, which caused the github checks to fail
23+
- Fixed domain resolver errors for invalid domains
2324

2425
## 16.1.0
2526
### Deprecations

logprep/processor/domain_resolver/processor.py

Lines changed: 72 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@
3232
import datetime
3333
import logging
3434
import socket
35+
from enum import IntEnum
3536
from functools import cached_property
3637
from multiprocessing import context
3738
from multiprocessing.pool import ThreadPool
38-
from typing import Optional
39+
from typing import Optional, Any
3940
from urllib.parse import urlsplit
4041

4142
from attr import define, field, validators
@@ -50,6 +51,19 @@
5051
logger = logging.getLogger("DomainResolver")
5152

5253

54+
class ResolveStatus(IntEnum):
55+
"""Status of resolving domains"""
56+
57+
SUCCESS = 0
58+
"""Resolving the domain was successful"""
59+
TIMEOUT = 1
60+
"""Domain resolver timeout while trying to resolve the domain (this is not a socket timeout)"""
61+
INVALID = 2
62+
"""The domain was invalid and thus was not resolved"""
63+
UNKNOWN = 3
64+
"""Tried to resolve the domain, but the domain is unknown"""
65+
66+
5367
class DomainResolver(Processor):
5468
"""Resolve domains."""
5569

@@ -120,10 +134,24 @@ class Metrics(Processor.Metrics):
120134
)
121135
)
122136
"""Number of timeouts that occurred while resolving a url"""
137+
invalid_domains: CounterMetric = field(
138+
factory=lambda: CounterMetric(
139+
description="Number of invalid domains",
140+
name="domain_resolver_invalid_domains",
141+
)
142+
)
143+
"""Number of invalid domains for which resolution was attempted"""
144+
unknown_domains: CounterMetric = field(
145+
factory=lambda: CounterMetric(
146+
description="Number of unknown domains",
147+
name="domain_resolver_unknown_domains",
148+
)
149+
)
150+
"""Number of unknown domains for which resolution was attempted"""
123151

124152
__slots__ = ["_domain_ip_map"]
125153

126-
_domain_ip_map: dict
154+
_domain_ip_map: dict[str, Optional[str]]
127155

128156
rule_class = DomainResolverRule
129157

@@ -132,19 +160,19 @@ def __init__(self, name: str, configuration: Processor.Config):
132160
self._domain_ip_map = {}
133161

134162
@cached_property
135-
def _cache(self):
163+
def _cache(self) -> Cache:
136164
cache_max_timedelta = datetime.timedelta(days=self._config.max_caching_days)
137165
return Cache(max_items=self._config.max_cached_domains, max_timedelta=cache_max_timedelta)
138166

139167
@cached_property
140-
def _hasher(self):
168+
def _hasher(self) -> SHA256Hasher:
141169
return SHA256Hasher()
142170

143171
@cached_property
144-
def _thread_pool(self):
172+
def _thread_pool(self) -> ThreadPool:
145173
return ThreadPool(processes=1)
146174

147-
def _apply_rules(self, event, rule):
175+
def _apply_rules(self, event: dict[str, Any], rule: DomainResolverRule) -> None:
148176
source_field = rule.source_fields[0]
149177
domain_or_url_str = get_dotted_field_value(event, source_field)
150178
if not domain_or_url_str:
@@ -158,37 +186,55 @@ def _apply_rules(self, event, rule):
158186
return
159187
self.metrics.total_urls += 1
160188
if self._config.cache_enabled:
161-
hash_string = self._hasher.hash_str(domain, salt=self._config.hash_salt)
162-
requires_storing = self._cache.requires_storing(hash_string)
163-
if requires_storing:
164-
resolved_ip = self._resolve_ip(domain, hash_string)
165-
self._domain_ip_map.update({hash_string: resolved_ip})
166-
self.metrics.resolved_new += 1
167-
else:
168-
resolved_ip = self._domain_ip_map.get(hash_string)
169-
self.metrics.resolved_cached += 1
170-
self._add_resolve_infos_to_event(event, rule, resolved_ip)
171-
if self._config.debug_cache:
172-
self._store_debug_infos(event, requires_storing)
189+
self._resolve_with_cache(domain, event, rule)
173190
else:
174-
resolved_ip = self._resolve_ip(domain)
191+
resolved_ip, _ = self._resolve_ip(domain)
175192
self._add_resolve_infos_to_event(event, rule, resolved_ip)
176193

177-
def _add_resolve_infos_to_event(self, event, rule, resolved_ip):
194+
def _resolve_with_cache(
195+
self, domain: str, event: dict[str, Any], rule: DomainResolverRule
196+
) -> None:
197+
hash_string = self._hasher.hash_str(domain, salt=self._config.hash_salt)
198+
requires_storing = self._cache.requires_storing(hash_string)
199+
if requires_storing:
200+
resolved_ip, status = self._resolve_ip(domain)
201+
if status in (ResolveStatus.SUCCESS, ResolveStatus.UNKNOWN, ResolveStatus.TIMEOUT):
202+
self._domain_ip_map.update({hash_string: resolved_ip})
203+
self.metrics.resolved_new += 1
204+
else:
205+
resolved_ip = self._domain_ip_map.get(hash_string)
206+
self.metrics.resolved_cached += 1
207+
self._add_resolve_infos_to_event(event, rule, resolved_ip)
208+
209+
if self._config.debug_cache:
210+
self._store_debug_infos(event, requires_storing)
211+
212+
def _add_resolve_infos_to_event(
213+
self, event: dict[str, Any], rule: DomainResolverRule, resolved_ip: Optional[str]
214+
) -> None:
178215
if resolved_ip:
179216
self._write_target_field(event, rule, resolved_ip)
180217

181-
def _resolve_ip(self, domain, hash_string=None):
218+
def _resolve_ip(self, domain: str) -> tuple[Optional[str], int]:
219+
"""Resolve domain with timeout.
220+
221+
Assumes socket default timeout is None and relies on threading to create a timeout.
222+
"""
182223
try:
183224
result = self._thread_pool.apply_async(socket.gethostbyname, (domain,))
184225
resolved_ip = result.get(timeout=self._config.timeout)
185-
return resolved_ip
186-
except (context.TimeoutError, OSError):
187-
if hash_string:
188-
self._domain_ip_map[hash_string] = None
226+
return resolved_ip, ResolveStatus.SUCCESS
227+
except ValueError: # Makes no connection so does not need to be cached
228+
self.metrics.invalid_domains += 1
229+
return None, ResolveStatus.INVALID
230+
except context.TimeoutError:
189231
self.metrics.timeouts += 1
232+
return None, ResolveStatus.TIMEOUT
233+
except OSError: # Won't be timeout if default timeout is None
234+
self.metrics.unknown_domains += 1
235+
return None, ResolveStatus.UNKNOWN
190236

191-
def _store_debug_infos(self, event, requires_storing):
237+
def _store_debug_infos(self, event: dict[str, Any], requires_storing: bool) -> None:
192238
event_dbg = {
193239
"resolved_ip_debug": {
194240
"obtained_from_cache": not requires_storing,

tests/unit/processor/domain_resolver/test_domain_resolver.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# pylint: disable=missing-docstring
22
# pylint: disable=protected-access
33
from copy import deepcopy
4+
from multiprocessing import context
45
from pathlib import Path
56
from unittest import mock
67

78
from logprep.factory import Factory
89
from logprep.processor.base.exceptions import FieldExistsWarning
10+
from logprep.processor.domain_resolver.processor import ResolveStatus
911
from tests.unit.processor.base import BaseProcessorTestCase
1012

1113
REL_TLD_LIST_PATH = "tests/testdata/external/public_suffix_list.dat"
@@ -28,6 +30,8 @@ class TestDomainResolver(BaseProcessorTestCase):
2830
"logprep_domain_resolver_resolved_new",
2931
"logprep_domain_resolver_resolved_cached",
3032
"logprep_domain_resolver_timeouts",
33+
"logprep_domain_resolver_invalid_domains",
34+
"logprep_domain_resolver_unknown_domains",
3135
]
3236

3337
@mock.patch("socket.gethostbyname", return_value="1.2.3.4")
@@ -162,6 +166,35 @@ def test_url_to_ip_resolved_and_added_with_cache_disabled(self, _):
162166
self.object.process(document)
163167
assert document == expected
164168

169+
@mock.patch("socket.gethostbyname", side_effect=UnicodeError("invalid"), return_value="1.2.3.4")
170+
def test_invalid_domain_with_unicode_error_is_resolved_to_none_and_returns_status(self, _):
171+
resolved_ip, status = self.object._resolve_ip("google..invalid.de")
172+
assert resolved_ip is None
173+
assert status is ResolveStatus.INVALID
174+
175+
@mock.patch("socket.gethostbyname", side_effect=context.TimeoutError, return_value="1.2.3.4")
176+
def test_valid_domain_with_timeout_error_is_resolved_to_none_and_returns_status(self, _):
177+
resolved_ip, status = self.object._resolve_ip("google.de")
178+
assert resolved_ip is None
179+
assert status is ResolveStatus.TIMEOUT
180+
181+
@mock.patch("socket.gethostbyname", side_effect=OSError, return_value="1.2.3.4")
182+
def test_unknown_domain_with_os_error_is_resolved_to_none_and_returns_status(self, _):
183+
resolved_ip, status = self.object._resolve_ip("google.de")
184+
assert resolved_ip is None
185+
assert status is ResolveStatus.UNKNOWN
186+
187+
@mock.patch("socket.gethostbyname", return_value="1.2.3.4")
188+
def test_existing_domain_is_resolved_to_and_returns_status(self, _):
189+
resolved_ip, status = self.object._resolve_ip("google.de")
190+
assert resolved_ip == "1.2.3.4"
191+
assert status is ResolveStatus.SUCCESS
192+
193+
def test_empty_domain_is_snot_resolved(self):
194+
document = {"url": " "}
195+
self.object.process(document)
196+
assert document.get("resolved_ip") is None
197+
165198
def test_domain_to_ip_not_resolved(self):
166199
document = {"url": "google.thisisnotavalidtld"}
167200
self.object.process(document)

0 commit comments

Comments
 (0)