Skip to content

Commit f6b12ad

Browse files
committed
fix user message rendering
1 parent 560c34c commit f6b12ad

File tree

5 files changed

+99
-47
lines changed

5 files changed

+99
-47
lines changed

apps/array/src/api/fetcher.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export const buildApiFetcher: (config: {
1010
): Promise<Response> => {
1111
const headers = new Headers();
1212
headers.set("Authorization", `Bearer ${token}`);
13+
headers.set("Content-Type", "application/json");
1314

1415
if (input.urlSearchParams) {
1516
input.url.search = input.urlSearchParams.toString();
@@ -21,10 +22,6 @@ export const buildApiFetcher: (config: {
2122
? JSON.stringify(input.parameters?.body)
2223
: undefined;
2324

24-
if (body) {
25-
headers.set("Content-Type", "application/json");
26-
}
27-
2825
if (input.parameters?.header) {
2926
for (const [key, value] of Object.entries(input.parameters.header)) {
3027
if (value != null) {

apps/array/src/api/posthogClient.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,6 @@ export class PostHogAPIClient {
246246
path: url,
247247
overrides: {
248248
body: JSON.stringify({ entries }),
249-
headers: { "Content-Type": "application/json" },
250249
},
251250
});
252251
if (!response.ok) {

apps/array/src/renderer/features/sessions/components/SessionView.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ function parseSessionNotification(
106106
notification: SessionNotification,
107107
): ParseResult {
108108
const { update } = notification;
109+
if (!update?.sessionUpdate) {
110+
return null;
111+
}
109112

110113
switch (update.sessionUpdate) {
111114
case "user_message_chunk":

apps/array/src/renderer/features/sessions/stores/sessionStore.ts

Lines changed: 87 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ export interface AgentSession {
9292
isPromptPending: boolean;
9393
isCloud: boolean;
9494
logUrl?: string;
95+
processedLineCount?: number;
9596
}
9697

9798
interface ConnectParams {
@@ -216,6 +217,7 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
216217
isPromptPending: false,
217218
isCloud: true,
218219
logUrl: latestRunLogUrl,
220+
processedLineCount: rawEntries.length,
219221
},
220222
},
221223
}));
@@ -393,7 +395,59 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
393395
throw new Error("No active session for task");
394396
}
395397

396-
// Set pending state
398+
const blocks: ContentBlock[] =
399+
typeof prompt === "string" ? [{ type: "text", text: prompt }] : prompt;
400+
401+
if (session.isCloud) {
402+
// Cloud: send via S3 log - cloud runner polls and picks up
403+
// No pending state needed since we just append to log
404+
const authState = useAuthStore.getState();
405+
const { client } = authState;
406+
if (!client) {
407+
throw new Error("API client not available");
408+
}
409+
410+
const notification = {
411+
type: "notification" as const,
412+
timestamp: new Date().toISOString(),
413+
direction: "client" as const,
414+
notification: {
415+
method: "session/update",
416+
params: {
417+
update: {
418+
sessionUpdate: "user_message_chunk",
419+
content: blocks[0],
420+
},
421+
},
422+
},
423+
};
424+
425+
await client.appendTaskRunLog(taskId, session.taskRunId, [notification]);
426+
log.info("Sent cloud message via S3", { taskId, runId: session.taskRunId });
427+
428+
// Optimistically add user message to local state immediately
429+
const ts = Date.now();
430+
const userEvent: SessionEvent = {
431+
type: "session_update",
432+
ts,
433+
notification: notification.notification.params as SessionNotification,
434+
};
435+
set((state) => ({
436+
sessions: {
437+
...state.sessions,
438+
[session.taskRunId]: {
439+
...state.sessions[session.taskRunId],
440+
events: [...state.sessions[session.taskRunId].events, userEvent],
441+
processedLineCount:
442+
(state.sessions[session.taskRunId].processedLineCount ?? 0) + 1,
443+
},
444+
},
445+
}));
446+
447+
return { stopReason: "pending" };
448+
}
449+
450+
// Local: set pending state and send via IPC
397451
set((state) => ({
398452
sessions: {
399453
...state.sessions,
@@ -405,39 +459,8 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
405459
}));
406460

407461
try {
408-
// Convert text to ContentBlock array if needed
409-
const blocks: ContentBlock[] =
410-
typeof prompt === "string" ? [{ type: "text", text: prompt }] : prompt;
411-
412-
if (session.isCloud) {
413-
// Cloud: send via S3 log - cloud runner polls and picks up
414-
const authState = useAuthStore.getState();
415-
const { client } = authState;
416-
if (!client) {
417-
throw new Error("API client not available");
418-
}
419-
420-
const notification = {
421-
type: "notification" as const,
422-
timestamp: new Date().toISOString(),
423-
notification: {
424-
method: "sessionUpdate",
425-
params: {
426-
sessionUpdate: "user_message",
427-
content: blocks,
428-
},
429-
},
430-
};
431-
432-
await client.appendTaskRunLog(taskId, session.taskRunId, [notification]);
433-
log.info("Sent cloud message via S3", { taskId, runId: session.taskRunId });
434-
return { stopReason: "pending" };
435-
}
436-
437-
// Local: send via IPC directly to running agent
438462
return await window.electronAPI.agentPrompt(session.taskRunId, blocks);
439463
} finally {
440-
// Clear pending state
441464
set((state) => ({
442465
sessions: {
443466
...state.sessions,
@@ -520,29 +543,54 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
520543
const text = await response.text();
521544
const lines = text.trim().split("\n").filter(Boolean);
522545

523-
// Only process new entries
524-
const currentEventCount = session.events.length;
525-
if (lines.length > currentEventCount) {
526-
const newLines = lines.slice(currentEventCount);
546+
// Only process new entries (track by line count, not event count)
547+
const processedCount = session.processedLineCount ?? 0;
548+
if (lines.length > processedCount) {
549+
const newLines = lines.slice(processedCount);
527550
for (const line of newLines) {
528551
try {
529552
const entry = JSON.parse(line);
530553
const ts = entry.timestamp
531554
? new Date(entry.timestamp).getTime()
532555
: Date.now();
533556

534-
const event: SessionEvent = {
557+
// Create acp_message for raw log entry
558+
const acpEvent: SessionEvent = {
535559
type: "acp_message",
536560
direction: entry.direction ?? "agent",
537561
ts,
538562
message: entry.notification,
539563
};
540-
541-
get()._handleEvent(taskRunId, event);
564+
get()._handleEvent(taskRunId, acpEvent);
565+
566+
// Also create session_update event for session/update notifications
567+
if (
568+
entry.type === "notification" &&
569+
entry.notification?.method === "session/update" &&
570+
entry.notification?.params
571+
) {
572+
const sessionUpdateEvent: SessionEvent = {
573+
type: "session_update",
574+
ts,
575+
notification: entry.notification.params as SessionNotification,
576+
};
577+
get()._handleEvent(taskRunId, sessionUpdateEvent);
578+
}
542579
} catch {
543580
// Skip invalid JSON
544581
}
545582
}
583+
584+
// Update processed line count
585+
set((state) => ({
586+
sessions: {
587+
...state.sessions,
588+
[taskRunId]: {
589+
...state.sessions[taskRunId],
590+
processedLineCount: lines.length,
591+
},
592+
},
593+
}));
546594
}
547595
} catch (err) {
548596
log.warn("Cloud polling error", { error: err });

packages/agent/src/agent.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,10 @@ Generated by PostHog Agent`;
560560

561561
// Start session in SessionStore (updates task run status to in_progress)
562562
const taskRun = await this.sessionStore.start(taskRunId, taskId, taskRunId);
563-
this.logger.debug("Session started", { taskRunId, logUrl: taskRun?.log_url });
563+
this.logger.debug("Session started", {
564+
taskRunId,
565+
logUrl: taskRun?.log_url,
566+
});
564567

565568
// Create internal ACP connection with S3 persistence
566569
const acpConnection = createAcpConnection({
@@ -650,7 +653,8 @@ Generated by PostHog Agent`;
650653
// Look for user_message notifications
651654
if (
652655
entry.notification?.method === "sessionUpdate" &&
653-
(entry.notification?.params as any)?.sessionUpdate === "user_message"
656+
(entry.notification?.params as any)?.sessionUpdate ===
657+
"user_message"
654658
) {
655659
const content = (entry.notification?.params as any)?.content;
656660
if (content) {
@@ -690,7 +694,8 @@ Generated by PostHog Agent`;
690694

691695
await this.sessionStore.complete(taskRunId);
692696
} catch (error) {
693-
const errorMessage = error instanceof Error ? error.message : String(error);
697+
const errorMessage =
698+
error instanceof Error ? error.message : String(error);
694699
this.logger.error("Cloud task execution failed", {
695700
taskId: task.id,
696701
error: errorMessage,

0 commit comments

Comments
 (0)