Skip to content

Commit 7d2ada3

Browse files
authored
🤖 Fix assistant message part timestamps (#314)
## Problem Assistant message timestamps were inconsistent across reloads: - **Before reload**: All parts showed the timestamp of the first message - **After reload**: All parts showed the timestamp of the last message (or Date.now()) This happened because: 1. Text/reasoning parts weren't getting timestamps during streaming (unlike tool parts) 2. Display logic used message-level timestamp instead of part-level timestamps 3. Reconnection case was overwriting metadata.timestamp with Date.now() ## Solution - Add `timestamp` field to text/reasoning parts during streaming (matching tool parts) - Preserve first timestamp when merging adjacent parts of the same type - Use `part.timestamp ?? baseTimestamp` when displaying (consistent with tool parts) - Preserve `metadata.timestamp` in reconnection case instead of overwriting ## Testing Added comprehensive tests covering: - Timestamp assignment during streaming - Timestamp preservation when displaying - Fallback to message-level timestamp for old history without part-level timestamps All existing tests pass. _Generated with `cmux`_
1 parent d90a90b commit 7d2ada3

File tree

3 files changed

+158
-5
lines changed

3 files changed

+158
-5
lines changed

src/types/stream.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ export interface StreamEndEvent {
4040
providerMetadata?: Record<string, unknown>;
4141
duration?: number;
4242
systemMessageTokens?: number;
43+
historySequence?: number; // Present when loading from history
44+
timestamp?: number; // Present when loading from history
4345
};
4446
// Parts array preserves temporal ordering of reasoning, text, and tool calls
4547
parts: CompletedMessagePart[];

src/utils/messages/StreamingMessageAggregator.test.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,3 +543,150 @@ it("should clear TODOs on reconnection stream-end", () => {
543543
// Verify TODOs were cleared on stream-end (even in reconnection case)
544544
expect(aggregator.getCurrentTodos()).toHaveLength(0);
545545
});
546+
547+
describe("Part-level timestamps", () => {
548+
it("should assign timestamps to text/reasoning parts during streaming", () => {
549+
const aggregator = new StreamingMessageAggregator();
550+
const startTime = Date.now();
551+
552+
// Start a stream
553+
aggregator.handleStreamStart({
554+
type: "stream-start",
555+
workspaceId: "test-ws",
556+
messageId: "msg-1",
557+
model: "claude-3",
558+
historySequence: 1,
559+
});
560+
561+
// Add text deltas
562+
aggregator.handleStreamDelta({
563+
type: "stream-delta",
564+
workspaceId: "test-ws",
565+
messageId: "msg-1",
566+
delta: "First part ",
567+
tokens: 2,
568+
timestamp: startTime,
569+
});
570+
571+
aggregator.handleStreamDelta({
572+
type: "stream-delta",
573+
workspaceId: "test-ws",
574+
messageId: "msg-1",
575+
delta: "second part",
576+
tokens: 2,
577+
timestamp: startTime + 100,
578+
});
579+
580+
// Add reasoning delta
581+
aggregator.handleReasoningDelta({
582+
type: "reasoning-delta",
583+
workspaceId: "test-ws",
584+
messageId: "msg-1",
585+
delta: "thinking...",
586+
tokens: 1,
587+
timestamp: startTime + 200,
588+
});
589+
590+
// End stream
591+
aggregator.handleStreamEnd({
592+
type: "stream-end",
593+
workspaceId: "test-ws",
594+
messageId: "msg-1",
595+
metadata: {
596+
model: "claude-3",
597+
historySequence: 1,
598+
},
599+
parts: [],
600+
});
601+
602+
// Check that parts have timestamps
603+
const messages = aggregator.getAllMessages();
604+
expect(messages).toHaveLength(1);
605+
const msg = messages[0];
606+
607+
// Text parts should have timestamps
608+
const textParts = msg.parts.filter((p) => p.type === "text");
609+
expect(textParts.length).toBeGreaterThan(0);
610+
for (const part of textParts) {
611+
if (part.type === "text") {
612+
expect(part.timestamp).toBeNumber();
613+
}
614+
}
615+
616+
// Reasoning parts should have timestamps
617+
const reasoningParts = msg.parts.filter((p) => p.type === "reasoning");
618+
expect(reasoningParts.length).toBeGreaterThan(0);
619+
for (const part of reasoningParts) {
620+
if (part.type === "reasoning") {
621+
expect(part.timestamp).toBeNumber();
622+
}
623+
}
624+
});
625+
626+
it("should preserve individual part timestamps when displaying", () => {
627+
const aggregator = new StreamingMessageAggregator();
628+
const startTime = 1000;
629+
630+
// Simulate stream-end with pre-timestamped parts
631+
aggregator.handleStreamEnd({
632+
type: "stream-end",
633+
workspaceId: "test-ws",
634+
messageId: "msg-1",
635+
metadata: {
636+
model: "claude-3",
637+
historySequence: 1,
638+
timestamp: startTime, // Message-level timestamp
639+
},
640+
parts: [
641+
{ type: "text", text: "First", timestamp: startTime },
642+
{ type: "text", text: " second", timestamp: startTime + 100 },
643+
{ type: "reasoning", text: "thinking", timestamp: startTime + 200 },
644+
],
645+
});
646+
647+
// Get displayed messages
648+
const displayed = aggregator.getDisplayedMessages();
649+
650+
// Should have merged text parts into one display message and one reasoning message
651+
const assistantMsgs = displayed.filter((m) => m.type === "assistant");
652+
const reasoningMsgs = displayed.filter((m) => m.type === "reasoning");
653+
654+
expect(assistantMsgs).toHaveLength(1);
655+
expect(reasoningMsgs).toHaveLength(1);
656+
657+
// Assistant message should use the timestamp of the first text part
658+
expect(assistantMsgs[0].timestamp).toBe(startTime);
659+
660+
// Reasoning message should use its part's timestamp
661+
expect(reasoningMsgs[0].timestamp).toBe(startTime + 200);
662+
});
663+
664+
it("should use message-level timestamp as fallback when parts don't have timestamps", () => {
665+
const aggregator = new StreamingMessageAggregator();
666+
const messageTimestamp = 5000;
667+
668+
// Load a message without part-level timestamps (e.g., from old history)
669+
aggregator.handleStreamEnd({
670+
type: "stream-end",
671+
workspaceId: "test-ws",
672+
messageId: "msg-1",
673+
metadata: {
674+
model: "claude-3",
675+
historySequence: 1,
676+
timestamp: messageTimestamp,
677+
},
678+
parts: [
679+
{ type: "text", text: "No timestamp" },
680+
{ type: "reasoning", text: "thinking" },
681+
],
682+
});
683+
684+
const displayed = aggregator.getDisplayedMessages();
685+
const assistantMsgs = displayed.filter((m) => m.type === "assistant");
686+
const reasoningMsgs = displayed.filter((m) => m.type === "reasoning");
687+
688+
// Both should fall back to message-level timestamp
689+
expect(assistantMsgs[0].timestamp).toBe(messageTimestamp);
690+
expect(reasoningMsgs[0].timestamp).toBe(messageTimestamp);
691+
});
692+
});

src/utils/messages/StreamingMessageAggregator.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ export class StreamingMessageAggregator {
256256
message.parts.push({
257257
type: "text",
258258
text: data.delta,
259+
timestamp: data.timestamp,
259260
});
260261

261262
// Track delta for token counting and TPS calculation
@@ -315,7 +316,7 @@ export class StreamingMessageAggregator {
315316
role: "assistant",
316317
metadata: {
317318
...data.metadata,
318-
timestamp: Date.now(),
319+
timestamp: data.metadata.timestamp ?? Date.now(),
319320
},
320321
parts: data.parts,
321322
};
@@ -452,6 +453,7 @@ export class StreamingMessageAggregator {
452453
message.parts.push({
453454
type: "reasoning",
454455
text: data.delta,
456+
timestamp: data.timestamp,
455457
});
456458

457459
// Track delta for token counting and TPS calculation
@@ -575,16 +577,18 @@ export class StreamingMessageAggregator {
575577

576578
// Try to merge with last part if same type
577579
if (lastMerged?.type === "text" && part.type === "text") {
578-
// Merge text parts
580+
// Merge text parts, preserving the first timestamp
579581
mergedParts[mergedParts.length - 1] = {
580582
type: "text",
581583
text: lastMerged.text + part.text,
584+
timestamp: lastMerged.timestamp ?? part.timestamp,
582585
};
583586
} else if (lastMerged?.type === "reasoning" && part.type === "reasoning") {
584-
// Merge reasoning parts
587+
// Merge reasoning parts, preserving the first timestamp
585588
mergedParts[mergedParts.length - 1] = {
586589
type: "reasoning",
587590
text: lastMerged.text + part.text,
591+
timestamp: lastMerged.timestamp ?? part.timestamp,
588592
};
589593
} else {
590594
// Different type or tool part - add new part
@@ -624,7 +628,7 @@ export class StreamingMessageAggregator {
624628
isStreaming,
625629
isPartial: message.metadata?.partial ?? false,
626630
isLastPartOfMessage: isLastPart,
627-
timestamp: baseTimestamp,
631+
timestamp: part.timestamp ?? baseTimestamp,
628632
});
629633
} else if (part.type === "text" && part.text) {
630634
// Skip empty text parts
@@ -640,7 +644,7 @@ export class StreamingMessageAggregator {
640644
isLastPartOfMessage: isLastPart,
641645
isCompacted: message.metadata?.compacted ?? false,
642646
model: message.metadata?.model,
643-
timestamp: baseTimestamp,
647+
timestamp: part.timestamp ?? baseTimestamp,
644648
});
645649
} else if (isDynamicToolPart(part)) {
646650
const status =

0 commit comments

Comments
 (0)