Skip to content

Commit aa34754

Browse files
committed
Factor out device-to-court mapping
1 parent 810ff87 commit aa34754

File tree

5 files changed

+164
-63
lines changed

5 files changed

+164
-63
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ write_to = "_version.py"
1313

1414
[project]
1515
name = "tptools"
16-
version = "0.6.6"
16+
version = "0.6.7"
1717
authors = [{ name = "martin f. krafft", email = "[email protected]" }]
1818
description = "A set of tools to export data from with TournamentSoftware"
1919
readme = "README.md"

tests/test_devcourtmap.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from io import BytesIO
2+
3+
import pytest
4+
5+
from tptools.court import Court
6+
from tptools.devcourtmap import DeviceCourtMap
7+
8+
9+
@pytest.fixture
10+
def tomldevmap() -> BytesIO:
11+
devmap = {
12+
"192.0.2.1": "C1",
13+
"192.0.2.2": "C07",
14+
}
15+
toml = "\n".join([f'{ip} = "{court}"' for ip, court in devmap.items()])
16+
return BytesIO(toml.encode())
17+
18+
19+
def test_toml_reader(tomldevmap: BytesIO) -> None:
20+
devmap = DeviceCourtMap.read_toml_devmap(tomldevmap)
21+
assert "192.0.2.1" in devmap
22+
assert devmap["192.0.2.2"] == "C07"
23+
24+
25+
@pytest.mark.parametrize(
26+
"input, exp",
27+
[
28+
("C1", (None, 1)),
29+
("2-C1", (2, 1)),
30+
("-3-C4", (3, 4)),
31+
("C01", (None, 1)),
32+
("C42", (None, 42)),
33+
("c2", (None, 2)),
34+
("court3", (None, 3)),
35+
("Court4", (None, 4)),
36+
("COURT5", (None, 5)),
37+
("Court 6", (None, 6)),
38+
("Court 08", (None, 8)),
39+
("C 7", None),
40+
("Cou7", None),
41+
],
42+
)
43+
def test_name_normaliser(input: str, exp: tuple[int | None, int] | None) -> None:
44+
assert DeviceCourtMap.normalise_court_name_for_matching(input) == exp
45+
46+
47+
@pytest.fixture
48+
def devcourtmap(tomldevmap: BytesIO) -> DeviceCourtMap:
49+
return DeviceCourtMap(tomldevmap)
50+
51+
52+
@pytest.mark.parametrize(
53+
"ip, text", [("192.0.2.1", "C1"), ("192.0.2.2", "C07"), ("192.0.2.3", None)]
54+
)
55+
def test_find_match(devcourtmap: DeviceCourtMap, ip: str, text: str | None) -> None:
56+
assert devcourtmap.find_match_for_ip(ip) == text
57+
58+
59+
@pytest.mark.parametrize(
60+
"ip, court",
61+
[("192.0.2.1", "court1"), ("192.0.2.2", "court2"), ("192.0.2.3", "court3")],
62+
)
63+
def test_find_court(
64+
devcourtmap: DeviceCourtMap, ip: str, court: str, court1: Court, court2: Court
65+
) -> None:
66+
cm = {"court1": court1, "court2": court2}
67+
assert devcourtmap.find_court_for_ip(ip, cm.values()) == cm.get(court)

