|
20 | 20 |
|
21 | 21 | from naeural_core.business.default.web_app.supervisor_fast_api_web_app import SupervisorFastApiWebApp as BasePlugin |
22 | 22 |
|
| 23 | +DEEPLOY_REQUESTS_CSTORE_HKEY = "DEEPLOY_REQUESTS" |
| 24 | +DEEPLOY_REQUESTS_MAX_RECORDS = 5 |
23 | 25 |
|
24 | 26 | __VER__ = '0.6.0' |
25 | 27 |
|
|
38 | 40 | 'WARMUP_DELAY' : 300, |
39 | 41 | 'PIPELINES_CHECK_DELAY' : 300, |
40 | 42 | 'MIN_ETH_BALANCE' : 0.00005, |
| 43 | + 'REQUESTS_LOG_INTERVAL' : 5 * 60, |
41 | 44 |
|
42 | 45 | 'VALIDATION_RULES': { |
43 | 46 | **BasePlugin.CONFIG['VALIDATION_RULES'], |
@@ -82,8 +85,41 @@ def on_init(self): |
82 | 85 | color='r', boxed=True |
83 | 86 | ) |
84 | 87 | self.maybe_stop_tunnel_engine() |
| 88 | + # Request tracking state |
| 89 | + self.__recent_requests = self.deque(maxlen=DEEPLOY_REQUESTS_MAX_RECORDS) |
| 90 | + self.__last_requests_log_time = 0 |
85 | 91 | return |
86 | | - |
| 92 | + |
| 93 | + |
| 94 | + def on_request(self, request): |
| 95 | + """ |
| 96 | + Hook called when a new request arrives from the FastAPI side. |
| 97 | + Captures minimal request metadata and writes the last N records to cstore. |
| 98 | +
|
| 99 | + Parameters |
| 100 | + ---------- |
| 101 | + request : dict |
| 102 | + Raw request payload pulled from the server queue. |
| 103 | + Structure: {'id': str, 'value': tuple, 'profile': dict|None} |
| 104 | + """ |
| 105 | + try: |
| 106 | + value = request.get('value') |
| 107 | + endpoint = value[0] if isinstance(value, (list, tuple)) and len(value) > 0 else 'unknown' |
| 108 | + record = { |
| 109 | + 'ts': self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), |
| 110 | + 'endpoint': endpoint, |
| 111 | + } |
| 112 | + self.__recent_requests.append(record) |
| 113 | + self.chainstore_hset( |
| 114 | + hkey=DEEPLOY_REQUESTS_CSTORE_HKEY, |
| 115 | + key=self.ee_id, |
| 116 | + value=list(self.__recent_requests), |
| 117 | + ) |
| 118 | + except Exception as e: |
| 119 | + self.P(f"Error tracking request in cstore: {e}", color='r') |
| 120 | + return |
| 121 | + |
| 122 | + |
87 | 123 | def __check_eth_balance(self): |
88 | 124 | """ |
89 | 125 | Check if the oracle has enough ETH to cover gas fees for web3 transactions. |
@@ -1045,4 +1081,17 @@ def process(self): |
1045 | 1081 | self.P(f"Error checking running pipelines: {e}", color='r') |
1046 | 1082 | self.__last_pipelines_check_time = self.time() |
1047 | 1083 |
|
| 1084 | + # Periodic dump of all nodes' recent requests from cstore |
| 1085 | + if (self.time() - self.__last_requests_log_time) > self.cfg_requests_log_interval: |
| 1086 | + try: |
| 1087 | + all_requests = self.chainstore_hgetall(hkey=DEEPLOY_REQUESTS_CSTORE_HKEY) |
| 1088 | + if all_requests: |
| 1089 | + self.P(f"Deeploy requests across all nodes:\n{self.json_dumps(all_requests, indent=2)}") |
| 1090 | + else: |
| 1091 | + self.P("Deeploy requests across all nodes: no data") |
| 1092 | + except Exception as e: |
| 1093 | + self.P(f"Error dumping deeploy requests from cstore: {e}", color='r') |
| 1094 | + # end try |
| 1095 | + self.__last_requests_log_time = self.time() |
| 1096 | + # end if |
1048 | 1097 | return |
0 commit comments