Skip to content

Commit 00c2a41

Browse files
committed
feat: add requests tracking mixin | track dAuth requests
1 parent 8dcb28f commit 00c2a41

File tree

4 files changed

+137
-73
lines changed

4 files changed

+137
-73
lines changed

extensions/business/dauth/dauth_manager.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"""
2323
from extensions.business.mixins.node_tags_mixin import _NodeTagsMixin
2424
from naeural_core.business.default.web_app.supervisor_fast_api_web_app import SupervisorFastApiWebApp as BasePlugin
25+
from extensions.business.mixins.request_tracking_mixin import _RequestTrackingMixin
2526
from extensions.business.dauth.dauth_mixin import _DauthMixin
2627

2728
__VER__ = '0.2.2'
@@ -36,6 +37,10 @@
3637
'DAUTH_VERBOSE' : False,
3738
'DAUTH_LOG_RESPONSE' : True,
3839

40+
'REQUESTS_CSTORE_HKEY': 'DAUTH_REQUESTS',
41+
'REQUESTS_MAX_RECORDS': 2,
42+
'REQUESTS_LOG_INTERVAL': 5 * 60,
43+
3944
'SUPRESS_LOGS_AFTER_INTERVAL' : 300,
4045

4146
# required ENV keys are defined in plugin template and should be added here
@@ -83,7 +88,8 @@
8388
class DauthManagerPlugin(
8489
BasePlugin,
8590
_DauthMixin,
86-
_NodeTagsMixin
91+
_NodeTagsMixin,
92+
_RequestTrackingMixin,
8793
):
8894
"""
8995
This plugin is the dAuth FastAPI web app that provides an endpoints for decentralized authentication.
@@ -103,10 +109,23 @@ def on_init(self):
103109
self.P("Started {} plugin on {} / {}\n - Auth keys: {}\n - Predefined keys: {}".format(
104110
self.__class__.__name__, my_address, my_eth_address,
105111
self.cfg_auth_env_keys, self.cfg_auth_predefined_keys)
106-
)
112+
)
113+
self._init_request_tracking()
107114
return
108115

109116

117+
def on_request(self, request):
118+
self._track_request(request)
119+
return
120+
121+
def on_response(self, method, response):
122+
self._track_response(method, response)
123+
return
124+
125+
def process(self):
126+
self._maybe_log_tracked_requests()
127+
return
128+
110129
def __get_current_epoch(self):
111130
"""
112131
Get the current epoch of the node.

extensions/business/deeploy/deeploy_manager_api.py

Lines changed: 8 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .deeploy_mixin import _DeeployMixin
1111
from .deeploy_target_nodes_mixin import _DeeployTargetNodesMixin
1212
from extensions.business.mixins.node_tags_mixin import _NodeTagsMixin
13+
from extensions.business.mixins.request_tracking_mixin import _RequestTrackingMixin
1314
from .deeploy_const import (
1415
DEEPLOY_CREATE_REQUEST, DEEPLOY_CREATE_REQUEST_MULTI_PLUGIN, DEEPLOY_GET_APPS_REQUEST, DEEPLOY_DELETE_REQUEST,
1516
DEEPLOY_ERRORS, DEEPLOY_KEYS, DEEPLOY_SCALE_UP_JOB_WORKERS_REQUEST, DEEPLOY_STATUS, DEEPLOY_INSTANCE_COMMAND_REQUEST,
@@ -20,8 +21,6 @@
2021

2122
from naeural_core.business.default.web_app.supervisor_fast_api_web_app import SupervisorFastApiWebApp as BasePlugin
2223

23-
DEEPLOY_REQUESTS_CSTORE_HKEY = "DEEPLOY_REQUESTS"
24-
2524
__VER__ = '0.6.0'
2625

2726

@@ -39,6 +38,8 @@
3938
'WARMUP_DELAY' : 300,
4039
'PIPELINES_CHECK_DELAY' : 300,
4140
'MIN_ETH_BALANCE' : 0.00005,
41+
42+
'REQUESTS_CSTORE_HKEY': 'DEEPLOY_REQUESTS',
4243
'REQUESTS_LOG_INTERVAL' : 5 * 60,
4344
'REQUESTS_MAX_RECORDS' : 2,
4445

@@ -55,6 +56,7 @@ class DeeployManagerApiPlugin(
5556
_DeeployTargetNodesMixin,
5657
_NodeTagsMixin,
5758
_DeeployJobMixin,
59+
_RequestTrackingMixin,
5860
):
5961
"""
6062
This plugin is the dAuth FastAPI web app that provides an endpoints for decentralized authentication.
@@ -86,68 +88,16 @@ def on_init(self):
8688
color='r', boxed=True
8789
)
8890
self.maybe_stop_tunnel_engine()
89-
# Request tracking state
90-
self.__recent_requests = self.deque(maxlen=self.cfg_requests_max_records)
91-
self.__last_requests_log_time = 0
91+
self._init_request_tracking()
9292
return
9393