tptools/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .court import Court, CourtSelectionParams
2+
from .devcourtmap import DeviceCourtMap
23
from .draw import Draw, Event, Stage
34
from .drawtype import DrawType
45
from .entry import Entry, Player
@@ -10,6 +11,7 @@
1011
__all__ = [
1112
"Court",
1213
"CourtSelectionParams",
14+
"DeviceCourtMap",
1315
"Draw",
1416
"DrawType",
1517
"Entry",

tptools/devcourtmap.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import logging
2+
import re
3+
import tomllib
4+
from collections.abc import Iterable
5+
from typing import IO, Never
6+
7+
from tptools.court import Court
8+
from tptools.util import flatten_dict
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class DeviceCourtMap:
14+
def __init__(self, tomlfile: IO[bytes] | None = None) -> None:
15+
self._devmap: dict[str, int | str] = (
16+
{} if tomlfile is None else self.read_toml_devmap(tomlfile)
17+
)
18+
19+
@staticmethod
20+
def read_toml_devmap(tomlfile: IO[bytes]) -> dict[str, int | str] | Never:
21+
devmap: dict[str, int | str] = tomllib.load(tomlfile)
22+
23+
# TOML turns a key like 172.21.22.1 into a nested dict {"172":{"21":{"22"…}…}…}
24+
# so for convenience, flatten this:
25+
return flatten_dict(devmap)
26+
27+
@staticmethod
28+
def normalise_court_name_for_matching(
29+
courtname: str,
30+
) -> tuple[int | None, int] | None:
31+
m = re.match(
32+
r"(?:-?(?P<loc>\d+)-)?c(?:ourt *)?0*(?P<court>\d+)",
33+
courtname,
34+
flags=re.IGNORECASE,
35+
)
36+
if m:
37+
return int(loc) if (loc := m["loc"]) is not None else None, int(m["court"])
38+
else:
39+
return None
40+
41+
def find_court_for_ip(
42+
self, clientip: str, courts: Iterable[Court] | None = None
43+
) -> Court | None:
44+
courtname = None
45+
if (courtname := self._devmap.get(clientip)) is not None:
46+
logger.debug(f"Device at IP {clientip} might be on {courtname}")
47+
for court in courts or []:
48+
# TODO: can we do better than this to identify the court when we are
49+
# given a string that might not be what the current courtnamepolicy
50+
# returns, or an ID?
51+
if isinstance(courtname, int):
52+
if courtname == court.id:
53+
return court
54+
55+
else:
56+
term = self.normalise_court_name_for_matching(courtname)
57+
if term is not None:
58+
comps = (
59+
[]
60+
if court.location is None
61+
else [
62+
f"{court.location.id}-{court.name}",
63+
f"{court.location.id}-{court}",
64+
]
65+
)
66+
67+
for comparename in (
68+
*comps,
69+
court.name,
70+
str(court),
71+
):
72+
comp = self.normalise_court_name_for_matching(comparename)
73+
if (
74+
comp is not None
75+
and (term[0] is None and term[1] == comp[1])
76+
or (term == comp)
77+
):
78+
return court
79+
80+
logger.debug(f"No court found in devmap for device with IP {clientip}")
81+
return None
82+
83+
def find_match_for_ip(self, clientip: str) -> str | None:
84+
text = self._devmap.get(clientip)
85+
return str(text) if text is not None else None

tptools/tpsrv/squoresrv.py

Lines changed: 9 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from tptools import (
3232
Court,
3333
CourtSelectionParams,
34+
DeviceCourtMap,
3435
Draw,
3536
Entry,
3637
MatchSelectionParams,
@@ -59,7 +60,6 @@
5960
)
6061
from tptools.namepolicy.policybase import RegexpSubstTuple
6162
from tptools.util import (
62-
flatten_dict,
6363
normalise_dict_values_for_query_string,
6464
silence_logger,
6565
)
@@ -413,93 +413,40 @@ def get_config(
413413

414414
def get_dev_map(
415415
path: Annotated[pathlib.Path, Depends(get_devmap_path)],
416-
) -> dict[str, int | str] | Never:
417-
devmap: dict[str, int | str] = {}
418-
416+
) -> DeviceCourtMap:
419417
try:
420418
with open(path, "rb") as f:
421-
devmap = tomllib.load(f)
419+
return DeviceCourtMap(f)
422420

423421
except FileNotFoundError:
424422
logger.warning(f"Squore device to court map not found at {path}, ignoring…")
425423

426-
# TOML turns a key like 172.21.22.1 into a nested dict {"172":{"21":{"22"…}
427-
# so for convenience, flatten this:
428-
return flatten_dict(devmap)
424+
return DeviceCourtMap()
429425

430426

431427
# }}}
432428

433429
# {{{ Court name to dev/mirror
434430

435431

436-
def _normalise_court_name_for_matching(courtname: str) -> tuple[int | None, str] | None:
437-
m = re.match(
438-
r"(?:-?(?P<loc>\d+)-)?c(?:ourt *)?0*(?P<court>\d+)",
439-
courtname,
440-
flags=re.IGNORECASE,
441-
)
442-
if m:
443-
return int(loc) if (loc := m["loc"]) is not None else None, m["court"]
444-
else:
445-
return None
446-
447-
448432
def get_court_for_dev(
449-
dev_map: Annotated[dict[str, int | str], Depends(get_dev_map)],
433+
dev_map: Annotated[DeviceCourtMap, Depends(get_dev_map)],
450434
courts: Annotated[list[Court], Depends(get_courts)],
451435
clientip: Annotated[str, Depends(get_remote)],
452436
) -> Court | None:
453-
courtname = None
454-
if (courtname := dev_map.get(clientip)) is not None:
455-
logger.debug(f"Device at IP {clientip} might want feed for {courtname}")
456-
for court in courts:
457-
# TODO: can we do better than this to identify the court when we are
458-
# given a string that might not be what the current courtnamepolicy
459-
# returns, or an ID?
460-
if isinstance(courtname, int):
461-
if courtname == court.id:
462-
return court
463-
464-
else:
465-
term = _normalise_court_name_for_matching(courtname)
466-
if term is not None:
467-
comps = (
468-
[]
469-
if court.location is None
470-
else [
471-
f"{court.location.id}-{court.name}",
472-
f"{court.location.id}-{court}",
473-
]
474-
)
475-
476-
for comparename in (
477-
*comps,
478-
court.name,
479-
str(court),
480-
):
481-
comp = _normalise_court_name_for_matching(comparename)
482-
if (
483-
comp is not None
484-
and (term[0] is None and term[1] == comp[1])
485-
or (term == comp)
486-
):
487-
return court
488-
489-
logger.debug(f"No court found in devmap for device with IP {clientip}")
490-
return None
437+
return dev_map.find_court_for_ip(clientip, courts=courts)
491438

492439

493440
def get_mirror_for_dev(
494-
dev_map: Annotated[dict[str, int | str], Depends(get_dev_map)],
441+
dev_map: Annotated[DeviceCourtMap, Depends(get_dev_map)],
495442
clientip: Annotated[str, Depends(get_remote)],
496443
) -> str | None:
497444
othername = None
498-
if (othername := dev_map.get(clientip)) is not None and re.fullmatch(
445+
if (othername := dev_map.find_match_for_ip(clientip)) is not None and re.fullmatch(
499446
r"\w{6}-\d+-.+", othername := str(othername)
500447
):
501448
logger.debug(f"Device at IP {clientip} wants to be mirror for {othername}")
502-
return othername # type: ignore[return-value]
449+
return othername
503450

504451
logger.debug(f"No mirror device found in devmap for device with IP {clientip}")
505452
return None

0 commit comments

Comments
 (0)