-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathEventsDaemon.ts
More file actions
283 lines (244 loc) · 9.84 KB
/
EventsDaemon.ts
File metadata and controls
283 lines (244 loc) · 9.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/**
* Events Daemon - Cross-Context Event Bridge Handler
*
* Handles 'event-bridge' messages sent by ScopedEventSystem's EventBridge
* to propagate events between browser and server contexts.
*
* CRITICAL: Includes rate limiting to prevent cascade failures.
*/
import { DaemonBase } from '../../command-daemon/shared/DaemonBase';
/**
 * Fixed-window rate limiter that protects the event bridge from event floods.
 *
 * Events are counted per event name inside a window of `windowMs` milliseconds.
 * Once a name has been seen more than `maxPerWindow` times in the current
 * window, every further occurrence of that name is rejected until the window
 * rolls over. A single warning is logged when a name reaches `warnThreshold`.
 *
 * The window is rolled lazily: expiry is only checked when `shouldBlock` is
 * called, so the per-window summary is logged on the first event that arrives
 * after the window has ended.
 */
class EventRateLimiter {
  /** Window length in milliseconds. */
  private readonly windowMs: number;
  /** Occurrences of one event name allowed per window before it is blocked. */
  private readonly maxPerWindow: number;
  /** Per-window count at which a "trending" warning is logged. */
  private readonly warnThreshold: number;

  /** Occurrences per event name in the current window (blocked ones included). */
  private counts = new Map<string, number>();
  /** Event names that exceeded `maxPerWindow` in the current window. */
  private blocked = new Set<string>();
  /** `Date.now()` value at which the current window was opened. */
  private windowStart = Date.now();

  // Lifetime diagnostics; these are never reset by a window roll-over.
  private totalBlocked = 0;
  private totalWarned = 0;
  private blockedHistory: Array<{ event: string; count: number; time: number }> = [];

  /**
   * @param options Optional overrides. Omitted fields use the defaults below,
   *   so `new EventRateLimiter()` behaves as it always has.
   *   - `windowMs` (default 1000): 1-second window (matches Rust-side rate limiter).
   *   - `maxPerWindow` (default 200): max occurrences of the same event per window.
   *   - `warnThreshold` (default 100): count at which a warning is logged.
   */
  constructor(options: { windowMs?: number; maxPerWindow?: number; warnThreshold?: number } = {}) {
    this.windowMs = options.windowMs ?? 1000;
    this.maxPerWindow = options.maxPerWindow ?? 200;
    this.warnThreshold = options.warnThreshold ?? 100;
  }

  /**
   * Record one occurrence of `eventName` and decide whether it must be dropped.
   *
   * @param eventName Name used as the rate-limiting key.
   * @returns `true` when the event exceeded `maxPerWindow` in the current
   *   window and should not be processed; `false` otherwise.
   */
  shouldBlock(eventName: string): boolean {
    const now = Date.now();
    const elapsed = now - this.windowStart;

    // Date.now() is wall-clock time and can step backwards (NTP correction,
    // manual change). A negative elapsed time would otherwise freeze the
    // window, and everything blocked in it, until the clock caught up again.
    if (elapsed > this.windowMs || elapsed < 0) {
      this.logWindowSummary();
      this.counts.clear();
      this.blocked.clear();
      this.windowStart = now;
    }

    // Count before the blocked check so the window summary reports the real
    // size of a flood instead of stopping at maxPerWindow + 1.
    const count = (this.counts.get(eventName) ?? 0) + 1;
    this.counts.set(eventName, count);

    if (this.blocked.has(eventName)) {
      return true;
    }

    // The count passes through warnThreshold exactly once per window, so the
    // equality test alone guarantees a single warning per event per window.
    if (count === this.warnThreshold) {
      this.totalWarned++;
      console.warn(`[EventRateLimiter] EVENT TRENDING: "${eventName}" at ${count}x in ${this.windowMs}ms (blocking at ${this.maxPerWindow})`);
    }

    if (count > this.maxPerWindow) {
      this.blocked.add(eventName);
      this.totalBlocked++;
      this.blockedHistory.push({ event: eventName, count, time: now });
      // Keep only the last 100 blocked events.
      if (this.blockedHistory.length > 100) {
        this.blockedHistory.shift();
      }
      console.error(`[EventRateLimiter] EVENT CASCADE BLOCKED: "${eventName}" fired ${count}x in ${this.windowMs}ms`);
      return true;
    }

    return false;
  }

