Skip to content

Commit 640e498

Browse files
committed
Enables async classification for fast SSE updates
Allows immediate streaming of minimal event data while classification runs in parallel Enhances user experience by updating events with enriched details once available
1 parent 8c49045 commit 640e498

File tree

2 files changed

+98
-19
lines changed

2 files changed

+98
-19
lines changed

server/src/webhook.ts

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -161,39 +161,101 @@ export function registerWebhookRoutes(app: express.Application) {
161161
return base;
162162
};
163163

164-
// Append events to in-memory store for this path
164+
// Append minimal records (no heavy classification) to in-memory store for this path
165165
try {
166-
const records: EventRecord[] = events.map((e: any) => normalize(e));
166+
const minimalNormalize = (ev: any): EventRecord => {
167+
const eventType = ev?.eventType || ev?.EventType || ev?.type || ev?.Type || (ev && ev['event-type']) || '';
168+
const timestamp = ev?.eventTime || ev?.event_time || ev?.time || ev?.Time || new Date().toISOString();
169+
return {
170+
id: ev?.id,
171+
eventType: String(eventType || '').trim(),
172+
timestamp,
173+
data: ev?.data ?? ev?.Data ?? null,
174+
raw: ev,
175+
typed: null,
176+
common: null,
177+
};
178+
};
167179

180+
const minimalRecords: EventRecord[] = events.map((e: any) => minimalNormalize(e));
168181
const existing = eventStore.get(userPath) || [];
169-
const combined = [...records.reverse(), ...existing];
182+
const combined = [...minimalRecords.reverse(), ...existing];
170183
const trimmed = combined.slice(0, EVENT_STORE_CAP);
171184
eventStore.set(userPath, trimmed);
172185
} catch (err) {
173186
console.warn('Failed to store events in memory', (err as Error).message);
174187
}
175188

176-
// Notify any SSE clients connected to this webhook path
189+
// Notify any SSE clients connected to this webhook path: send minimal record immediately,
190+
// then run classification asynchronously and send an enriched update when ready.
177191
try {
178192
for (const e of events) {
179-
// reuse normalization logic to ensure SSE payloads include typed when available
180-
const ev = ((): EventRecord => {
193+
const tStart = Date.now();
194+
try {
195+
console.log(`[webhook] receive start path=${userPath} eventId=${e?.id || 'n/a'} ts=${new Date(tStart).toISOString()}`);
196+
} catch (_) { /* ignore */ }
197+
198+
// Build minimal record (same shape as stored)
199+
const minimal: EventRecord = {
200+
id: e?.id,
201+
eventType: String(e?.eventType || e?.EventType || e?.type || '').trim(),
202+
timestamp: e?.eventTime || e?.time || new Date().toISOString(),
203+
data: e?.data ?? e?.Data ?? null,
204+
raw: e,
205+
typed: null,
206+
common: null,
207+
};
208+
209+
// Send minimal SSE immediately
210+
sendSseToPath(userPath, minimal);
211+
212+
const tAfterSend = Date.now();
213+
try {
214+
console.log(`[webhook] minimal SSE sent path=${userPath} eventId=${minimal.id || 'n/a'} elapsedMs=${tAfterSend - tStart}`);
215+
} catch (_) {}
216+
217+
// Classify/enrich asynchronously to avoid blocking the request
218+
(async () => {
219+
const classifyStart = Date.now();
181220
try {
182-
const nr = (normalize as any)(e) as EventRecord;
183-
return nr;
221+
const classified = classifyEventPayload(e);
222+
if (classified && classified.typed) {
223+
// Build enriched record
224+
const enriched: EventRecord = { ...minimal };
225+
enriched.typed = classified.typed as any;
226+
enriched.common = (classified as any).common ?? null;
227+
if ((classified as any).name) enriched.eventType = (classified as any).name;
228+
229+
// Update eventStore: replace the minimal record we inserted earlier (match by id if present)
230+
try {
231+
const existingList = eventStore.get(userPath) || [];
232+
const idx = existingList.findIndex(r => (r.id && enriched.id && r.id === enriched.id) || r.raw === enriched.raw);
233+
if (idx >= 0) {
234+
const updated = existingList.slice();
235+
updated[idx] = enriched;
236+
eventStore.set(userPath, updated);
237+
}
238+
} catch (err) {
239+
console.warn('Failed to update eventStore with enriched record', (err as Error).message);
240+
}
241+
242+
// Send enriched SSE to clients
243+
try {
244+
sendSseToPath(userPath, enriched);
245+
const classifyEnd = Date.now();
246+
console.log(`[webhook] enriched SSE sent path=${userPath} eventId=${enriched.id || 'n/a'} classifyMs=${classifyEnd - classifyStart} totalMs=${classifyEnd - tStart}`);
247+
} catch (err) {
248+
console.warn('Failed to send enriched SSE', (err as Error).message);
249+
}
250+
} else {
251+
// If classification yielded nothing useful, we still log timing
252+
const classifyEnd = Date.now();
253+
console.log(`[webhook] classification no-op path=${userPath} eventId=${e?.id || 'n/a'} classifyMs=${classifyEnd - classifyStart}`);
254+
}
184255
} catch (err) {
185-
return {
186-
id: e?.id,
187-
eventType: String(e?.eventType || e?.EventType || e?.type || '').trim(),
188-
timestamp: e?.eventTime || e?.time || new Date().toISOString(),
189-
data: e?.data ?? e?.Data ?? null,
190-
raw: e,
191-
typed: null,
192-
common: null,
193-
};
256+
console.warn('Async classification error', (err as Error).message);
194257
}
195258
})();
196-
sendSseToPath(userPath, ev);
197259
}
198260
} catch (err) {
199261
console.warn('Failed to broadcast SSE', (err as Error).message);

src/components/WebhookInspector.tsx

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,24 @@ const WebhookInspector: React.FC<Props> = ({ userPath, selectedBayDbId = null, s
8080
try {
8181
const data = JSON.parse(ev.data);
8282
const newItem: EventItem = { id: data.id, eventType: data.eventType, timestamp: data.timestamp, data: data.data, raw: data.raw, expanded: false };
83-
setAllEvents(prev => [newItem, ...prev]);
83+
setAllEvents(prev => {
84+
try {
85+
if (newItem.id) {
86+
const idx = prev.findIndex(p => p.id && p.id === newItem.id);
87+
if (idx >= 0) {
88+
// merge fields so enriched SSE augments the minimal one
89+
const merged = { ...prev[idx], ...newItem };
90+
// move merged item to the top
91+
const copy = prev.slice();
92+
copy.splice(idx, 1);
93+
return [merged, ...copy];
94+
}
95+
}
96+
} catch (err) {
97+
// fallback to naive prepend on any error
98+
}
99+
return [newItem, ...prev];
100+
});
84101
// If the new item matches the current bay filter (or there is no filter) select it and focus the list
85102
try {
86103
const bayId = getBayIdFromEvent(newItem);

0 commit comments

Comments
 (0)