Skip to content

Commit 253b61a

Browse files
committed
Refactor message compaction and legacy migration handling
- Removed unnecessary logging statements in `compaction.go` to streamline message processing and improve clarity. - Updated `MigrateLegacySessionToAGUI` to handle legacy messages in JSONL format, enhancing compatibility with existing data. - Improved error handling and logging during legacy message migration, ensuring better traceability and user feedback. These changes enhance the efficiency of message compaction and improve the robustness of legacy data handling.
1 parent 4b92692 commit 253b61a

File tree

2 files changed

+24
-81
lines changed

2 files changed

+24
-81
lines changed

components/backend/websocket/compaction.go

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,12 @@ func (c *MessageCompactor) HandleEvent(event map[string]interface{}) {
5757
c.handleMessagesSnapshot(event)
5858
case types.EventTypeRunStarted, types.EventTypeRunFinished, types.EventTypeRunError:
5959
// Lifecycle events - skip, don't affect message compaction
60-
log.Printf("Compaction: Skipping lifecycle event %s", eventType)
6160
case types.EventTypeStepStarted, types.EventTypeStepFinished:
6261
// Step events - skip, don't affect message compaction
63-
log.Printf("Compaction: Skipping step event %s", eventType)
6462
case types.EventTypeStateSnapshot, types.EventTypStateDelta:
6563
// State events - skip, don't affect message compaction
66-
log.Printf("Compaction: Skipping state event %s", eventType)
6764
case types.EventTypeActivitySnapshot, types.EventTypeActivityDelta:
6865
// Activity events - skip, don't affect message compaction
69-
log.Printf("Compaction: Skipping activity event %s", eventType)
7066
default:
7167
log.Printf("Compaction: WARNING - Unhandled event type: %s", eventType)
7268
}
@@ -87,9 +83,6 @@ func (c *MessageCompactor) GetMessages() []types.Message {
8783
// If we included "running" status tools here, they would duplicate when
8884
// the active run's TOOL_CALL_END events are replayed.
8985
if len(c.activeToolCalls) > 0 {
90-
log.Printf("Compaction: WARNING - %d tool calls never completed (missing TOOL_CALL_END)", len(c.activeToolCalls))
91-
log.Printf("Compaction: This indicates events from an active/incomplete run were included in compaction")
92-
log.Printf("Compaction: Discarding incomplete tool calls to prevent duplicates")
9386
// Clear activeToolCalls - don't include them in snapshot
9487
c.activeToolCalls = make(map[string]*ActiveToolCall)
9588
}
@@ -99,17 +92,12 @@ func (c *MessageCompactor) GetMessages() []types.Message {
9992
hiddenCount := 0
10093
for _, msg := range c.messages {
10194
if c.hiddenMessages[msg.ID] {
102-
log.Printf("Compaction: Filtering hidden message %s from snapshot", msg.ID[:8])
10395
hiddenCount++
10496
continue
10597
}
10698
visibleMessages = append(visibleMessages, msg)
10799
}
108100

109-
if hiddenCount > 0 {
110-
log.Printf("Compaction: Filtered %d hidden messages, returning %d visible", hiddenCount, len(visibleMessages))
111-
}
112-
113101
return visibleMessages
114102
}
115103

@@ -118,7 +106,6 @@ func (c *MessageCompactor) GetMessages() []types.Message {
118106
func (c *MessageCompactor) handleTextMessageStart(event map[string]interface{}) {
119107
// Flush previous message if any
120108
if c.currentMessage != nil {
121-
log.Printf("Compaction: Flushing previous message (id=%s, content=%d chars)", c.currentMessage.ID, len(c.currentMessage.Content))
122109
c.messages = append(c.messages, *c.currentMessage)
123110
}
124111

@@ -137,34 +124,26 @@ func (c *MessageCompactor) handleTextMessageStart(event map[string]interface{})
137124
Role: role,
138125
Content: "",
139126
}
140-
log.Printf("Compaction: TEXT_MESSAGE_START role=%s, messageId=%s", role, messageID)
141127
}
142128

143129
func (c *MessageCompactor) handleTextMessageContent(event map[string]interface{}) {
144130
if c.currentMessage == nil {
145-
log.Printf("Compaction: WARNING - TEXT_MESSAGE_CONTENT without START")
146131
return
147132
}
148133

149134
delta, _ := event["delta"].(string)
150135
c.currentMessage.Content += delta
151-
log.Printf("Compaction: TEXT_MESSAGE_CONTENT delta=%d chars, total=%d chars", len(delta), len(c.currentMessage.Content))
152136
}
153137

154138
func (c *MessageCompactor) handleTextMessageEnd(event map[string]interface{}) {
155139
if c.currentMessage != nil {
156140
// User messages never have tool calls - flush immediately
141+
// Assistant messages might have tool calls - keep open
142+
// We'll flush when a new TEXT_MESSAGE_START arrives or at the end of compaction
157143
if c.currentMessage.Role == types.RoleUser {
158-
log.Printf("Compaction: TEXT_MESSAGE_END (user) id=%s, flushing immediately", c.currentMessage.ID)
159144
c.messages = append(c.messages, *c.currentMessage)
160145
c.currentMessage = nil
161-
} else {
162-
// Assistant messages might have tool calls - keep open
163-
log.Printf("Compaction: TEXT_MESSAGE_END id=%s, content=%d chars (keeping message open for tool calls)", c.currentMessage.ID, len(c.currentMessage.Content))
164-
// We'll flush when a new TEXT_MESSAGE_START arrives or at the end of compaction
165146
}
166-
} else {
167-
log.Printf("Compaction: WARNING - TEXT_MESSAGE_END without current message")
168147
}
169148
}
170149

@@ -196,13 +175,6 @@ func (c *MessageCompactor) handleToolCallStart(event map[string]interface{}) {
196175
ParentToolUseID: parentToolUseID,
197176
Status: "running",
198177
}
199-
if parentToolUseID != "" {
200-
log.Printf("Compaction: Started tool %s (%s) with parent %s", toolName, toolID[:8], parentToolUseID[:8])
201-
} else {
202-
log.Printf("Compaction: Started tool %s (%s)", toolName, toolID[:8])
203-
}
204-
} else {
205-
log.Printf("Compaction: WARNING - TOOL_CALL_START without toolCallId")
206178
}
207179
}
208180

