Skip to content

Commit 2ec3c9e

Browse files
authored
feat: move to read from s3 (#129)
1 parent 543d2af commit 2ec3c9e

File tree

6 files changed

+227
-46
lines changed

6 files changed

+227
-46
lines changed

src/main/preload.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ contextBridge.exposeInMainWorld("electronAPI", {
3535
ipcRenderer.invoke("store-api-key", apiKey),
3636
retrieveApiKey: (encryptedKey: string): Promise<string | null> =>
3737
ipcRenderer.invoke("retrieve-api-key", encryptedKey),
38+
fetchS3Logs: (logUrl: string): Promise<string> =>
39+
ipcRenderer.invoke("fetch-s3-logs", logUrl),
3840
// OAuth API
3941
oauthStartFlow: (
4042
region: CloudRegion,

src/main/services/agent.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,6 @@ export function registerAgentIpc(
118118
posthogApiKey: apiKey,
119119
posthogApiUrl: apiHost,
120120
posthogProjectId: projectId,
121-
onEvent: (event) => {
122-
console.log("agent event", event);
123-
if (!event || abortController.signal.aborted) return;
124-
const payload =
125-
event.type === "done" ? { ...event, success: true } : event;
126-
emitToRenderer(payload);
127-
},
128121
debug: true,
129122
});
130123

src/main/services/posthog.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,28 @@ export function registerPosthogIpc(): void {
3030
return encryptedKey;
3131
},
3232
);
33+
34+
// Fetch S3 logs
35+
ipcMain.handle(
36+
"fetch-s3-logs",
37+
async (_event: IpcMainInvokeEvent, logUrl: string): Promise<string> => {
38+
try {
39+
console.log("Fetching S3 logs from:", logUrl);
40+
const response = await fetch(logUrl);
41+
42+
if (!response.ok) {
43+
throw new Error(
44+
`Failed to fetch logs: ${response.status} ${response.statusText}`,
45+
);
46+
}
47+
48+
const content = await response.text();
49+
console.log("S3 logs fetched:", content);
50+
return content;
51+
} catch (error) {
52+
console.error("Failed to fetch S3 logs:", error);
53+
throw error;
54+
}
55+
},
56+
);
3357
}

src/renderer/features/editor/components/PlanEditor.tsx

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,7 @@ export function PlanEditor({
8585

8686
// Track unsaved changes
8787
useEffect(() => {
88-
if (content !== fetchedContent) {
89-
setHasUnsavedChanges(true);
90-
} else {
91-
setHasUnsavedChanges(false);
92-
}
88+
setHasUnsavedChanges(content !== fetchedContent);
9389
}, [content, fetchedContent]);
9490

9591
return (

src/renderer/features/task-detail/stores/taskExecutionStore.ts

Lines changed: 199 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { track } from "@renderer/lib/analytics";
66
import type {
77
ClarifyingQuestion,
88
ExecutionMode,
9+
LogEntry,
910
PlanModePhase,
1011
QuestionAnswer,
1112
Task,
@@ -34,6 +35,23 @@ interface ArtifactEvent {
3435
}>;
3536
}
3637

38+
interface TodoItem {
39+
content: string;
40+
status: "pending" | "in_progress" | "completed";
41+
activeForm: string;
42+
}
43+
44+
interface TodoList {
45+
items: TodoItem[];
46+
metadata: {
47+
total: number;
48+
pending: number;
49+
in_progress: number;
50+
completed: number;
51+
last_updated: string;
52+
};
53+
}
54+
3755
const createProgressSignature = (progress: TaskRun): string =>
3856
[progress.status ?? "", progress.updated_at ?? ""].join("|");
3957

@@ -59,22 +77,79 @@ const toClarifyingQuestions = (
5977
requiresInput: hasCustomOption(q.options),
6078
}));
6179
};
62-
63-
export interface TodoItem {
64-
content: string;
65-
status: "pending" | "in_progress" | "completed";
66-
activeForm: string;
80+
/**
81+
* Convert S3 LogEntry to AgentEvent
82+
*/
83+
function logEntryToAgentEvent(entry: LogEntry): AgentEvent | null {
84+
try {
85+
const baseTs = Date.now();
86+
87+
if (entry.type === "token") {
88+
return {
89+
type: "token",
90+
ts: baseTs,
91+
content: entry.message,
92+
} as AgentEvent;
93+
}
94+
95+
if (entry.type === "info") {
96+
return {
97+
type: "token",
98+
ts: baseTs,
99+
content: entry.message,
100+
} as AgentEvent;
101+
}
102+
103+
if (entry.message) {
104+
try {
105+
const parsed = JSON.parse(entry.message);
106+
return {
107+
...parsed,
108+
type: entry.type as any,
109+
ts: parsed.ts || baseTs,
110+
} as AgentEvent;
111+
} catch {
112+
return {
113+
type: entry.type as any,
114+
ts: baseTs,
115+
message: entry.message,
116+
} as AgentEvent;
117+
}
118+
}
119+
120+
return null;
121+
} catch (err) {
122+
console.warn("Failed to convert log entry to agent event", err, entry);
123+
return null;
124+
}
67125
}
68126

69-
export interface TodoList {
70-
items: TodoItem[];
71-
metadata: {
72-
total: number;
73-
pending: number;
74-
in_progress: number;
75-
completed: number;
76-
last_updated: string;
77-
};
127+
/**
128+
* Fetch logs from S3 log URL via main process to avoid CORS issues
129+
* Always fetches and returns the entire log file
130+
*/
131+
async function fetchLogsFromS3Url(logUrl: string): Promise<AgentEvent[]> {
132+
try {
133+
const content = await window.electronAPI?.fetchS3Logs(logUrl);
134+
135+
if (!content || !content.trim()) {
136+
return [];
137+
}
138+
139+
const logEntries = content
140+
.trim()
141+
.split("\n")
142+
.map((line: string) => JSON.parse(line) as LogEntry);
143+
144+
const events = logEntries
145+
.map((entry: LogEntry) => logEntryToAgentEvent(entry))
146+
.filter((event): event is AgentEvent => event !== null);
147+
148+
return events;
149+
} catch (err) {
150+
console.warn("Failed to fetch task logs from S3", err);
151+
return [];
152+
}
78153
}
79154

80155
interface TaskExecutionState {
@@ -87,6 +162,8 @@ interface TaskExecutionState {
87162
unsubscribe: (() => void) | null;
88163
progress: TaskRun | null;
89164
progressSignature: string | null;
165+
// S3 log polling fields
166+
logPoller: ReturnType<typeof setInterval> | null;
90167
// Plan mode fields
91168
executionMode: ExecutionMode;
92169
planModePhase: PlanModePhase;
@@ -162,6 +239,7 @@ const defaultTaskState: TaskExecutionState = {
162239
unsubscribe: null,
163240
progress: null,
164241
progressSignature: null,
242+
logPoller: null,
165243
executionMode: "plan",
166244
planModePhase: "idle",
167245
clarifyingQuestions: [],
@@ -258,6 +336,9 @@ export const useTaskExecutionStore = create<TaskExecutionStore>()(
258336
if (taskState?.unsubscribe) {
259337
taskState.unsubscribe();
260338
}
339+
if (taskState?.logPoller) {
340+
clearInterval(taskState.logPoller);
341+
}
261342
set((state) => {
262343
const newTaskStates = { ...state.taskStates };
263344
delete newTaskStates[taskId];
@@ -268,19 +349,22 @@ export const useTaskExecutionStore = create<TaskExecutionStore>()(
268349
subscribeToAgentEvents: (taskId: string, channel: string) => {
269350
const store = get();
270351

271-
// Clean up existing subscription if any
272352
const existingState = store.taskStates[taskId];
273353
if (existingState?.unsubscribe) {
274354
existingState.unsubscribe();
275355
}
356+
if (existingState?.logPoller) {
357+
clearInterval(existingState.logPoller);
358+
}
359+
360+
store.updateTaskState(taskId, { logPoller: null });
276361

277-
// Create new subscription that persists even when component unmounts
362+
// Create new subscription that listens only to progress events
278363
const unsubscribeFn = window.electronAPI?.onAgentEvent(
279364
channel,
280365
(ev: AgentEvent | { type: "progress"; progress: TaskRun }) => {
281366
const currentStore = get();
282367

283-
// Handle custom progress events from Array backend
284368
if (ev?.type === "progress" && "progress" in ev) {
285369
const newProgress = ev.progress;
286370
const oldProgress = currentStore.getTaskState(taskId).progress;
@@ -289,34 +373,111 @@ export const useTaskExecutionStore = create<TaskExecutionStore>()(
289373
: null;
290374
const newSig = createProgressSignature(newProgress);
291375

292-
// Always update the stored progress state for UI (like TaskDetail)
293376
currentStore.setProgress(taskId, newProgress);
294377

295-
// Only add to logs if signature changed (to avoid duplicate log entries)
296378
if (oldSig !== newSig) {
297379
currentStore.addLog(taskId, {
298380
type: "progress",
299381
ts: Date.now(),
300382
progress: newProgress,
301383
} as unknown as AgentEvent);
302384
}
303-
return;
304-
}
305385

306-
// Store AgentEvent directly (ev is now narrowed to AgentEvent)
307-
if (ev?.type) {
308-
// Handle state changes for special event types
309-
if (ev.type === "error" || ev.type === "done") {
310-
currentStore.setRunning(taskId, false);
311-
if (ev.type === "done") {
312-
currentStore.setUnsubscribe(taskId, null);
386+
if (newProgress.log_url) {
387+
const currentState = currentStore.getTaskState(taskId);
388+
389+
// Don't start polling if task is already complete
390+
if (
391+
newProgress.status === "completed" ||
392+
newProgress.status === "failed"
393+
) {
394+
// Stop any existing poller
395+
if (currentState.logPoller) {
396+
clearInterval(currentState.logPoller);
397+
currentStore.updateTaskState(taskId, { logPoller: null });
398+
}
399+
400+
// Do one final fetch to get all logs
401+
if (newProgress.log_url) {
402+
fetchLogsFromS3Url(newProgress.log_url)
403+
.then((allEvents) => {
404+
if (allEvents.length > 0) {
405+
const store = get();
406+
const hasDone = allEvents.some(
407+
(event) => event.type === "done",
408+
);
409+
if (hasDone) {
410+
store.setRunning(taskId, false);
411+
store.checkPlanCompletion(taskId);
412+
}
413+
store.setLogs(taskId, allEvents);
414+
}
415+
})
416+
.catch((err) =>
417+
console.warn("Failed to fetch final logs", err),
418+
);
419+
}
420+
return;
421+
}
422+
423+
if (!currentState.logPoller) {
424+
const pollLogs = async () => {
425+
const state = get().getTaskState(taskId);
426+
const progress = state.progress;
427+
428+
if (
429+
!progress?.log_url ||
430+
progress.status === "completed" ||
431+
progress.status === "failed"
432+
) {
433+
if (state.logPoller) {
434+
clearInterval(state.logPoller);
435+
get().updateTaskState(taskId, { logPoller: null });
436+
}
437+
return;
438+
}
439+
440+
const allEvents = await fetchLogsFromS3Url(
441+
progress.log_url,
442+
);
443+
444+
if (allEvents.length > 0) {
445+
const store = get();
446+
447+
const hasError = allEvents.some(
448+
(event) => event.type === "error",
449+
);
450+
const hasDone = allEvents.some(
451+
(event) => event.type === "done",
452+
);
453+
454+
if (hasError || hasDone) {
455+
store.setRunning(taskId, false);
456+
if (hasDone) {
457+
const currentState = store.getTaskState(taskId);
458+
if (currentState.logPoller) {
459+
clearInterval(currentState.logPoller);
460+
store.updateTaskState(taskId, { logPoller: null });
461+
}
462+
}
463+
store.checkPlanCompletion(taskId);
464+
}
465+
466+
store.setLogs(taskId, allEvents);
467+
}
468+
};
469+
470+
void pollLogs();
471+
472+
const poller = setInterval(() => {
473+
void pollLogs();
474+
}, 2000);
475+
476+
currentStore.updateTaskState(taskId, { logPoller: poller });
313477
}
314-
// Check if plan needs to be loaded after run completes
315-
currentStore.checkPlanCompletion(taskId);
316478
}
317479

318-
// Add event to logs
319-
currentStore.addLog(taskId, ev);
480+
return;
320481
}
321482
},
322483
);
@@ -332,6 +493,10 @@ export const useTaskExecutionStore = create<TaskExecutionStore>()(
332493
taskState.unsubscribe();
333494
get().setUnsubscribe(taskId, null);
334495
}
496+
if (taskState?.logPoller) {
497+
clearInterval(taskState.logPoller);
498+
get().updateTaskState(taskId, { logPoller: null });
499+
}
335500
},
336501

337502
// High-level task execution actions
@@ -855,12 +1020,12 @@ export const useTaskExecutionStore = create<TaskExecutionStore>()(
8551020
}),
8561021
{
8571022
name: "task-execution-storage",
858-
// Don't persist unsubscribe functions as they can't be serialized
1023+
// Don't persist unsubscribe functions and pollers as they can't be serialized
8591024
partialize: (state) => ({
8601025
taskStates: Object.fromEntries(
8611026
Object.entries(state.taskStates).map(([taskId, taskState]) => [
8621027
taskId,
863-
{ ...taskState, unsubscribe: null },
1028+
{ ...taskState, unsubscribe: null, logPoller: null },
8641029
]),
8651030
),
8661031
}),

0 commit comments

Comments
 (0)