Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions api/desecapi/tests/test_dyndns12update.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,68 @@ def test_update_multiple_v4(self):
self.assertIP(ipv4=new_ip)
self.assertIP(subname="sub", ipv4=new_ip)

def test_update_multiple_with_overwrite(self):
# /nic/update?hostname=sub1.a.io,sub2.a.io,sub3.a.io&myip=1.2.3.4&ipv6=::1&sub2.a.io.ipv6=::2
new_ip4 = "1.2.3.4"
new_ip6 = "::1"
new_ip6_overwrite = "::2"
domain1 = "sub1." + self.my_domain.name
domain2 = "sub2." + self.my_domain.name
domain3 = "sub3." + self.my_domain.name

with self.assertRequests(
self.request_pdns_zone_update(self.my_domain.name),
self.request_pdns_zone_axfr(self.my_domain.name),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{
"hostname": f"{domain1},{domain2},{domain3}",
"myip": new_ip4,
"ipv6": new_ip6,
f"myipv6:{domain2}": new_ip6_overwrite,
},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(subname="sub1", ipv4=new_ip4, ipv6=new_ip6)
self.assertIP(subname="sub2", ipv4=new_ip4, ipv6=new_ip6_overwrite)
self.assertIP(subname="sub3", ipv4=new_ip4, ipv6=new_ip6)

def test_update_multiple_with_extra(self):
# /nic/update?hostname=sub1.a.io,sub3.a.io&myip=1.2.3.4&ipv6=::1&sub2.a.io.ipv6=::2
old_ip4 = "10.0.0.2"
new_ip4 = "1.2.3.4"
new_ip6 = "::1"
new_ip6_extra = "::2"
domain1 = "sub1." + self.my_domain.name
domain2 = "sub2." + self.my_domain.name
domain3 = "sub3." + self.my_domain.name
self.create_rr_set(self.my_domain, [old_ip4], subname="sub2", type="A", ttl=60)

with self.assertRequests(
self.request_pdns_zone_update(self.my_domain.name),
self.request_pdns_zone_axfr(self.my_domain.name),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{
"hostname": f"{domain1},{domain3}",
"myip": new_ip4,
"ipv6": new_ip6,
f"myipv6:{domain2}": new_ip6_extra,
},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(subname="sub1", ipv4=new_ip4, ipv6=new_ip6)
self.assertIP(subname="sub2", ipv4=old_ip4, ipv6=new_ip6_extra)
self.assertIP(subname="sub3", ipv4=new_ip4, ipv6=new_ip6)

def test_update_multiple_username_param(self):
# /nic/update?username=a.io,sub.a.io&myip=1.2.3.4
new_ip = "1.2.3.4"
Expand Down Expand Up @@ -403,6 +465,36 @@ def test_update_multiple_with_subnet(self):
self.assertIP(subname="sub1", ipv4="10.1.0.1")
self.assertIP(subname="sub2", ipv4="10.1.0.2")

def test_update_multiple_with_subnet_and_ip_override(self):
# /nic/update?hostname=a.io,b.io&myip=10.1.0.0/16&a.io=192.168.1.1
domain1 = "sub1." + self.my_domain.name
domain2 = "sub2." + self.my_domain.name
self.create_rr_set(
self.my_domain, ["10.0.0.1"], subname="sub1", type="A", ttl=60
)
self.create_rr_set(
self.my_domain, ["10.0.0.2"], subname="sub2", type="A", ttl=60
)

with self.assertRequests(
self.request_pdns_zone_update(self.my_domain.name),
self.request_pdns_zone_axfr(self.my_domain.name),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{
"hostname": f"{domain1},{domain2}",
"myip": "10.1.0.0/16",
f"myipv4:{domain1}": "192.168.1.1",
},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(subname="sub1", ipv4="192.168.1.1")
self.assertIP(subname="sub2", ipv4="10.1.0.2")

def test_update_multiple_with_one_being_already_up_to_date(self):
# /nic/update?hostname=a.io,sub.a.io&myip=1.2.3.4
new_ip = "1.2.3.4"
Expand Down Expand Up @@ -446,6 +538,20 @@ def test_update_same_domain_twice(self):

self.assertIP(ipv4=new_ip)

def test_update_overwrite_with_invalid_subnet(self):
# /nic/update?hostname=a.io&a.io.myip=1.2.3.4/64
domain1 = self.create_domain(owner=self.owner).name

with self.assertRequests():
response = self.client.get(
self.reverse("v1:dyndns12update"),
{"hostname": f"{domain1}", f"myipv4:{domain1}": "1.2.3.4/64"},
)

self.assertContains(
response, "invalid subnet", status_code=status.HTTP_400_BAD_REQUEST
)

def test_update_multiple_with_invalid_subnet(self):
# /nic/update?hostname=sub1.a.io,sub2.a.io&myip=1.2.3.4/64
domain1 = "sub1." + self.my_domain.name
Expand Down
182 changes: 139 additions & 43 deletions api/desecapi/views/dyndns.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,47 +82,13 @@ def _find_action(self, param_keys, separator) -> UpdateAction:
# Check URL parameters
for param_key in param_keys:
try:
params = set(
filter(
lambda param: separator in param or param in ("", "preserve"),
map(str.strip, self.request.query_params[param_key].split(",")),
)
)
param_value = self.request.query_params[param_key]
except KeyError:
continue
if not params:
continue

try:
(param,) = params # unpacks if params has exactly one element
except ValueError: # more than one element
if params & {"", "preserve"}:
raise ValidationError(
detail=f'IP parameter "{param_key}" cannot have addresses and "preserve" or an empty value at the same time.',
code="inconsistent-parameter",
)
if any("/" in param for param in params):
raise ValidationError(
detail=f'IP parameter "{param_key}" cannot use subnet notation with multiple addresses.',
code="multiple-subnet",
)
else: # one element
match param:
case "":
return SetIPs(ips=[])
case "preserve":
return PreserveIPs()
case str(x) if "/" in x:
try:
subnet = ip_network(param, strict=False)
return UpdateWithSubnet(subnet=subnet)
except ValueError as e:
raise ValidationError(
detail=f'IP parameter "{param_key}" is an invalid subnet: {e}',
code="invalid-subnet",
)

return SetIPs(ips=list(params))
action = self._get_action_from_param(param_key, param_value, separator)
if action is not None:
return action

# Check remote IP address
client_ip = self.request.META.get("REMOTE_ADDR")
Expand All @@ -132,6 +98,73 @@ def _find_action(self, param_keys, separator) -> UpdateAction:
# give up
return SetIPs(ips=[])

@staticmethod
def _get_action_from_param(
param_key: str, param_value: str, separator: str
) -> UpdateAction | None:
"""
Parses a single query parameter value to determine the DynDNS update action.

This function is responsible for interpreting the `param_value` (which can be a single IP,
a comma-separated list of IPs, 'preserve', or a CIDR subnet) and converting it into
a structured UpdateAction dataclass. It also performs validation on the parameter's format.

Args:
param_key: The name of the query parameter (e.g., 'myip', 'myipv4', 'myipv6', or a qname for extra actions).
Used for error messages.
param_value: The string value of the query parameter (e.g., '1.2.3.4', '1.2.3.4,5.6.7.8',
'192.168.1.0/24', 'preserve', or '').
separator: The character used to distinguish IP versions (e.g., '.' for IPv4, ':' for IPv6).

Returns:
An instance of SetIPs, UpdateWithSubnet, PreserveIPs, or None if no valid action can be
derived from the parameter (e.g., an IPv4 address was given, but IPv6 is required by the separator).
Returns SetIPs(ips=[]) if param_value is an empty string.

Raises:
ValidationError: If the parameter value is inconsistent (e.g., 'preserve' with addresses)
or if a subnet is malformed.
"""
params = set(
filter(
lambda param: separator in param or param in ("", "preserve"),
map(str.strip, param_value.split(",")),
)
)
if not params:
return None

try:
(param,) = params # unpacks if params has exactly one element
except ValueError: # more than one element
if params & {"", "preserve"}:
raise ValidationError(
detail=f'IP parameter "{param_key}" cannot have addresses and "preserve" or an empty value at the same time.',
code="inconsistent-parameter",
)
if any("/" in param for param in params):
raise ValidationError(
detail=f'IP parameter "{param_key}" cannot use subnet notation with multiple addresses.',
code="multiple-subnet",
)
else: # one element
match param:
case "":
return SetIPs(ips=[])
case "preserve":
return PreserveIPs()
case str(x) if "/" in x:
try:
subnet = ip_network(param, strict=False)
return UpdateWithSubnet(subnet=subnet)
except ValueError as e:
raise ValidationError(
detail=f'IP parameter "{param_key}" is an invalid subnet: {e}',
code="invalid-subnet",
)

return SetIPs(ips=list(params))

@staticmethod
def _sanitize_qnames(qnames_str) -> set[str]:
qnames = qnames_str.lower().split(",")
Expand Down Expand Up @@ -188,19 +221,52 @@ def qnames(self) -> set[str]:
}
)

@cached_property
def extra_qnames(self) -> dict[str, dict[str, str]]:
"""
Parses query parameters of the form 'myipv4:qname' or 'myipv6:qname'
to extract additional qnames and their associated update arguments.

Returns:
A dictionary where keys are qnames (e.g., 'sub.example.com') and values
are dictionaries mapping RR type ('A' or 'AAAA') to the raw query parameter
value (e.g., {'A': '1.2.3.4,5.6.7.8'} or {'AAAA': 'preserve'}).
Multiple IP values for the same qname/type are concatenated with commas.
"""
qnames = defaultdict(dict)

for param, value in self.request.query_params.items():
if param.startswith("myipv6:"):
type_ = "AAAA"
elif param.startswith("myipv4:"):
type_ = "A"
else:
continue

for qname in self._sanitize_qnames(param.split(":", 1)[1]):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for the reviewer: Since I'm reusing the existing _sanitize_qnames function, this means the new parameters will also support multiple domains, e.g. myipv6:sub1.example.com,sub2.example.com=2001::/64.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine.

The existing logic looks like you're also somehow combining values from multiple parameters whose list of qnames has overlap, such as: &myipv4:a.example,b.example=1.2.3.4&myipv4:b.example=4.3.2.1. If I read your code correctly, we then have `qname['b.example']['A'] = '1.2.3.4,4.3.2.1'. Is that correct?

(I find that rather unexpected, and would think that the last value takes precedence.)

existing = qnames[qname].get(type_)
if existing is not None:
argument = f"{existing},{value}"
else:
argument = value
qnames[qname][type_] = argument

return qnames

@cached_property
def domain(self) -> Domain:
qnames = self.qnames | self.extra_qnames.keys()
qname_qs = (
Domain.objects.filter_qname(qname, owner=self.request.user)
for qname in self.qnames
for qname in qnames
)
domains = (
Domain.objects.none()
.union(*(qs.order_by("-name_length")[:1] for qs in qname_qs), all=True)
.all()
)

if len(domains) != len(self.qnames):
if len(domains) != len(qnames):
metrics.get("desecapi_dynDNS12_domain_not_found").inc()
raise NotFound("nohost")

Expand All @@ -218,6 +284,26 @@ def domain(self) -> Domain:
def subnames(self) -> list[str]:
return [qname.rpartition(f".{self.domain.name}")[0] for qname in self.qnames]

@cached_property
def extra_actions(self) -> dict[tuple[str, str], UpdateAction]:
"""
Converts the raw string arguments from `extra_qnames` into structured `UpdateAction` objects.

Returns:
A dictionary where keys are `(RR_type, subname)` tuples (e.g., ('A', 'sub'))
and values are `UpdateAction` instances (SetIPs, UpdateWithSubnet, PreserveIPs).
"""
return {
(
type_,
qname.rpartition(f".{self.domain.name}")[0],
): self._get_action_from_param(
qname, argument, "." if type_ == "A" else ":"
)
for qname, arguments in self.extra_qnames.items()
for type_, argument in arguments.items()
}

def get_serializer_context(self):
return {
**super().get_serializer_context(),
Expand All @@ -226,8 +312,12 @@ def get_serializer_context(self):
}

def get_queryset(self):
subnames = [
*self.subnames,
*[subname for (type_, subname) in self.extra_actions.keys()],
]
return self.domain.rrset_set.filter(
subname__in=self.subnames, type__in=["A", "AAAA"]
subname__in=subnames, type__in=["A", "AAAA"]
).prefetch_related("records")

@staticmethod
Expand Down Expand Up @@ -261,6 +351,13 @@ def get(self, request, *args, **kwargs) -> Response:
"A": self._find_action(["myip", "myipv4", "ip"], separator="."),
"AAAA": self._find_action(["myipv6", "ipv6", "myip", "ip"], separator=":"),
}
subname_actions = {
(type_, subname): action
for subname in self.subnames
for type_, action in actions.items()
}
for (type_, subname), action in self.extra_actions.items():
subname_actions[(type_, subname)] = action

data = [
{
Expand All @@ -269,8 +366,7 @@ def get(self, request, *args, **kwargs) -> Response:
"ttl": 60,
"records": records,
}
for subname in self.subnames
for type_, action in actions.items()
for (type_, subname), action in subname_actions.items()
if (records := self._get_records(subname_records[subname], action))
is not None
]
Expand Down
6 changes: 6 additions & 0 deletions docs/dyndns/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,11 @@ syntax.) For example, when using ddclient, you can edit ``ddclient.conf`` and se
case the IPv4 address of both ``domain.org`` and ``sub.domain.org`` will be
updated while preserving any (different) IPv6 addresses.

For Fritz!Box devices, an update URL could look like this:
``https://update.dedyn.io/?myipv4=<ipaddr>&myipv6=<ip6addr>&myipv6:host.<domain>=<ip6lanprefix>``
This example updates the main domain with the router's global IPv4 and IPv6
addresses, and additionally updates the subdomain `host.<domain name>` with
an IPv6 LAN prefix.

If you try to update several subdomains by issuing multiple update requests,
your update requests may be refused (see :ref:`rate-limits`).
Loading