@@ -215,15 +187,11 @@ func (c *MessageCompactor) handleToolCallArgs(event map[string]interface{}) {
215187
delta, _ := event["delta"].(string)
216188

217189
if toolID == "" {
218-
log.Printf("Compaction: WARNING - TOOL_CALL_ARGS without toolCallId")
219190
return
220191
}
221192

222193
if active, ok := c.activeToolCalls[toolID]; ok {
223194
active.Args += delta
224-
log.Printf("Compaction: Accumulated args for %s: %d chars", active.Name, len(active.Args))
225-
} else {
226-
log.Printf("Compaction: WARNING - TOOL_CALL_ARGS for unknown tool %s", toolID[:8])
227195
}
228196
}
229197

@@ -237,13 +205,11 @@ func (c *MessageCompactor) handleToolCallEnd(event map[string]interface{}) {
237205
errorStr, _ := event["error"].(string)
238206

239207
if toolID == "" {
240-
log.Printf("Compaction: WARNING - TOOL_CALL_END without toolCallId")
241208
return
242209
}
243210

244211
active, ok := c.activeToolCalls[toolID]
245212
if !ok {
246-
log.Printf("Compaction: WARNING - TOOL_CALL_END for unknown tool %s", toolID[:8])
247213
return
248214
}
249215

@@ -262,9 +228,6 @@ func (c *MessageCompactor) handleToolCallEnd(event map[string]interface{}) {
262228
tc.Status = "error"
263229
}
264230

265-
log.Printf("Compaction: Completed tool %s (%s), args=%d chars, result=%d chars",
266-
tc.Name, tc.ID[:8], len(tc.Args), len(tc.Result))
267-
268231
// Add to message
269232
// Check if we need to create a new message or add to current
270233
if c.currentMessage != nil && c.currentMessage.Role == types.RoleAssistant {
@@ -299,7 +262,6 @@ func (c *MessageCompactor) handleRawEvent(event map[string]interface{}) {
299262
if hidden, _ := data["hidden"].(bool); hidden {
300263
if messageID, ok := data["messageId"].(string); ok {
301264
c.hiddenMessages[messageID] = true
302-
log.Printf("Compaction: Marking message %s as hidden", messageID[:8])
303265
}
304266
}
305267
return
@@ -404,48 +366,25 @@ func (c *MessageCompactor) handleMessagesSnapshot(event map[string]interface{})
404366
c.messages = append(c.messages, msg)
405367
}
406368

407-
log.Printf("Compaction: Received MESSAGES_SNAPSHOT with %d messages from runner", len(c.messages))
408369
}
409370

410371
// CompactEvents is the main entry point for event compaction
411372
func CompactEvents(events []map[string]interface{}) []types.Message {
412-
log.Printf("Compaction: Starting compaction of %d events", len(events))
413373

414374
// Count event types to help debug
415375
eventTypeCounts := make(map[string]int)
416376
for _, event := range events {
417377
eventType, _ := event["type"].(string)
418378
eventTypeCounts[eventType]++
419379
}
420-
log.Printf("Compaction: Event type breakdown: %v", eventTypeCounts)
421380

422381
compactor := NewMessageCompactor()
423382

424-
for i, event := range events {
425-
eventType, _ := event["type"].(string)
426-
if i < 10 || i >= len(events)-10 {
427-
// Log first and last 10 events in detail
428-
log.Printf("Compaction: [%d/%d] Processing event type=%s", i+1, len(events), eventType)
429-
}
383+
for _, event := range events {
430384
compactor.HandleEvent(event)
431385
}
432386

433387
messages := compactor.GetMessages()
434-
log.Printf("Compaction: Finished compaction - produced %d messages from %d events", len(messages), len(events))
435-
436-
// Log summary of messages
437-
for i, msg := range messages {
438-
toolCallCount := len(msg.ToolCalls)
439-
log.Printf("Compaction: Message[%d]: role=%s, id=%s, content=%d chars, toolCalls=%d",
440-
i, msg.Role, msg.ID, len(msg.Content), toolCallCount)
441-
}
442-
443-
if len(messages) == 0 && len(events) > 0 {
444-
log.Printf("Compaction: WARNING - 0 messages produced! This indicates:")
445-
log.Printf("Compaction: - No TEXT_MESSAGE_START events found, OR")
446-
log.Printf("Compaction: - No MESSAGES_SNAPSHOT events found, OR")
447-
log.Printf("Compaction: - Only lifecycle events (which are skipped)")
448-
}
449388

450389
return messages
451390
}

components/backend/websocket/legacy_translator.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
// MigrateLegacySessionToAGUI converts old message format to AG-UI events
1414
// Creates a MESSAGES_SNAPSHOT from legacy messages and persists it
1515
func MigrateLegacySessionToAGUI(sessionID string) error {
16-
// Check if session has legacy messages
17-
legacyPath := StateBaseDir + "/sessions/" + sessionID + "/messages.json"
16+
// Check if session has legacy messages (JSONL format)
17+
legacyPath := StateBaseDir + "/sessions/" + sessionID + "/messages.jsonl"
1818
data, err := os.ReadFile(legacyPath)
1919
if err != nil {
2020
if os.IsNotExist(err) {
@@ -24,25 +24,28 @@ func MigrateLegacySessionToAGUI(sessionID string) error {
2424
return err
2525
}
2626

27-
log.Printf("LegacyMigration: Found legacy messages.json for %s, converting to AG-UI", sessionID)
27+
log.Printf("LegacyMigration: Found legacy messages.jsonl for %s, converting to AG-UI", sessionID)
2828

29-
var legacyData struct {
30-
Messages []map[string]interface{} `json:"messages"`
31-
SessionID string `json:"sessionId"`
32-
}
33-
34-
if err := json.Unmarshal(data, &legacyData); err != nil {
35-
log.Printf("LegacyMigration: Failed to parse legacy messages: %v", err)
36-
return err
29+
// Parse JSONL - each line is a complete message
30+
var legacyMessages []map[string]interface{}
31+
lines := splitLines(data)
32+
for _, line := range lines {
33+
if len(line) == 0 {
34+
continue
35+
}
36+
var msg map[string]interface{}
37+
if err := json.Unmarshal(line, &msg); err == nil {
38+
legacyMessages = append(legacyMessages, msg)
39+
}
3740
}
3841

3942
// Convert to AG-UI Message format
4043
messages := make([]types.Message, 0)
41-
42-
for _, legacyMsg := range legacyData.Messages {
44+
45+
for _, legacyMsg := range legacyMessages {
4346
msgType, _ := legacyMsg["type"].(string)
4447
payload, _ := legacyMsg["payload"].(map[string]interface{})
45-
48+
4649
switch msgType {
4750
case "user_message":
4851
content, _ := payload["content"].(string)
@@ -66,8 +69,8 @@ func MigrateLegacySessionToAGUI(sessionID string) error {
6669
}
6770
}
6871
// Tool calls will be reconstructed from tool_result pairs
69-
70-
// system.message, agent.running, agent.waiting are not chat messages, skip
72+
73+
// system.message, agent.running, agent.waiting are not chat messages, skip
7174
}
7275
}
7376

@@ -96,6 +99,8 @@ func MigrateLegacySessionToAGUI(sessionID string) error {
9699
migratedPath := legacyPath + ".migrated"
97100
if err := os.Rename(legacyPath, migratedPath); err != nil {
98101
log.Printf("LegacyMigration: Warning - failed to rename legacy file: %v", err)
102+
} else {
103+
log.Printf("LegacyMigration: Renamed %s to %s", legacyPath, migratedPath)
99104
}
100105

101106
return nil
@@ -107,4 +112,3 @@ func generateEventID() string {
107112
rand.Read(b)
108113
return hex.EncodeToString(b)
109114
}
110-

0 commit comments

Comments
 (0)