9494

9595
def on_request(self, request):
96-
"""
97-
Hook called when a new request arrives from the FastAPI side (monitor thread).
98-
Captures minimal request metadata and writes the last N records to cstore.
99-
100-
Parameters
101-
----------
102-
request : dict
103-
Raw request payload pulled from the server queue.
104-
Structure: {'id': str, 'value': tuple, 'profile': dict|None}
105-
"""
106-
try:
107-
value = request.get('value')
108-
request_id = request.get('id')
109-
endpoint = value[0] if isinstance(value, (list, tuple)) and len(value) > 0 else 'unknown'
110-
record = {
111-
'id': request_id,
112-
'endpoint': endpoint,
113-
'date_start': self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
114-
'date_complete': None,
115-
}
116-
self.__recent_requests.append(record)
117-
self._save_requests_to_cstore()
118-
except Exception as e:
119-
self.P(f"Error tracking request in cstore: {e}", color='r')
96+
self._track_request(request)
12097
return
12198

122-
12399
def on_response(self, method, response):
124-
"""
125-
Hook called before the response is sent back to the FastAPI side (main thread).
126-
127-
Parameters
128-
----------
129-
method : str
130-
The endpoint name that was called.
131-
response : dict
132-
Response dict with 'id' and 'value' keys.
133-
"""
134-
try:
135-
request_id = response.get('id')
136-
for record in self.__recent_requests:
137-
if record.get('id') == request_id:
138-
record['date_complete'] = self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
139-
self._save_requests_to_cstore()
140-
break
141-
except Exception as e:
142-
self.P(f"Error tracking response in cstore: {e}", color='r')
143-
return
144-
145-
def _save_requests_to_cstore(self):
146-
self.chainstore_hset(
147-
hkey=DEEPLOY_REQUESTS_CSTORE_HKEY,
148-
key=self.ee_id,
149-
value=list(self.__recent_requests),
150-
)
100+
self._track_response(method, response)
151101
return
152102

153103

@@ -1117,17 +1067,5 @@ def process(self):
11171067
self.P(f"Error checking running pipelines: {e}", color='r')
11181068
self.__last_pipelines_check_time = self.time()
11191069