  /**
   * Snapshot of lifetime diagnostics.
   *
   * @returns Total number of block and warn incidents since construction, plus
   *   up to the 10 most recent block incidents (`time` is a `Date.now()` value).
   */
  getStats(): { totalBlocked: number; totalWarned: number; recentBlocked: Array<{ event: string; count: number; time: number }> } {
    return {
      totalBlocked: this.totalBlocked,
      totalWarned: this.totalWarned,
      recentBlocked: this.blockedHistory.slice(-10)
    };
  }

  /** Log the events that reached `warnThreshold` in the window being closed, busiest first. */
  private logWindowSummary(): void {
    const hotEvents = Array.from(this.counts.entries())
      .filter(([, count]) => count >= this.warnThreshold)
      .sort((a, b) => b[1] - a[1]);
    if (hotEvents.length > 0) {
      console.warn(`[EventRateLimiter] EVENT ACTIVITY: ${hotEvents.map(([e, c]) => `${e}(${c})`).join(', ')}`);
    }
  }
}
import type { JTAGMessage, JTAGContext, JTAGPayload } from '../../../system/core/types/JTAGTypes';
import { JTAGMessageFactory } from '../../../system/core/types/JTAGTypes';
import type { JTAGRouter } from '../../../system/core/router/shared/JTAGRouter';
import type { UUID } from '../../../system/core/types/CrossPlatformUUID';
import { EventManager } from '../../../system/events/shared/JTAGEventSystem';
import { createBaseResponse, type BaseResponsePayload } from '../../../system/core/types/ResponseTypes';
import { JTAG_ENDPOINTS, JTAG_DAEMON_ENDPOINTS } from '../../../system/core/router/shared/JTAGEndpoints';
import {
EventRoutingUtils,
EVENT_METADATA_KEYS,
type EventScope
} from '../../../system/events/shared/EventSystemConstants';
/**
* Payload of an 'event-bridge' message: one event being propagated from one
* context to another.
*
* EventsDaemon.processMessage treats a message as a bridge request when its
* payload `type` is 'event-bridge'; the remaining fields are read by
* EventsDaemon.handleEventBridge.
*/
export interface EventBridgePayload extends JTAGPayload {
// Literal discriminator checked by EventsDaemon.processMessage.
type: 'event-bridge';
// Scope the event belongs to. EventsDaemon echoes it back in the response as
// "<type>" or "<type>:<id>". `sessionId` is not read anywhere in this file.
scope: {
type: EventScope;
id?: string;
sessionId?: string;
};
// Name of the event; also the key used for rate limiting in EventsDaemon.
eventName: string;
// Event body. Arbitrary keys are allowed; the EVENT_METADATA_KEYS entries are
// optional bridge metadata (presumably written by
// EventRoutingUtils.addBridgeMetadata — confirm against EventSystemConstants).
data: {
message?: unknown;
[EVENT_METADATA_KEYS.BRIDGED]?: boolean;
[EVENT_METADATA_KEYS.ORIGINAL_CONTEXT]?: string;
[EVENT_METADATA_KEYS.BRIDGE_TIMESTAMP]?: string;
[EVENT_METADATA_KEYS.BRIDGE_HOP_COUNT]?: number;
[key: string]: unknown;
};
// Session the event originated from; passed to EventRoutingUtils.addBridgeMetadata.
originSessionId: UUID;
// Context the event originated from. When it equals the receiving daemon's
// context.uuid, the daemon skips local re-emission (recursion prevention).
originContextUUID?: UUID;
// Timestamp supplied by the sender; passed to EventRoutingUtils.addBridgeMetadata.
// The string format is not established in this file.
timestamp: string;
}
/**
* Response returned by EventsDaemon for bridge and stats requests.
* All fields are optional; the error responses built in this file carry none of them.
*/
export interface EventBridgeResponse extends BaseResponsePayload {
// true when the bridge request was handled; false when the event was dropped
// by the rate limiter or when the response answers a stats request.
bridged?: boolean;
// Name of the event the response refers to (set for bridge requests).
eventName?: string;
// "<scope type>" or "<scope type>:<scope id>" for a handled event, or the
// literal 'blocked-cascade' when the event was rate limited.
scope?: string;
}
/**
 * Events Daemon - receives 'event-bridge' messages and propagates the events
 * they carry.
 *
 * For each accepted bridge message the daemon re-emits the event in its own
 * context (through the subclass hook) and, when the event data is not already
 * marked as bridged, forwards the payload to the opposite environment.
 * Concrete subclasses supply the event manager and the local emission logic.
 */
