Skip to content

Commit 869aa26

Browse files
committed
fix user message rendering
1 parent 560c34c commit 869aa26

File tree

7 files changed

+111
-52
lines changed

7 files changed

+111
-52
lines changed

apps/array/src/api/fetcher.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export const buildApiFetcher: (config: {
1010
): Promise<Response> => {
1111
const headers = new Headers();
1212
headers.set("Authorization", `Bearer ${token}`);
13+
headers.set("Content-Type", "application/json");
1314

1415
if (input.urlSearchParams) {
1516
input.url.search = input.urlSearchParams.toString();
@@ -21,10 +22,6 @@ export const buildApiFetcher: (config: {
2122
? JSON.stringify(input.parameters?.body)
2223
: undefined;
2324

24-
if (body) {
25-
headers.set("Content-Type", "application/json");
26-
}
27-
2825
if (input.parameters?.header) {
2926
for (const [key, value] of Object.entries(input.parameters.header)) {
3027
if (value != null) {

apps/array/src/api/posthogClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { AgentEvent } from "@posthog/agent";
2+
import type { StoredLogEntry } from "@features/sessions/utils/parseSessionLogs";
23
import { logger } from "@renderer/lib/logger";
34
import type { Task, TaskRun } from "@shared/types";
45
import { buildApiFetcher } from "./fetcher";
@@ -236,7 +237,7 @@ export class PostHogAPIClient {
236237
async appendTaskRunLog(
237238
taskId: string,
238239
runId: string,
239-
entries: AgentEvent[],
240+
entries: StoredLogEntry[],
240241
): Promise<void> {
241242
const teamId = await this.getTeamId();
242243
const url = `${this.api.baseUrl}/api/projects/${teamId}/tasks/${taskId}/runs/${runId}/append_log/`;
@@ -246,7 +247,6 @@ export class PostHogAPIClient {
246247
path: url,
247248
overrides: {
248249
body: JSON.stringify({ entries }),
249-
headers: { "Content-Type": "application/json" },
250250
},
251251
});
252252
if (!response.ok) {

apps/array/src/renderer/features/sessions/components/ConsoleMessage.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Bug, Info, Warning, XCircle } from "@phosphor-icons/react";
2-
import { Badge, Box, Flex, Text } from "@radix-ui/themes";
2+
import { Badge, Flex, Text } from "@radix-ui/themes";
33

44
interface ConsoleMessageProps {
55
level: "info" | "debug" | "warn" | "error";

apps/array/src/renderer/features/sessions/components/SessionView.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ function parseSessionNotification(
106106
notification: SessionNotification,
107107
): ParseResult {
108108
const { update } = notification;
109+
if (!update?.sessionUpdate) {
110+
return null;
111+
}
109112

110113
switch (update.sessionUpdate) {
111114
case "user_message_chunk":
@@ -225,7 +228,7 @@ class MessageBuilder {
225228
if (toolData.kind) existing.toolData.kind = toolData.kind;
226229
}
227230

228-
addConsole(consoleData: ConsoleData, ts: number, eventIndex: number): void {
231+
addConsole(consoleData: ConsoleData, _ts: number, eventIndex: number): void {
229232
this.flushAgentText();
230233
this.messages.push({
231234
id: `console-${eventIndex}`,

apps/array/src/renderer/features/sessions/components/TurnCollapsible.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ interface ToolData {
1515

1616
interface ParsedMessage {
1717
id: string;
18-
type: "user" | "agent" | "tool";
18+
type: "user" | "agent" | "tool" | "console";
1919
content: string;
2020
toolData?: ToolData;
2121
}

apps/array/src/renderer/features/sessions/stores/sessionStore.ts

Lines changed: 94 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ function convertRawEntriesToEvents(
4141
}
4242

4343
for (const entry of rawEntries) {
44-
const ts = entry.timestamp ? new Date(entry.timestamp).getTime() : Date.now();
44+
const ts = entry.timestamp
45+
? new Date(entry.timestamp).getTime()
46+
: Date.now();
4547

4648
events.push({
4749
type: "acp_message",
@@ -92,6 +94,7 @@ export interface AgentSession {
9294
isPromptPending: boolean;
9395
isCloud: boolean;
9496
logUrl?: string;
97+
processedLineCount?: number;
9598
}
9699

97100
interface ConnectParams {
@@ -216,6 +219,7 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
216219
isPromptPending: false,
217220
isCloud: true,
218221
logUrl: latestRunLogUrl,
222+
processedLineCount: rawEntries.length,
219223
},
220224
},
221225
}));
@@ -393,7 +397,62 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
393397
throw new Error("No active session for task");
394398
}
395399

396-
// Set pending state
400+
const blocks: ContentBlock[] =
401+
typeof prompt === "string" ? [{ type: "text", text: prompt }] : prompt;
402+
403+
if (session.isCloud) {
404+
// Cloud: send via S3 log - cloud runner polls and picks up
405+
// No pending state needed since we just append to log
406+
const authState = useAuthStore.getState();
407+
const { client } = authState;
408+
if (!client) {
409+
throw new Error("API client not available");
410+
}
411+
412+
const notification: StoredLogEntry = {
413+
type: "notification" as const,
414+
timestamp: new Date().toISOString(),
415+
direction: "client" as const,
416+
notification: {
417+
method: "session/update" as const,
418+
params: {
419+
update: {
420+
sessionUpdate: "user_message_chunk",
421+
content: blocks[0],
422+
},
423+
},
424+
},
425+
};
426+
427+
await client.appendTaskRunLog(taskId, session.taskRunId, [notification]);
428+
log.info("Sent cloud message via S3", {
429+
taskId,
430+
runId: session.taskRunId,
431+
});
432+
433+
// Optimistically add user message to local state immediately
434+
const ts = Date.now();
435+
const userEvent: SessionEvent = {
436+
type: "session_update",
437+
ts,
438+
notification: notification.notification?.params as SessionNotification,
439+
};
440+
set((state) => ({
441+
sessions: {
442+
...state.sessions,
443+
[session.taskRunId]: {
444+
...state.sessions[session.taskRunId],
445+
events: [...state.sessions[session.taskRunId].events, userEvent],
446+
processedLineCount:
447+
(state.sessions[session.taskRunId].processedLineCount ?? 0) + 1,
448+
},
449+
},
450+
}));
451+
452+
return { stopReason: "pending" };
453+
}
454+
455+
// Local: set pending state and send via IPC
397456
set((state) => ({
398457
sessions: {
399458
...state.sessions,
@@ -405,39 +464,8 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
405464
}));
406465

407466
try {
408-
// Convert text to ContentBlock array if needed
409-
const blocks: ContentBlock[] =
410-
typeof prompt === "string" ? [{ type: "text", text: prompt }] : prompt;
411-
412-
if (session.isCloud) {
413-
// Cloud: send via S3 log - cloud runner polls and picks up
414-
const authState = useAuthStore.getState();
415-
const { client } = authState;
416-
if (!client) {
417-
throw new Error("API client not available");
418-
}
419-
420-
const notification = {
421-
type: "notification" as const,
422-
timestamp: new Date().toISOString(),
423-
notification: {
424-
method: "sessionUpdate",
425-
params: {
426-
sessionUpdate: "user_message",
427-
content: blocks,
428-
},
429-
},
430-
};
431-
432-
await client.appendTaskRunLog(taskId, session.taskRunId, [notification]);
433-
log.info("Sent cloud message via S3", { taskId, runId: session.taskRunId });
434-
return { stopReason: "pending" };
435-
}
436-
437-
// Local: send via IPC directly to running agent
438467
return await window.electronAPI.agentPrompt(session.taskRunId, blocks);
439468
} finally {
440-
// Clear pending state
441469
set((state) => ({
442470
sessions: {
443471
...state.sessions,
@@ -520,29 +548,55 @@ export const useSessionStore = create<SessionStore>((set, get) => ({
520548
const text = await response.text();
521549
const lines = text.trim().split("\n").filter(Boolean);
522550

523-
// Only process new entries
524-
const currentEventCount = session.events.length;
525-
if (lines.length > currentEventCount) {
526-
const newLines = lines.slice(currentEventCount);
551+
// Only process new entries (track by line count, not event count)
552+
const processedCount = session.processedLineCount ?? 0;
553+
if (lines.length > processedCount) {
554+
const newLines = lines.slice(processedCount);
527555
for (const line of newLines) {
528556
try {
529557
const entry = JSON.parse(line);
530558
const ts = entry.timestamp
531559
? new Date(entry.timestamp).getTime()
532560
: Date.now();
533561

534-
const event: SessionEvent = {
562+
// Create acp_message for raw log entry
563+
const acpEvent: SessionEvent = {
535564
type: "acp_message",
536565
direction: entry.direction ?? "agent",
537566
ts,
538567
message: entry.notification,
539568
};
540-
541-
get()._handleEvent(taskRunId, event);
569+
get()._handleEvent(taskRunId, acpEvent);
570+
571+
// Also create session_update event for session/update notifications
572+
if (
573+
entry.type === "notification" &&
574+
entry.notification?.method === "session/update" &&
575+
entry.notification?.params
576+
) {
577+
const sessionUpdateEvent: SessionEvent = {
578+
type: "session_update",
579+
ts,
580+
notification: entry.notification
581+
.params as SessionNotification,
582+
};
583+
get()._handleEvent(taskRunId, sessionUpdateEvent);
584+
}
542585
} catch {
543586
// Skip invalid JSON
544587
}
545588
}
589+
590+
// Update processed line count
591+
set((state) => ({
592+
sessions: {
593+
...state.sessions,
594+
[taskRunId]: {
595+
...state.sessions[taskRunId],
596+
processedLineCount: lines.length,
597+
},
598+
},
599+
}));
546600
}
547601
} catch (err) {
548602
log.warn("Cloud polling error", { error: err });

packages/agent/src/agent.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,10 @@ Generated by PostHog Agent`;
560560

561561
// Start session in SessionStore (updates task run status to in_progress)
562562
const taskRun = await this.sessionStore.start(taskRunId, taskId, taskRunId);
563-
this.logger.debug("Session started", { taskRunId, logUrl: taskRun?.log_url });
563+
this.logger.debug("Session started", {
564+
taskRunId,
565+
logUrl: taskRun?.log_url,
566+
});
564567

565568
// Create internal ACP connection with S3 persistence
566569
const acpConnection = createAcpConnection({
@@ -650,7 +653,8 @@ Generated by PostHog Agent`;
650653
// Look for user_message notifications
651654
if (
652655
entry.notification?.method === "sessionUpdate" &&
653-
(entry.notification?.params as any)?.sessionUpdate === "user_message"
656+
(entry.notification?.params as any)?.sessionUpdate ===
657+
"user_message"
654658
) {
655659
const content = (entry.notification?.params as any)?.content;
656660
if (content) {
@@ -690,7 +694,8 @@ Generated by PostHog Agent`;
690694

691695
await this.sessionStore.complete(taskRunId);
692696
} catch (error) {
693-
const errorMessage = error instanceof Error ? error.message : String(error);
697+
const errorMessage =
698+
error instanceof Error ? error.message : String(error);
694699
this.logger.error("Cloud task execution failed", {
695700
taskId: task.id,
696701
error: errorMessage,

0 commit comments

Comments
 (0)