1120-
# Periodic dump of all nodes' recent requests from cstore
1121-
if (self.time() - self.__last_requests_log_time) > self.cfg_requests_log_interval:
1122-
try:
1123-
all_requests = self.chainstore_hgetall(hkey=DEEPLOY_REQUESTS_CSTORE_HKEY)
1124-
if all_requests:
1125-
self.P(f"Deeploy requests across all nodes:\n{self.json_dumps(all_requests, indent=2)}")
1126-
else:
1127-
self.P("Deeploy requests across all nodes: no data")
1128-
except Exception as e:
1129-
self.P(f"Error dumping deeploy requests from cstore: {e}", color='r')
1130-
# end try
1131-
self.__last_requests_log_time = self.time()
1132-
# end if
1070+
self._maybe_log_tracked_requests()
11331071
return
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
DEFAULT_REQUESTS_MAX_RECORDS = 2
2+
DEFAULT_REQUESTS_LOG_INTERVAL = 5 * 60 # 300 seconds
3+
4+
5+
class _RequestTrackingMixin(object):
6+
"""
7+
Mixin that adds chainstore-based request/response tracking to FastAPI plugins.
8+
9+
Opt-in: set REQUESTS_CSTORE_HKEY in plugin config to enable.
10+
When not set (None), all methods are no-ops.
11+
12+
Config keys:
13+
REQUESTS_CSTORE_HKEY : str or None -- chainstore hash key (None = disabled)
14+
REQUESTS_MAX_RECORDS : int -- max recent requests in deque (default 2)
15+
REQUESTS_LOG_INTERVAL : int -- seconds between cross-node log dumps (default 300)
16+
"""
17+
18+
@property
19+
def __rt_max_records(self):
20+
val = getattr(self, 'cfg_requests_max_records', None)
21+
if not isinstance(val, int) or val < 1:
22+
return DEFAULT_REQUESTS_MAX_RECORDS
23+
return val
24+
25+
@property
26+
def __rt_log_interval(self):
27+
val = getattr(self, 'cfg_requests_log_interval', None)
28+
if not isinstance(val, (int, float)) or val <= 0:
29+
return DEFAULT_REQUESTS_LOG_INTERVAL
30+
return val
31+
32+
@property
33+
def __rt_cstore_hkey(self):
34+
return getattr(self, 'cfg_requests_cstore_hkey', None)
35+
36+
def _init_request_tracking(self):
37+
"""Call from on_init(). Initializes tracking state if enabled."""
38+
self.__rt_recent_requests = None
39+
self.__rt_last_log_time = 0
40+
if self.__rt_cstore_hkey:
41+
self.__rt_recent_requests = self.deque(maxlen=self.__rt_max_records)
42+
return
43+
44+
45+
def _track_request(self, request):
46+
"""Called from the request processing flow. Records request start."""
47+
if self.__rt_recent_requests is None:
48+
return
49+
try:
50+
value = request.get('value')
51+
request_id = request.get('id')
52+
endpoint = value[0] if isinstance(value, (list, tuple)) and len(value) > 0 else 'unknown'
53+
record = {
54+
'id': request_id,
55+
'endpoint': endpoint,
56+
'date_start': self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
57+
'date_complete': None,
58+
}
59+
self.__rt_recent_requests.append(record)
60+
self.__rt_save()
61+
except Exception as e:
62+
self.P(f"Error tracking request in cstore: {e}", color='r')
63+
return
64+
65+
66+
def _track_response(self, method, response):
67+
"""Called from the response processing flow. Stamps completion time."""
68+
if self.__rt_recent_requests is None:
69+
return
70+
try:
71+
request_id = response.get('id')
72+
for record in self.__rt_recent_requests:
73+
if record.get('id') == request_id:
74+
record['date_complete'] = self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
75+
self.__rt_save()
76+
break
77+
except Exception as e:
78+
self.P(f"Error tracking response in cstore: {e}", color='r')
79+
return
80+
81+
82+
def __rt_save(self):
83+
"""Write recent requests to chainstore."""
84+
self.chainstore_hset(
85+
hkey=self.__rt_cstore_hkey,
86+
key=self.ee_id,
87+
value=list(self.__rt_recent_requests),
88+
)
89+
return
90+
91+
92+
def _maybe_log_tracked_requests(self):
93+
"""Call from process(). Periodically logs cross-node request data."""
94+
if self.__rt_recent_requests is None:
95+
return
96+
if (self.time() - self.__rt_last_log_time) > self.__rt_log_interval:
97+
try:
98+
hkey = self.__rt_cstore_hkey
99+
all_requests = self.chainstore_hgetall(hkey=hkey)
100+
if all_requests:
101+
self.P(f"{hkey} requests across all nodes:\n{self.json_dumps(all_requests, indent=2)}")
102+
else:
103+
self.P(f"{hkey} requests across all nodes: no data")
104+
except Exception as e:
105+
self.P(f"Error dumping requests from cstore: {e}", color='r')
106+
self.__rt_last_log_time = self.time()
107+
return

ver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__VER__ = '2.10.67'
1+
__VER__ = '2.10.68'

0 commit comments

Comments
 (0)