export abstract class EventsDaemon extends DaemonBase {
  public readonly subpath: string = JTAG_ENDPOINTS.EVENTS.BASE;

  /** Event manager provided by the environment-specific subclass. */
  protected abstract eventManager: EventManager;

  /** Throttle consulted before any bridging work, keyed by event name. */
  private rateLimiter = new EventRateLimiter();

  /**
   * Emit a bridged event inside the local context.
   * Implemented by environment-specific subclasses.
   *
   * @param eventName Name of the event to emit.
   * @param eventData Event data with bridge metadata already attached.
   */
  protected abstract handleLocalEventBridge(eventName: string, eventData: unknown): void;

  constructor(context: JTAGContext, router: JTAGRouter) {
    super('EventsDaemon', context, router);
  }

  /**
   * Daemon start-up hook; only logs that the bridge is ready.
   */
  protected async initialize(): Promise<void> {
    console.log(`🌉 EventsDaemon: Initialized for cross-context event bridging`);
  }

  /**
   * Entry point for messages delivered to this daemon.
   *
   * Bridge payloads are recognised by their `type` (they arrive on the
   * 'events' endpoint but carry type 'event-bridge'). Anything else is
   * answered as a stats request when the normalized endpoint matches, and as
   * a failure otherwise.
   *
   * @param message Incoming message.
   * @returns The bridge or stats response, or a failed base response for an
   *   unrecognised message.
   */
  protected async processMessage(message: JTAGMessage): Promise<EventBridgeResponse> {
    const payload = message.payload as EventBridgePayload;

    const isBridgeRequest = payload.type === 'event-bridge';
    if (isBridgeRequest) {
      return await this.handleEventBridge(message);
    }

    const isStatsRequest =
      EventRoutingUtils.normalizeEndpoint(message.endpoint) === JTAG_ENDPOINTS.EVENTS.STATS;
    if (isStatsRequest) {
      return await this.getBridgeStats();
    }

    // Neither a bridge payload nor a stats request: report and fail.
    const errorMsg = `Unknown payload type: ${payload.type}, endpoint: ${message.endpoint}`;
    console.error(`❌ EventsDaemon: ${errorMsg}`);
    return createBaseResponse(false, message.context, message.payload.sessionId, {}) as EventBridgeResponse;
  }

