Skip to content

Commit 7295f9b

Browse files
committed
minor fixes
1 parent dcd05ad commit 7295f9b

File tree

4 files changed

+283
-257
lines changed

4 files changed

+283
-257
lines changed

classes/protocol_settings.py

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -148,32 +148,28 @@ class WriteMode(Enum):
148148
''' WRITE ONLY'''
149149

150150
@classmethod
151-
def fromString(cls, name : str):
151+
def fromString(cls, name: str) -> "WriteMode":
152152
name = name.strip().upper()
153153

154-
#common alternative names
155-
alias : dict[str,WriteMode] = {
156-
"R" : "READ",
157-
"NO" : "READ",
158-
"READ" : "READ",
159-
"WD" : "READ",
160-
"RD" : "READDISABLED",
161-
"READDISABLED" : "READDISABLED",
162-
"DISABLED" : "READDISABLED",
163-
"D" : "READDISABLED",
164-
"R/W" : "WRITE",
165-
"RW" : "WRITE",
166-
"W" : "WRITE",
167-
"YES" : "WRITE",
168-
"WO" : "WRITEONLY"
154+
# Strings mapped directly to the Enum members
155+
alias: dict[str, WriteMode] = {
156+
"R": cls.READ,
157+
"NO": cls.READ,
158+
"READ": cls.READ,
159+
"WD": cls.READ,
160+
"RD": cls.READDISABLED,
161+
"READDISABLED": cls.READDISABLED,
162+
"DISABLED": cls.READDISABLED,
163+
"D": cls.READDISABLED,
164+
"R/W": cls.WRITE,
165+
"RW": cls.WRITE,
166+
"W": cls.WRITE,
167+
"YES": cls.WRITE,
168+
"WO": cls.WRITEONLY
169169
}
170170

171-
if name in alias:
172-
name = alias[name]
173-
else:
174-
name = "READ" #default
175-
176-
return getattr(cls, name)
171+
# Use .get() to handle the default case cleanly
172+
return alias.get(name, cls.READ)
177173

178174
class Registry_Type(Enum):
179175
ZERO = 0x00
@@ -214,7 +210,7 @@ class registry_map_entry:
214210
data_byteorder : str = ''
215211
''' entry specific byte order little | big | '' '''
216212

217-
read_command : bytes = None
213+
read_command : bytes | None = None
218214
''' for transports/protocols that require sending a command on top of "register" '''
219215

220216
read_interval : int = 1000

classes/transports/timescaledb.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ def __init__(self, settings: SectionProxy) -> None:
435435
self._stop_event: threading.Event = getattr(self, "_stop_event", threading.Event())
436436

437437
# init runtime backoff settings for connection attempts
438-
self._reconnect_lock: lock = threading.Lock() # prevents multiple concurrent TSDB reconnect triggers
438+
self._reconnect_lock: RLock = threading.RLock() # prevents multiple concurrent TSDB reconnect triggers
439439
self._reconnect_thread_running = False # guard to prevent duplicate reconnect threads
440440
self._stop_reconnect_event: threading.Event = getattr(self, "_stop_reconnect_event", threading.Event())
441441

@@ -603,6 +603,7 @@ def _attempt_reconnect(self) -> None:
603603

604604
attempt_no = 0
605605
while (attempts <= 0) or (attempt_no < attempts): # attempts <= 0 => unlimited attempts
606+
self._log.info(f"Reconnect attempt {attempt_no} starting")
606607
if self._stop_reconnect_event.is_set():
607608
self._log.info("Auto-reconnect: stop requested, exiting reconnect loop.")
608609
break
@@ -623,7 +624,6 @@ def _attempt_reconnect(self) -> None:
623624

624625
try:
625626
# Attempt to re-establish DB connection.
626-
627627
with self.engine.connect() as conn:
628628
conn.execute(text("SELECT 1"))
629629
self._set_tsdb_connected(True, "reconnect successful") # noqa: FBT003
@@ -671,7 +671,7 @@ def _trigger_reconnect(self) -> None:
671671
"""Prevent concurrent reconnect threads from being spawned """
672672

673673
if not hasattr(self, "_reconnect_lock"):
674-
self._reconnect_lock = threading.Lock()
674+
self._reconnect_lock = threading.RLock()
675675
self._reconnect_thread_running = False
676676

677677
if not hasattr(self, "_stop_reconnect_event"):
@@ -761,6 +761,33 @@ def _create_engine(self) -> None:
761761
def _create_tables(self) -> None:
762762
"""
763763
Create ORM tables for device_info, device_metrics_wide, device_metrics_narrow and metric_catalog.
764+
765+
TODO - 1. This method is currently called at startup after connection and will attempt to create tables each time.
766+
We should optimize this to only attempt to create tables if we detect they don't exist, or if we detect a schema change that requires a refresh.
767+
We can use SQLAlchemy's inspection/reflection capabilities to check for existing tables and columns before attempting to create them,
768+
and we can also track the expected schema in the class to detect when a refresh is needed.
769+
770+
2. To handle multiple protocols, each with its own unique registries and potentially multiple devices per protocol, we need to change this method to
771+
create wide table names based on the specific protocol name as seen in a given settings section. This requires a new protocol_settings method to
772+
return a list of active scraper protocols to support (based on settings). TSDB will loop through those protocols and create mappings to table names
773+
for each protocol based on the active registry map for that protocol. The required columns for each protocol based table will be created from device
774+
level registry scans which will then be blended together to one column list per protocol. Narrow table does not need to be changed since it is designed to be
775+
dynamic and handle any metric with the metric name as a key. Metric catalog needs to be updated to include device_info_id to track which metrics
776+
belong to which protocol/device for the dynamic table creation. Device info table also needs to be updated to include protocol to link devices to
777+
specific protocols and registries. Probably a Protocol table will be needed to track protocol names and link to device info and metric catalog.
778+
779+
3. Narrow table creation should handle ASCII in a separate text column with dictionary compression, and the metric_catalog
780+
should track which metrics are ASCII to know which values to put in the ASCII column vs the value column.
781+
782+
4. Rollup logic also needs to be updated to handle multiple protocols/devices with different registries and metric sets,
783+
and to know which tables to pull from for rollups based on the protocol/device of the incoming data.
784+
This will employ a device_info_id column in the metric_catalog to link metrics to specific protocol/device tables,
785+
and the rollup manager needs to be aware of this when creating rollup views and policies.
786+
787+
5. Backlog and flush logic also need to be updated to handle multiple protocol/device tables and to know which table
788+
to write to based on the protocol/device of the incoming data.
789+
790+
For now, we can assume a single static registry map and a single wide table structure for simplicity.
764791
"""
765792
with self.SessionFactory() as session:
766793
with self._reconnect_lock:
@@ -1089,7 +1116,6 @@ def _cache_wide_table_columns(self) -> None:
10891116
def _validate_wide_row(self, row: dict) -> bool:
10901117
# Use the same lock to ensure we aren't validating against a table being swapped
10911118
with self._schema_lock:
1092-
10931119
# metadata keys like m_time and device_info_id are excluded in _cache_wide_table_columns
10941120
extra_keys: set = set(row) - self._wide_columns
10951121
fewer_keys: set = self._wide_columns - set(row)
@@ -1107,10 +1133,15 @@ def _validate_wide_row(self, row: dict) -> bool:
11071133
msg = f"Database schema is still missing columns after resync: {sorted(still_extra)}"
11081134
self._log.error(msg)
11091135
raise ValueError(msg)
1110-
elif fewer_keys:
1111-
self._log.warning( f"Wide-table schema mismatch; missing keys in scrape data: {sorted(fewer_keys)}" )
1112-
msg: str= f"Missing columns: {sorted(fewer_keys)}"
1136+
else:
1137+
return True # Validation passes after successful resync
11131138

1139+
elif fewer_keys:
1140+
self._log.warning(
1141+
f"Wide-table schema mismatch; missing keys in scrape data: {sorted(fewer_keys)}"
1142+
)
1143+
msg: str = f"Missing columns: {sorted(fewer_keys)}"
1144+
return False
11141145
else:
11151146
return True
11161147

@@ -1831,7 +1862,7 @@ def __init__(
18311862
backlog_lock: threading.RLock,
18321863
flush_queue: queue.Queue,
18331864
backlog: BacklogManager,
1834-
reconnect_lock: threading.Lock
1865+
reconnect_lock: threading.RLock
18351866
) -> None:
18361867

18371868
self.rollup_policy: dict = rollup_policy
@@ -1844,7 +1875,7 @@ def __init__(
18441875
self._backlog_lock: threading.RLock = backlog_lock
18451876
self._flush_queue: queue.Queue = flush_queue
18461877
self.backlog: 'BacklogManager' = backlog
1847-
self._reconnect_lock: threading.Lock = reconnect_lock
1878+
self._reconnect_lock: threading.RLock = reconnect_lock
18481879

18491880
self._refresh_rollup_thread = threading.Thread(target=self._refresh_rollup_loop, daemon=True, name="RollupAutoRefreshThread")
18501881
self._stop_refresh_rollup_event: threading.Event = getattr(self, "_stop_refresh_rollup_event", threading.Event())

classes/transports/transport_base.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,27 @@ class TransportWriteMode(Enum):
2525
''' skip all safeties '''
2626

2727
@classmethod
28-
def fromString(cls, name : str):
28+
def fromString(cls, name : str) -> "TransportWriteMode":
2929
name = name.strip().upper()
3030

3131
#common inputs
3232
alias : dict[str,TransportWriteMode] = {
33-
"" : "READ", #default
34-
"FALSE" : "READ",
35-
"NO" : "READ",
36-
"READ" : "READ",
37-
"R" : "READ",
38-
39-
"TRUE" : "WRITE",
40-
"YES" : "WRITE",
41-
"WRITE" : "WRITE",
42-
"W" : "WRITE",
43-
44-
"RELAXED" : "RELAXED",
45-
"UNSAFE" : "UNSAFE",
33+
"" : cls.READ, #default
34+
"FALSE": cls.READ,
35+
"NO": cls.READ,
36+
"READ": cls.READ,
37+
"R": cls.READ,
38+
39+
"TRUE": cls.WRITE,
40+
"YES": cls.WRITE,
41+
"WRITE": cls.WRITE,
42+
"W": cls.WRITE,
43+
44+
"RELAXED": cls.RELAXED,
45+
"UNSAFE": cls.UNSAFE
4646
}
47-
48-
if name in alias:
49-
name = alias[name]
50-
else:
51-
name = "READ" #default
52-
53-
return getattr(cls, name)
47+
# Handle direct matches to enum names, returning the corresponding enum member and defaulting to READ if no match is found.
48+
return alias.get(name, cls.READ)
5449

5550
class transport_base:
5651
type : str = ""
@@ -67,7 +62,7 @@ class transport_base:
6762

6863
write_enabled : bool = False
6964
''' deprecated -- use / move to write_mode'''
70-
write_mode : TransportWriteMode = None
65+
write_mode : TransportWriteMode
7166

7267
max_precision : int = 2
7368

@@ -77,12 +72,16 @@ class transport_base:
7772
connected : bool = False
7873
_needs_reconnection : bool = False
7974

80-
on_message : Callable[["transport_base", registry_map_entry, str], None] = None
75+
on_message: Callable[[transport_base, registry_map_entry, str], None] | None = None
8176
''' callback, on message received '''
8277

83-
request_upstream_reconnect: Callable[[str], None] | None = None # callback for reconnect.
78+
request_upstream_reconnect: Callable[[str], None] | None = None
79+
''' callback for reconnect. A transport should call this with the name of the transport it wants reconnected, in order to
80+
trigger a reconnect from the bridge. This is required for transports that have a bridge and need to trigger
81+
a reconnect of the bridge when the bridge's connection drops.
82+
'''
8483

85-
_log : logging.Logger = None
84+
_log : logging.Logger
8685

8786

8887
def __init__(self, settings : "SectionProxy") -> None:

0 commit comments

Comments
 (0)