Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions checkdmarc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def check_domains(
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0,
timeout_retries: int = 2,
wait: float = 0.0,
) -> Union[OrderedDict, list[OrderedDict]]:
"""
Expand All @@ -79,6 +80,7 @@ def check_domains(
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for an answer from DNS
timeout_retries (int): The number of times to reattempt a query after a timeout
wait (float): number of seconds to wait between processing domains

Returns:
Expand Down Expand Up @@ -139,11 +141,16 @@ def check_domains(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)

mta_sts_mx_patterns = None
domain_results["mta_sts"] = check_mta_sts(
domain, nameservers=nameservers, resolver=resolver, timeout=timeout
domain,
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
if domain_results["mta_sts"]["valid"]:
mta_sts_mx_patterns = domain_results["mta_sts"]["policy"]["mx"]
Expand All @@ -155,6 +162,7 @@ def check_domains(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)

domain_results["spf"] = check_spf(
Expand All @@ -163,6 +171,7 @@ def check_domains(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)

domain_results["dmarc"] = check_dmarc(
Expand All @@ -172,10 +181,15 @@ def check_domains(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)

domain_results["smtp_tls_reporting"] = check_smtp_tls_reporting(
domain, nameservers=nameservers, resolver=resolver, timeout=timeout
domain,
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
if bimi_selector is not None:
domain_results["bimi"] = check_bimi(
Expand All @@ -186,6 +200,7 @@ def check_domains(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)

results.append(domain_results)
Expand All @@ -205,6 +220,7 @@ def check_ns(
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0,
timeout_retries: int = 2,
) -> OrderedDict:
"""
Returns a dictionary of nameservers and warnings or a dictionary with an
Expand Down Expand Up @@ -236,6 +252,7 @@ def check_ns(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
except DNSException as error:
ns_results = OrderedDict([("hostnames", []), ("error", error.__str__())])
Expand Down
8 changes: 8 additions & 0 deletions checkdmarc/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ def _main():
type=float,
default=2.0,
)
arg_parser.add_argument(
"--timeout-retries",
help="number of times to reattempt a query after a timeout (default 2)",
type=int,
default=2,
)

arg_parser.add_argument(
"-b", "--bimi-selector", default="default", help="the BIMI selector to use"
)
Expand Down Expand Up @@ -137,6 +144,7 @@ def _main():
include_tag_descriptions=args.descriptions,
nameservers=args.nameserver,
timeout=args.timeout,
timeout_retries=args.timeout_retries,
bimi_selector=args.bimi_selector,
wait=args.wait,
)
Expand Down
27 changes: 24 additions & 3 deletions checkdmarc/bimi.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ def _query_bimi_record(
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0,
timeout_retries: int = 2,
):
"""
Queries DNS for a BIMI record
Expand All @@ -661,6 +662,7 @@ def _query_bimi_record(
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for a record from DNS
timeout_retries (int): The number of times to reattempt a query after a timeout

Returns:
str: A record string or None
Expand All @@ -674,7 +676,12 @@ def _query_bimi_record(

try:
records = query_dns(
target, "TXT", nameservers=nameservers, resolver=resolver, timeout=timeout
target,
"TXT",
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
for record in records:
if record.startswith(txt_prefix):
Expand Down Expand Up @@ -730,6 +737,7 @@ def query_bimi_record(
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0,
timeout_retries: int = 2,
) -> OrderedDict:
"""
Queries DNS for a BIMI record
Expand All @@ -741,6 +749,7 @@ def query_bimi_record(
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for a record from DNS
timeout_retries (int): The number of times to reattempt a query after a timeout

Returns:
OrderedDict: An ``OrderedDict`` with the following keys:
Expand All @@ -765,10 +774,16 @@ def query_bimi_record(
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
try:
root_records = query_dns(
domain, "TXT", nameservers=nameservers, resolver=resolver, timeout=timeout
domain,
"TXT",
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
for root_record in root_records:
if root_record.startswith("v=BIMI1"):
Expand All @@ -780,7 +795,11 @@ def query_bimi_record(

if record is None and domain != base_domain:
record = _query_bimi_record(
base_domain, nameservers=nameservers, resolver=resolver, timeout=timeout
base_domain,
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
location = base_domain
if record is None:
Expand Down Expand Up @@ -993,6 +1012,7 @@ def check_bimi(
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0,
timeout_retries: int = 2,
) -> OrderedDict:
"""
Returns a dictionary with a parsed BIMI record or an error.
Expand All @@ -1012,6 +1032,7 @@ def check_bimi(
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for an answer from DNS
timeout_retries (int): The number of times to reattempt a query after a timeout

Returns:
OrderedDict: An ``OrderedDict`` with the following keys:
Expand Down
Loading