  /**
   * Handle one bridge message: throttle, emit locally, forward.
   *
   * A rate-limited event is answered with a successful response whose
   * `bridged` is false and whose `scope` is 'blocked-cascade'. Any error
   * thrown while emitting or forwarding is logged and turned into a failed
   * response.
   *
   * NOTE(review): the payload is forwarded unchanged, while bridge metadata is
   * only attached to the locally emitted copy. Whether the receiving side then
   * sees the event as already bridged depends on EventRoutingUtils, which is
   * not visible here — confirm.
   *
   * @param message Message whose payload is an EventBridgePayload.
   */
  private async handleEventBridge(message: JTAGMessage): Promise<EventBridgeResponse> {
    const payload = message.payload as EventBridgePayload;

    // CRITICAL: throttle first so a flood never reaches emission or routing.
    const isThrottled = this.rateLimiter.shouldBlock(payload.eventName);
    if (isThrottled) {
      return createBaseResponse(true, message.context, payload.sessionId, {
        bridged: false,
        eventName: payload.eventName,
        scope: 'blocked-cascade'
      }) as EventBridgeResponse;
    }

    try {
      // The originating context already has the event, so it is only
      // re-emitted locally when it came from somewhere else (or when the
      // origin is unknown).
      const cameFromElsewhere =
        !payload.originContextUUID || payload.originContextUUID !== this.context.uuid;
      if (cameFromElsewhere) {
        const dataWithBridgeInfo = EventRoutingUtils.addBridgeMetadata(
          payload.data,
          payload.originSessionId,
          payload.timestamp
        );
        // DOM event dispatching, where relevant, is up to the subclass.
        this.handleLocalEventBridge(payload.eventName, dataWithBridgeInfo);
      }

      // Forward across environments only for events that have not been
      // bridged before; this is the guard against infinite recursion.
      const needsForwarding = !EventRoutingUtils.isEventBridged(payload.data);
      if (needsForwarding) {
        await this.routeToOtherEnvironments(payload);
      }

      const scopeLabel = `${payload.scope.type}${payload.scope.id ? `:${payload.scope.id}` : ''}`;
      return createBaseResponse(true, message.context, payload.sessionId, {
        bridged: true,
        eventName: payload.eventName,
        scope: scopeLabel
      }) as EventBridgeResponse;
    } catch (error) {
      let errorMsg: string;
      if (error instanceof Error) {
        errorMsg = error.message;
      } else {
        errorMsg = String(error);
      }
      console.error(`❌ EventsDaemon: Event bridge failed: ${errorMsg}`);
      return createBaseResponse(false, message.context, payload.sessionId, {}) as EventBridgeResponse;
    }
  }

  /**
   * Forward a bridge payload to the environment opposite to the current one
   * ('browser' when running on the server, 'server' otherwise).
   *
   * Best effort: a failed delivery is logged as a warning, any other failure
   * as an error, and nothing is rethrown to the caller.
   *
   * @param payload Bridge payload to forward as-is.
   */
  private async routeToOtherEnvironments(payload: EventBridgePayload): Promise<void> {
    try {
      const runningOnServer = this.context.environment === 'server';
      const destinations = runningOnServer ? ['browser'] : ['server'];

      for (const targetEnv of destinations) {
        // Address the events daemon registered in the target environment.
        const outbound = JTAGMessageFactory.createEvent(
          this.context,
          'events-daemon',
          `${targetEnv}/${this.subpath}`,
          payload
        );

        try {
          // Delivery goes through the router's transport.
          await this.router.postMessage(outbound);
        } catch (deliveryFailure) {
          console.warn(`⚠️ EventsDaemon: Failed to route event to ${targetEnv}:`, deliveryFailure);
        }
      }
    } catch (routingFailure) {
      console.error(`❌ EventsDaemon: Cross-environment routing failed:`, routingFailure);
    }
  }

  /**
   * Answer a stats request.
   *
   * The response is built from this daemon's own context, with the context
   * UUID passed where the other responses pass a session id, and only carries
   * `bridged: false`. It does not currently include the rate limiter's counters.
   */
  async getBridgeStats(): Promise<EventBridgeResponse> {
    const statsPayload = {
      bridged: false // Stats request, not a bridge operation
    };
    return createBaseResponse(true, this.context, this.context.uuid as UUID, statsPayload) as EventBridgeResponse;
  }
}