-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
1629 lines (1359 loc) · 57.7 KB
/
main.py
File metadata and controls
1629 lines (1359 loc) · 57.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
UCM Name Lookup Service
=======================
A CURRI (Cisco Unified Routing Rules Interface) server that provides
phone number to display name lookup for Cisco Unified Communications
Manager (UCM).
The service:
1. Loads a CSV file mapping phone numbers to display names into memory.
2. Exposes an HTTP/HTTPS POST endpoint that accepts XACML requests from UCM.
3. Parses the calling party number from the XACML request.
4. Looks up the display name in the in-memory directory.
5. Returns an XACML response with a Permit/Continue directive, including
the display name if found. Calls are NEVER rejected.
CURRI Protocol Reference:
https://developer.cisco.com/site/curri/
Usage:
# Development (HTTP only):
python main.py
# Production with Gunicorn (HTTP):
gunicorn -w 4 --threads 4 --worker-class gthread \\
-b 0.0.0.0:80 main:app
# Production with Gunicorn (HTTPS):
gunicorn -w 4 --threads 4 --worker-class gthread \\
-b 0.0.0.0:443 \\
--certfile=server.crt --keyfile=server.key main:app
Configuration:
Non-sensitive settings are defined in a YAML configuration file
(default: config.yaml). The file supports multiple UCM cluster
definitions, each with its own IP allow-list, CA certificate, and
certificate subject validation rules.
See config.yaml for the full schema and examples.
Environment Variables:
CONFIG_FILE - Path to the YAML configuration file
(default: config.yaml)
LOG_LEVEL - Logging level override: DEBUG, INFO, WARNING, ERROR
(overrides the value in config.yaml when set)
"""
import csv
import hashlib
import hmac
import ipaddress
import logging
import logging.handlers
import os
import secrets
import ssl
import sys
import threading
from dataclasses import dataclass, field
from xml.sax.saxutils import escape as xml_escape
from concurrent_log_handler import ConcurrentRotatingFileHandler
import yaml
from defusedxml import ElementTree as ET
from flask import Flask, Response, request
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Path to the YAML configuration file.
CONFIG_FILE = os.environ.get("CONFIG_FILE", "config.yaml")
def _load_config(config_path: str) -> dict:
"""Load configuration from a YAML file.
Uses ``yaml.safe_load`` to prevent unsafe deserialization. Returns
an empty dict when the file does not exist so that the application
can start with sensible defaults.
"""
if not os.path.isfile(config_path):
# Cannot use logger yet — logging is configured after this.
print(
f"[WARNING] Config file not found: {config_path} — "
"using built-in defaults"
)
return {}
with open(config_path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh)
if data is None:
return {}
if not isinstance(data, dict):
print(
f"[ERROR] Config file {config_path} must contain a YAML "
"mapping at the top level"
)
sys.exit(1)
return data
_config = _load_config(CONFIG_FILE)
# Path to the CSV file containing phone number -> display name mappings.
CSV_FILE_PATH = _config.get("csv_file_path", "phone_directory.csv")
# Flask development server bind address and port.
FLASK_HOST = _config.get("flask_host", "0.0.0.0")
FLASK_PORT = int(_config.get("flask_port", 5000))
# Optional TLS certificate and key for HTTPS in development mode.
# For production, configure TLS through Gunicorn's --certfile / --keyfile.
TLS_CERT_FILE = _config.get("tls_cert_file")
TLS_KEY_FILE = _config.get("tls_key_file")
# Logging verbosity level. The LOG_LEVEL environment variable, when
# set, takes precedence over the value in the config file.
LOG_LEVEL = os.environ.get(
"LOG_LEVEL", _config.get("log_level", "INFO")
).upper()
# Insecure mode flag. When True, the application is allowed to run
# without TLS certificates (plain HTTP). When False (the default),
# the application refuses to start unless TLS is properly configured.
INSECURE_MODE = _config.get("insecure_mode", False) is True
# PII obfuscation flag. When True, phone numbers and display names are
# replaced with a salted HMAC-SHA256 hash in log output so that operators
# can correlate identical values without seeing the actual data.
OBFUSCATE_PII = _config.get("obfuscate_pii", False) is True
# Per-startup random salt for PII hashing. In production (Gunicorn),
# the salt is generated once in the master process and passed to workers
# via the _PII_SALT environment variable so all workers produce
# consistent hashes. In development (Flask dev server), a fresh salt
# is generated locally. The salt is never logged or persisted. This
# prevents precomputed rainbow-table attacks against the small keyspace
# of phone numbers. Because the salt changes on every restart, hashes
# from different process lifetimes are not comparable.
_pii_salt_hex = os.environ.get("_PII_SALT", "") if OBFUSCATE_PII else ""
_PII_SALT: bytes = (
bytes.fromhex(_pii_salt_hex) if _pii_salt_hex
else secrets.token_bytes(32) if OBFUSCATE_PII
else b""
)
def _obfuscate_pii(value: str | None) -> str | None:
"""Return a privacy-safe representation of *value* for logging.
When ``OBFUSCATE_PII`` is enabled, the value is hashed with
HMAC-SHA256 using a per-startup random salt and the first 24 hex
characters are returned wrapped in ``{! … !}`` delimiters. This
allows log readers to recognise when two values are identical
within the same process lifetime without revealing the underlying
data.
The random salt is generated from a CSPRNG at startup and kept
only in memory. It is never logged or persisted, which prevents
rainbow-table reversal of hashed phone numbers.
When obfuscation is disabled the original value is returned unchanged.
``None`` and empty strings are passed through as-is.
"""
if not OBFUSCATE_PII or not value:
return value
digest = hmac.new(
_PII_SALT, value.encode("utf-8"), hashlib.sha256
).hexdigest()[:24]
return f"{{! {digest} !}}"
# Optional directory for rotating log files. When set, the application
# writes log files here in addition to stdout/stderr.
LOG_DIR = _config.get("log_dir")
LOG_MAX_BYTES = int(_config.get("log_max_bytes", 10 * 1024 * 1024)) # 10 MB
LOG_BACKUP_COUNT = int(_config.get("log_backup_count", 5))
# XACML 2.0 namespace used by UCM in CURRI requests.
XACML_NS = "urn:oasis:names:tc:xacml:2.0:context:schema:os"
# Cisco CURRI XACML attribute IDs for call information.
CURRI_ATTR_CALLING_NUMBER = "urn:Cisco:uc:1.0:callingnumber"
CURRI_ATTR_CALLED_NUMBER = "urn:Cisco:uc:1.0:callednumber"
CURRI_ATTR_TRANSFORMED_CGPN = "urn:Cisco:uc:1.0:transformedcgpn"
CURRI_ATTR_TRANSFORMED_CDPN = "urn:Cisco:uc:1.0:transformedcdpn"
# ---------------------------------------------------------------------------
# Logging Setup
# ---------------------------------------------------------------------------
_log_fmt = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
_log_level = getattr(logging, LOG_LEVEL, logging.INFO)
# Console handler (always active — supports `docker logs`).
_console_handler = logging.StreamHandler()
_console_handler.setFormatter(_log_fmt)
logging.root.setLevel(_log_level)
logging.root.addHandler(_console_handler)
# Rotating file handler (active when log_dir is configured).
if LOG_DIR:
os.makedirs(LOG_DIR, exist_ok=True)
_file_handler = ConcurrentRotatingFileHandler(
os.path.join(LOG_DIR, "app.log"),
maxBytes=LOG_MAX_BYTES,
backupCount=LOG_BACKUP_COUNT,
)
_file_handler.setFormatter(_log_fmt)
logging.root.addHandler(_file_handler)
logger = logging.getLogger("ucm_name_lookup")
# ---------------------------------------------------------------------------
# Insecure Mode Warning
# ---------------------------------------------------------------------------
_INSECURE_BANNER = (
"\n"
"########################################################################\n"
"# #\n"
"# WARNING: INSECURE MODE ENABLED #\n"
"# #\n"
"# This service is running WITHOUT TLS encryption. All traffic, #\n"
"# including CURRI requests and responses, is transmitted in plain #\n"
"# text and is vulnerable to eavesdropping and tampering. #\n"
"# #\n"
"# This mode is intended for development and testing ONLY. #\n"
"# Do NOT use insecure mode in production. #\n"
"# #\n"
"# To enable TLS, configure tls_cert_file and tls_key_file in #\n"
"# config.yaml and set insecure_mode to false (or remove it). #\n"
"# #\n"
"########################################################################"
)
_INSECURE_HOURLY_MSG = (
"SECURITY WARNING: This service is running in INSECURE MODE without "
"TLS encryption. All traffic is unencrypted. Configure TLS certificates "
"and disable insecure_mode for production use."
)
_insecure_mode_timer: threading.Timer | None = None
def _start_insecure_mode_warning() -> None:
"""Start a repeating timer that logs an insecure mode warning every hour.
The timer runs as a daemon thread so it does not prevent the process
from exiting.
"""
global _insecure_mode_timer
def _warn_and_reschedule() -> None:
logger.warning(_INSECURE_HOURLY_MSG)
_schedule_next()
def _schedule_next() -> None:
global _insecure_mode_timer
_insecure_mode_timer = threading.Timer(3600, _warn_and_reschedule)
_insecure_mode_timer.daemon = True
_insecure_mode_timer.start()
_schedule_next()
# ---------------------------------------------------------------------------
# UCM Cluster Definitions
# ---------------------------------------------------------------------------
@dataclass
class ClusterConfig:
"""Access rules for a single UCM cluster.
A request is authorized by a cluster when **all** of the cluster's
active rules match:
* The client IP must fall within at least one network in
``allowed_networks``. An empty list matches **no** IPs.
* The client certificate must contain at least one CN or SAN
in ``allowed_subjects``. An empty set matches **no**
subjects.
In **insecure mode only**, when a cluster defines neither
``allowed_subjects`` nor ``ca_file``, the certificate subject
check is skipped and only IP-based access control is enforced.
In secure mode, every cluster must have certificate
infrastructure or the service refuses to start.
Every active rule uses deny-by-default semantics: configuring a
rule but leaving it empty blocks all traffic for that criterion.
"""
name: str
allowed_networks: list[
ipaddress.IPv4Network | ipaddress.IPv6Network
] = field(default_factory=list)
allowed_subjects: set[str] = field(default_factory=set)
ca_file: str | None = None
def _validate_ca_cert(
ca_file: str, cluster_name: str
) -> bool:
"""Validate that a CA file contains a proper CA certificate.
The file **must** contain a CA certificate (``CA:TRUE``). If a
leaf certificate is detected instead, an error is logged and the
application exits — the user must provide the CA certificate that
signed the UCM's identity certificate, not the identity
certificate itself.
Returns ``True`` if the CA cert is valid, ``False`` otherwise.
The TLS layer (Gunicorn with ``CERT_REQUIRED``) handles full
chain validation at connection time.
.. note::
Leaf-certificate detection uses the private CPython API
``ssl._ssl._test_decode_cert``. There is no public Python
API to decode a PEM certificate without loading it into a
trust store. If this internal changes in a future CPython
release the leaf-detection heuristic will need updating.
"""
# --- Try loading as a CA certificate ---
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(ca_file)
ca_certs = ctx.get_ca_certs()
if ca_certs:
return True
except (ssl.SSLError, OSError) as exc:
logger.error(
"Cluster '%s': failed to load CA file '%s': %s",
cluster_name,
ca_file,
exc,
)
return False
# --- Detect leaf certificate and give a clear error ---
try:
cert_dict = ssl._ssl._test_decode_cert(ca_file) # noqa: SLF001
if cert_dict and cert_dict.get("subject"):
logger.error(
"Cluster '%s': '%s' is a leaf certificate (not a CA). "
"The ca_file must be the CA certificate that signed "
"the UCM's identity certificate — not the identity "
"certificate itself. You can usually export the CA "
"cert from UCM OS Administration under "
"Security > Certificate Management.",
cluster_name,
ca_file,
)
sys.exit(1)
except (AttributeError, TypeError, ValueError, OSError) as exc:
logger.warning(
"Cluster '%s': could not parse certificate '%s': %s",
cluster_name,
ca_file,
exc,
)
logger.warning(
"Cluster '%s': CA file '%s' contained no usable certificates",
cluster_name,
ca_file,
)
return False
def _parse_network_list(
entries: list,
cluster_name: str,
) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
"""Parse a list of IP / CIDR strings into network objects.
Invalid entries are logged and skipped.
"""
networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = []
for entry in entries:
entry = str(entry).strip()
if not entry:
continue
try:
networks.append(ipaddress.ip_network(entry, strict=False))
except ValueError:
logger.error(
"Invalid entry in cluster '%s' allowed_ips, "
"skipping: '%s'",
cluster_name,
entry,
)
return networks
def _parse_subject_list(entries: list) -> set[str]:
"""Parse a list of CN / SAN strings into a lower-cased set."""
subjects: set[str] = set()
for entry in entries:
entry = str(entry).strip()
if entry:
subjects.add(entry.lower())
return subjects
def _parse_clusters(raw_clusters: dict) -> list[ClusterConfig]:
"""Build :class:`ClusterConfig` objects from the ``clusters`` mapping
in the YAML configuration file.
Each key is a human-readable cluster name; the value is a dict with
optional keys ``allowed_ips``, ``allowed_subjects``, and ``ca_file``.
"""
clusters: list[ClusterConfig] = []
if not raw_clusters:
return clusters
if not isinstance(raw_clusters, dict):
logger.error(
"'clusters' in config must be a YAML mapping (got %s)",
type(raw_clusters).__name__,
)
sys.exit(1)
for name, cfg in raw_clusters.items():
if not isinstance(cfg, dict):
logger.error(
"Cluster '%s' must be a YAML mapping (got %s)",
name,
type(cfg).__name__,
)
sys.exit(1)
networks = _parse_network_list(
cfg.get("allowed_ips", []), str(name)
)
subjects = _parse_subject_list(
cfg.get("allowed_subjects", [])
)
ca_file = cfg.get("ca_file")
if ca_file is not None:
ca_file = str(ca_file)
if os.path.isfile(ca_file):
if not _validate_ca_cert(ca_file, str(name)):
logger.error(
"Cluster '%s': CA file '%s' is not a valid CA "
"certificate — aborting startup",
name,
ca_file,
)
sys.exit(1)
else:
logger.error(
"Cluster '%s': CA file not found: %s", name, ca_file
)
sys.exit(1)
clusters.append(
ClusterConfig(
name=str(name),
allowed_networks=networks,
allowed_subjects=subjects,
ca_file=ca_file,
)
)
return clusters
CLUSTERS: list[ClusterConfig] = _parse_clusters(
_config.get("clusters", {})
)
if CLUSTERS:
for _cl in CLUSTERS:
_parts: list[str] = []
if _cl.allowed_networks:
_parts.append(
f"{len(_cl.allowed_networks)} network(s): "
+ ", ".join(str(n) for n in _cl.allowed_networks)
)
if _cl.allowed_subjects:
_parts.append(
f"{len(_cl.allowed_subjects)} subject(s): "
+ ", ".join(sorted(_cl.allowed_subjects))
)
if _cl.ca_file:
_parts.append(f"CA: {_cl.ca_file} (chain validated by TLS layer)")
# In secure mode, every cluster must have cert infrastructure.
# IP-only clusters (no allowed_subjects and no ca_file) are
# only permitted in insecure mode.
_has_cert_rules = bool(_cl.allowed_subjects) or bool(_cl.ca_file)
if _cl.allowed_networks and not _has_cert_rules:
if not INSECURE_MODE:
logger.error(
"Cluster '%s' has allowed_ips but no "
"allowed_subjects or ca_file. In secure mode, "
"every cluster must define certificate "
"infrastructure (allowed_subjects and/or "
"ca_file). Either add them or set "
"'insecure_mode: true' for IP-only access.",
_cl.name,
)
sys.exit(1)
if _parts:
logger.info("Cluster '%s' — %s", _cl.name, "; ".join(_parts))
else:
logger.warning(
"Cluster '%s' has no allowed_ips or allowed_subjects "
"— all requests will be denied (deny-by-default)",
_cl.name,
)
else:
if not INSECURE_MODE:
logger.error(
"No clusters defined and insecure_mode is not enabled. "
"At least one cluster must be configured to restrict "
"access to trusted UCM servers. Either:\n"
" 1. Define one or more clusters in %s with allowed_ips, "
"ca_file, and/or allowed_subjects\n"
" 2. Set 'insecure_mode: true' in %s to bypass this "
"requirement (development/testing only)",
CONFIG_FILE,
CONFIG_FILE,
)
sys.exit(1)
logger.warning(
"No clusters defined — IP filtering and certificate "
"subject validation are DISABLED. All clients can reach "
"the /curri endpoint. Define clusters in %s to restrict access.",
CONFIG_FILE,
)
if OBFUSCATE_PII:
logger.info(
"PII obfuscation is ENABLED — phone numbers and display names "
"will appear as SHA-256 hashes in log output"
)
# ---------------------------------------------------------------------------
# Combined CA Bundle Generation
# ---------------------------------------------------------------------------
def _generate_ca_bundle(
clusters: list[ClusterConfig], bundle_path: str
) -> None:
"""Concatenate all unique cluster CA files into a single PEM bundle.
The resulting file can be passed to Gunicorn's ``--ca-certs`` option
so that the TLS layer trusts client certificates from every
configured cluster.
"""
seen: set[str] = set()
ca_files: list[str] = []
for cluster in clusters:
if cluster.ca_file and cluster.ca_file not in seen:
seen.add(cluster.ca_file)
ca_files.append(cluster.ca_file)
if not ca_files:
logger.info(
"No cluster defines ca_file — skipping CA bundle generation"
)
return
try:
with open(bundle_path, "w", encoding="utf-8") as out:
for ca_path in ca_files:
with open(ca_path, "r", encoding="utf-8") as ca_fh:
contents = ca_fh.read()
out.write(contents)
if not contents.endswith("\n"):
out.write("\n")
logger.info(
"Generated combined CA bundle (%d CA file(s)) → %s "
"Use this with Gunicorn: --ca-certs=%s --cert-reqs=2",
len(ca_files),
bundle_path,
bundle_path,
)
except OSError as exc:
logger.warning(
"Could not write CA bundle to '%s': %s "
"You can manually concatenate your cluster CA files for "
"Gunicorn's --ca-certs option.",
bundle_path,
exc,
)
CA_BUNDLE_PATH: str | None = _config.get("ca_bundle_path")
if CA_BUNDLE_PATH and CLUSTERS:
_generate_ca_bundle(CLUSTERS, CA_BUNDLE_PATH)
# ===========================================================================
# Client Certificate Helpers
# ===========================================================================
def _format_cert_name(name_tuple: tuple | None) -> str:
"""Format a certificate subject or issuer tuple into a readable string.
Converts the nested-tuple format from :meth:`ssl.SSLSocket.getpeercert`
into an OpenSSL-style one-line distinguished name, e.g.
``CN=server, O=Acme, C=US``.
"""
if not name_tuple:
return "<empty>"
parts: list[str] = []
for rdn in name_tuple:
for attr_type, attr_value in rdn:
parts.append(f"{attr_type}={attr_value}")
return ", ".join(parts) if parts else "<empty>"
def _log_cert_details(cert: dict, label: str = "Client") -> None:
"""Log certificate details at DEBUG level for TLS troubleshooting.
Outputs subject, issuer, serial number, validity dates, and SAN
entries. Never logs private key material.
"""
if not logger.isEnabledFor(logging.DEBUG):
return
logger.debug(
"%s certificate details:\n"
" Subject : %s\n"
" Issuer : %s\n"
" Serial : %s\n"
" Valid : %s → %s\n"
" SANs : %s",
label,
_format_cert_name(cert.get("subject")),
_format_cert_name(cert.get("issuer")),
cert.get("serialNumber", "<unknown>"),
cert.get("notBefore", "<unknown>"),
cert.get("notAfter", "<unknown>"),
", ".join(
f"{t}={v}" for t, v in cert.get("subjectAltName", ())
) or "<none>",
)
def _log_trusted_ca_certs(ssl_ctx: ssl.SSLContext) -> None:
"""Log the subjects of all CA certificates in the trust store.
Only produces output at DEBUG level. This helps diagnose
``unable to get local issuer certificate`` errors by showing
exactly which CAs the TLS layer will accept.
"""
if not logger.isEnabledFor(logging.DEBUG):
return
ca_certs = ssl_ctx.get_ca_certs()
if not ca_certs:
logger.debug("TLS trust store: <empty — no CA certs loaded>")
return
logger.debug(
"TLS trust store contains %d CA certificate(s):", len(ca_certs)
)
for idx, ca in enumerate(ca_certs, 1):
logger.debug(
" [%d] Subject: %s\n"
" Issuer : %s\n"
" Serial : %s\n"
" Valid : %s → %s",
idx,
_format_cert_name(ca.get("subject")),
_format_cert_name(ca.get("issuer")),
ca.get("serialNumber", "<unknown>"),
ca.get("notBefore", "<unknown>"),
ca.get("notAfter", "<unknown>"),
)
def _log_ca_bundle_contents(bundle_path: str) -> None:
"""Parse a PEM CA bundle and log each certificate at DEBUG level.
This is the primary diagnostic tool for ``unable to get local
issuer certificate`` errors — it shows exactly which CAs the TLS
layer will trust during the handshake.
"""
if not logger.isEnabledFor(logging.DEBUG):
return
if not os.path.isfile(bundle_path):
logger.debug("CA bundle file not found: %s", bundle_path)
return
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(bundle_path)
_log_trusted_ca_certs(ctx)
except (ssl.SSLError, OSError) as exc:
logger.debug("Could not parse CA bundle '%s': %s", bundle_path, exc)
# Deferred call — helpers above must be defined before this executes.
if CA_BUNDLE_PATH and CLUSTERS:
_log_ca_bundle_contents(CA_BUNDLE_PATH)
def _get_ssl_socket():
"""Return the underlying SSL socket for the current request.
Checks Gunicorn's ``gunicorn.socket`` environ key first, then
falls back to the Werkzeug dev server's ``werkzeug.request``
handler connection.
Returns the socket object or ``None``.
"""
sock = request.environ.get("gunicorn.socket")
if sock is not None and hasattr(sock, "getpeercert"):
return sock
handler = request.environ.get("werkzeug.request")
if handler is not None:
conn = getattr(handler, "connection", None)
if conn is not None and hasattr(conn, "getpeercert"):
return conn
return None
def _get_peer_certificate() -> dict | None:
"""Extract the parsed peer (client) certificate.
Returns the certificate dictionary from
:meth:`ssl.SSLSocket.getpeercert`, or ``None`` if no
verified certificate is available.
"""
sock = _get_ssl_socket()
if sock is not None:
cert = sock.getpeercert()
if cert:
return cert
return None
def _get_cert_subjects(cert: dict) -> set[str]:
"""Extract CN and SAN values from a parsed peer certificate.
Collects:
- Common Name (CN) from the ``subject`` field.
- DNS names and IP addresses from ``subjectAltName``.
All values are lower-cased for case-insensitive comparison.
Args:
cert: Certificate dictionary as returned by
:meth:`ssl.SSLSocket.getpeercert`.
Returns:
A set of lower-cased subject identifiers found in the
certificate.
"""
subjects: set[str] = set()
# Extract CN from subject RDN sequence.
for rdn in cert.get("subject", ()):
for attr_type, attr_value in rdn:
if attr_type == "commonName":
subjects.add(attr_value.lower())
# Extract DNS names and IP addresses from SAN.
for san_type, san_value in cert.get("subjectAltName", ()):
if san_type in ("DNS", "IP Address"):
subjects.add(san_value.lower())
return subjects
# ---------------------------------------------------------------------------
# Flask Application
# ---------------------------------------------------------------------------
app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 # 1 MB
@app.after_request
def _set_security_headers(response):
"""Add defense-in-depth security headers to every response."""
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["Cache-Control"] = "no-store"
response.headers["Content-Security-Policy"] = "default-src 'none'"
response.headers.pop("Server", None)
return response
@app.before_request
def _enforce_cluster_access():
"""Enforce per-cluster IP and certificate subject access rules.
When clusters are defined in the configuration file, every request
to the ``/curri`` endpoint must match **at least one** cluster.
Matching means satisfying **all** of the cluster's active rules:
1. The client IP must be within one of the cluster's
``allowed_ips`` networks. An empty list denies all IPs.
2. If the cluster has certificate infrastructure configured
(``allowed_subjects`` or ``ca_file``), the client
certificate must contain at least one CN or SAN in the
cluster's ``allowed_subjects``. An empty set denies all
subjects.
When a cluster defines neither ``allowed_subjects`` nor
``ca_file``, the certificate subject check is skipped entirely,
allowing IP-only access control (typical for insecure mode).
Every active rule uses **deny-by-default** semantics: configuring
a rule but leaving it empty blocks all traffic for that criterion.
The ``/health`` endpoint is restricted to localhost (``127.0.0.1``
and ``::1``) so that only the Docker health check and local probes
can reach it. This allows mTLS (``CERT_REQUIRED``) to remain
strict for all external connections.
When no clusters are configured, access is unrestricted (but the
service refuses to start without clusters unless insecure mode
is enabled).
Returns ``403 Forbidden`` if the request does not match any cluster.
"""
# Always restrict /health to localhost, regardless of cluster config.
if request.path == "/health":
if request.remote_addr in ("127.0.0.1", "::1"):
return None
logger.warning(
"Denied /health request from non-local IP %s",
request.remote_addr,
)
return Response("Forbidden\n", status=403, mimetype="text/plain")
if not CLUSTERS:
return None
# --- Parse client IP ---
client_ip_str = request.remote_addr
try:
client_ip = ipaddress.ip_address(client_ip_str)
except ValueError:
logger.warning(
"Could not parse client IP '%s' — denying request",
client_ip_str,
)
return Response("Forbidden\n", status=403, mimetype="text/plain")
# --- Obtain client certificate for subject validation ---
# With deny-by-default semantics, subject checks always run, so
# we always need the client certificate when clusters are defined.
cert = _get_peer_certificate()
if cert:
_log_cert_details(cert, "Client")
else:
logger.debug(
"No client certificate available for request from %s",
client_ip_str,
)
cert_subjects: set[str] | None = (
_get_cert_subjects(cert) if cert else None
)
# --- Check each cluster until one matches ---
# Every active rule uses deny-by-default: an empty
# allowed_networks list matches no IPs, and an empty
# allowed_subjects set matches no subjects. A cluster must
# explicitly list what it permits.
#
# In insecure mode only, when a cluster has no certificate
# infrastructure (no allowed_subjects AND no ca_file), the
# subject check is skipped for IP-only access control. In
# secure mode the startup check ensures every cluster has cert
# infrastructure, so this branch is never reached.
for cluster in CLUSTERS:
# 1. IP check (empty list → deny all)
if not any(client_ip in net for net in cluster.allowed_networks):
logger.debug(
"Cluster '%s': IP %s not in allowed_ips",
cluster.name,
client_ip_str,
)
continue
# 2. Certificate subject check.
# Skipped only in insecure mode when the cluster has no
# cert infrastructure (no allowed_subjects and no ca_file).
_has_cert_rules = bool(cluster.allowed_subjects) or bool(cluster.ca_file)
if _has_cert_rules or not INSECURE_MODE:
if cert_subjects is None:
logger.debug(
"Cluster '%s': client certificate is not accessible",
cluster.name,
)
continue
if not (cert_subjects & cluster.allowed_subjects):
logger.debug(
"Cluster '%s': cert subjects %s do not match "
"allowed_subjects %s",
cluster.name,
sorted(cert_subjects),
sorted(cluster.allowed_subjects),
)
continue
# Note: CA chain validation is handled by the TLS layer
# (Gunicorn with CERT_REQUIRED). If the client cert does not
# chain to a trusted root in the CA bundle, the TLS handshake
# fails before the request reaches the application.
# All defined rules passed — request is authorized.
logger.debug(
"Request from %s authorized via cluster '%s'",
client_ip_str,
cluster.name,
)
return None
# --- No cluster matched — deny ---
logger.warning(
"Denied request from %s to %s — no matching cluster "
"(checked %d cluster(s))",
client_ip_str,
request.path,
len(CLUSTERS),
)
return Response("Forbidden\n", status=403, mimetype="text/plain")
# In-memory phone directory loaded from CSV at startup.
# exact_directory: keys are normalized phone numbers (digits only), values are display names.
# prefix_trie: a trie of normalized prefix patterns for longest-prefix matching.
exact_directory: dict[str, str] = {}
prefix_trie: "PrefixTrie | None" = None
# ===========================================================================
# Prefix Trie for Efficient Longest-Prefix Matching
# ===========================================================================
class PrefixTrie:
"""A trie (prefix tree) for efficient longest-prefix phone number matching.
Each node in the trie represents a single digit. Nodes that correspond
to the end of a registered prefix store a display name. Lookup walks
the trie character-by-character and remembers the last (i.e. longest)
prefix that had a name, giving O(m) performance where *m* is the length
of the phone number being looked up.
"""
__slots__ = ("_root", "_size")
def __init__(self) -> None:
# Each node is a dict: digit -> child-node-dict.
# A special key None stores the display name at that prefix.
self._root: dict = {}
self._size: int = 0
def insert(self, prefix: str, display_name: str) -> None:
"""Insert a prefix -> display_name mapping into the trie."""
node = self._root
for ch in prefix:
node = node.setdefault(ch, {})
node[None] = display_name
self._size += 1
def longest_prefix_match(self, number: str) -> str | None:
"""Return the display name for the longest prefix matching *number*.
Walks the trie digit-by-digit and tracks the most recent node that
stores a display name. Returns ``None`` if no prefix matches.
"""
node = self._root
result: str | None = None
for ch in number:
child = node.get(ch)
if child is None:
break
node = child
name = node.get(None)
if name is not None:
result = name
return result
def __len__(self) -> int:
return self._size
# ===========================================================================
# CSV Directory Loader
# ===========================================================================
def load_phone_directory(
csv_path: str,
) -> tuple[dict[str, str], PrefixTrie]:
"""Load a phone number -> display name mapping from a CSV file.