diff --git a/.changeset/quiet-lamps-restore.md b/.changeset/quiet-lamps-restore.md new file mode 100644 index 000000000..d02c48d91 --- /dev/null +++ b/.changeset/quiet-lamps-restore.md @@ -0,0 +1,5 @@ +--- +'@openai/agents-openai': patch +--- + +fix: restore session history when responses compaction replacement fails diff --git a/packages/agents-openai/src/memory/openaiResponsesCompactionSession.ts b/packages/agents-openai/src/memory/openaiResponsesCompactionSession.ts index 7c83504b0..03dfdff46 100644 --- a/packages/agents-openai/src/memory/openaiResponsesCompactionSession.ts +++ b/packages/agents-openai/src/memory/openaiResponsesCompactionSession.ts @@ -204,10 +204,11 @@ export class OpenAIResponsesCompactionSession const compacted = await this.client.responses.compact(compactRequest); const outputItems = normalizeCompactionOutputItems(compacted.output ?? []); - await this.underlyingSession.clearSession(); - if (outputItems.length > 0) { - await this.underlyingSession.addItems(outputItems); - } + const previousItems = await this.getAllUnderlyingSessionItems(); + await this.replaceUnderlyingSessionItems({ + outputItems, + previousItems, + }); this.compactionCandidateItems = selectCompactionCandidateItems(outputItems); this.sessionItems = outputItems; @@ -287,6 +288,97 @@ export class OpenAIResponsesCompactionSession this.sessionItems = []; } + private async getAllUnderlyingSessionItems(): Promise { + return this.underlyingSession.getItems(); + } + + private async replaceUnderlyingSessionItems({ + outputItems, + previousItems, + }: { + outputItems: AgentInputItem[]; + previousItems: AgentInputItem[]; + }): Promise { + try { + await this.underlyingSession.clearSession(); + } catch (error) { + await this.restoreUnderlyingSessionItemsAfterFailedClear( + previousItems, + error, + ); + throw error; + } + + try { + if (outputItems.length > 0) { + await this.underlyingSession.addItems(outputItems); + } + } catch (error) { + await this.restoreUnderlyingSessionItems(previousItems, error); + throw error; + } + } + + private async restoreUnderlyingSessionItemsAfterFailedClear( + previousItems: AgentInputItem[], + error: unknown, + ): Promise { + let currentItems: AgentInputItem[]; + try { + currentItems = await this.getAllUnderlyingSessionItems(); + } catch (inspectionError) { + logger.warn( + 'Failed to inspect session history after compaction replacement clear failed.', + inspectionError, + ); + return; + } + + if (areAgentItemsEqual(currentItems, previousItems)) { + return; + } + + await this.restoreUnderlyingSessionItems(previousItems, error, { + popExistingItemCount: currentItems.length, + }); + } + + private async restoreUnderlyingSessionItems( + previousItems: AgentInputItem[], + error: unknown, + options: { + clearExistingItems?: boolean; + popExistingItemCount?: number; + } = {}, + ): Promise { + try { + if (options.popExistingItemCount !== undefined) { + for (let i = 0; i < options.popExistingItemCount; i += 1) { + const popped = await this.underlyingSession.popItem(); + if (!popped) { + break; + } + } + } else if (options.clearExistingItems !== false) { + await this.underlyingSession.clearSession(); + } + if (previousItems.length > 0) { + await this.underlyingSession.addItems(previousItems); + } + } catch (restoreError) { + logger.warn( + 'Failed to restore session history after compaction replacement failed.', + restoreError, + ); + return; + } + + logger.warn( + 'Restored previous session history after compaction replacement failed.', + error, + ); + } + private async ensureCompactionCandidates(): Promise<{ compactionCandidateItems: AgentInputItem[]; sessionItems: AgentInputItem[]; @@ -525,3 +617,10 @@ function isOpenAIConversationsSessionDelegate( ] === 'conversations' ); } + +function areAgentItemsEqual( + left: AgentInputItem[], + right: AgentInputItem[], +): boolean { + return JSON.stringify(left) === JSON.stringify(right); +} diff --git a/packages/agents-openai/test/openaiResponsesCompactionSession.test.ts b/packages/agents-openai/test/openaiResponsesCompactionSession.test.ts index ce2b2ddbb..c8495cd07 100644 --- a/packages/agents-openai/test/openaiResponsesCompactionSession.test.ts +++ b/packages/agents-openai/test/openaiResponsesCompactionSession.test.ts @@ -1,11 +1,89 @@ import { describe, expect, it, vi } from 'vitest'; -import { MemorySession } from '@openai/agents-core'; -import { UserError } from '@openai/agents-core'; +import { + MemorySession, + UserError, + type AgentInputItem, +} from '@openai/agents-core'; import { OpenAIResponsesCompactionSession } from '../src'; import { OPENAI_SESSION_API } from '../src/memory/openaiSessionApi'; +class PartiallyFailingReplacementSession extends MemorySession { + addCalls = 0; + clearCalls = 0; + + async addItems(items: AgentInputItem[]): Promise { + this.addCalls += 1; + if (this.addCalls === 1) { + await super.addItems(items.slice(0, 1)); + throw new Error('replacement failed'); + } + await super.addItems(items); + } + + async clearSession(): Promise { + this.clearCalls += 1; + await super.clearSession(); + } +} + +class FailingClearBeforeMutationSession extends MemorySession { + addCalls = 0; + clearCalls = 0; + + async addItems(items: AgentInputItem[]): Promise { + this.addCalls += 1; + await super.addItems(items); + } + + async clearSession(): Promise { + this.clearCalls += 1; + throw new Error('clear failed'); + } +} + +class FailingClearAfterMutationSession extends MemorySession { + addCalls = 0; + clearCalls = 0; + popCalls = 0; + + async addItems(items: AgentInputItem[]): Promise { + this.addCalls += 1; + await super.addItems(items); + } + + async popItem(): Promise { + this.popCalls += 1; + return super.popItem(); + } + + async clearSession(): Promise { + this.clearCalls += 1; + await super.clearSession(); + throw new Error('clear failed'); + } +} + +class FailingRestoreSession extends MemorySession { + addCalls = 0; + clearCalls = 0; + + async addItems(items: AgentInputItem[]): Promise { + this.addCalls += 1; + if (this.addCalls === 1) { + await super.addItems(items.slice(0, 1)); + throw new Error('replacement failed'); + } + throw new Error('restore failed'); + } + + async clearSession(): Promise { + this.clearCalls += 1; + await super.clearSession(); + } +} + describe('OpenAIResponsesCompactionSession', () => { it('rejects non-OpenAI model names', () => { expect(() => { @@ -449,6 +527,294 @@ describe('OpenAIResponsesCompactionSession', () => { ]); }); + it('restores history when replacement addItems fails', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const history = [ + { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'original' }], + }, + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'private records' }], + }, + ] as AgentInputItem[]; + const underlyingSession = new PartiallyFailingReplacementSession({ + initialItems: history, + }); + const compact = vi.fn().mockResolvedValue({ + output: [ + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted one' }], + }, + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted two' }], + }, + ], + usage: { + input_tokens: 1, + output_tokens: 1, + total_tokens: 2, + }, + }); + const session = new OpenAIResponsesCompactionSession({ + client: { responses: { compact } } as any, + underlyingSession, + compactionMode: 'input', + }); + + try { + await expect( + session.runCompaction({ force: true, compactionMode: 'input' }), + ).rejects.toThrow('replacement failed'); + + expect(await underlyingSession.getItems()).toEqual(history); + expect(underlyingSession.clearCalls).toBe(2); + expect(underlyingSession.addCalls).toBe(2); + expect(warn).toHaveBeenCalledWith( + 'Restored previous session history after compaction replacement failed.', + expect.any(Error), + ); + } finally { + warn.mockRestore(); + } + }); + + it('does not restore when clearSession fails without mutation', async () => { + const history = [ + { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'original' }], + }, + ] as AgentInputItem[]; + const underlyingSession = new FailingClearBeforeMutationSession({ + initialItems: history, + }); + const compact = vi.fn().mockResolvedValue({ + output: [ + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted' }], + }, + ], + usage: { + input_tokens: 1, + output_tokens: 1, + total_tokens: 2, + }, + }); + const session = new OpenAIResponsesCompactionSession({ + client: { responses: { compact } } as any, + underlyingSession, + compactionMode: 'input', + }); + + await expect( + session.runCompaction({ force: true, compactionMode: 'input' }), + ).rejects.toThrow('clear failed'); + + expect(await underlyingSession.getItems()).toEqual(history); + expect(underlyingSession.clearCalls).toBe(1); + expect(underlyingSession.addCalls).toBe(0); + }); + + it('restores history when clearSession fails after mutation', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const history = [ + { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'original' }], + }, + ] as AgentInputItem[]; + const underlyingSession = new FailingClearAfterMutationSession({ + initialItems: history, + }); + const compact = vi.fn().mockResolvedValue({ + output: [ + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted' }], + }, + ], + usage: { + input_tokens: 1, + output_tokens: 1, + total_tokens: 2, + }, + }); + const session = new OpenAIResponsesCompactionSession({ + client: { responses: { compact } } as any, + underlyingSession, + compactionMode: 'input', + }); + + try { + await expect( + session.runCompaction({ force: true, compactionMode: 'input' }), + ).rejects.toThrow('clear failed'); + + expect(await underlyingSession.getItems()).toEqual(history); + expect(underlyingSession.clearCalls).toBe(1); + expect(underlyingSession.addCalls).toBe(1); + expect(underlyingSession.popCalls).toBe(0); + expect(warn).toHaveBeenCalledWith( + 'Restored previous session history after compaction replacement failed.', + expect.any(Error), + ); + } finally { + warn.mockRestore(); + } + }); + + it('clears partial history before restoring after clearSession fails', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const history = [ + { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'survived clear' }], + }, + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'removed by partial clear' }], + }, + ] as AgentInputItem[]; + + class PartiallyFailingClearSession extends MemorySession { + addCalls = 0; + clearCalls = 0; + popCalls = 0; + + async addItems(items: AgentInputItem[]): Promise { + this.addCalls += 1; + await super.addItems(items); + } + + async popItem(): Promise { + this.popCalls += 1; + return super.popItem(); + } + + async clearSession(): Promise { + this.clearCalls += 1; + await super.popItem(); + throw new Error('clear failed'); + } + } + + const underlyingSession = new PartiallyFailingClearSession({ + initialItems: history, + }); + const compact = vi.fn().mockResolvedValue({ + output: [ + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted' }], + }, + ], + usage: { + input_tokens: 1, + output_tokens: 1, + total_tokens: 2, + }, + }); + const session = new OpenAIResponsesCompactionSession({ + client: { responses: { compact } } as any, + underlyingSession, + compactionMode: 'input', + }); + + try { + await expect( + session.runCompaction({ force: true, compactionMode: 'input' }), + ).rejects.toThrow('clear failed'); + + expect(await underlyingSession.getItems()).toEqual(history); + expect(underlyingSession.clearCalls).toBe(1); + expect(underlyingSession.popCalls).toBe(1); + expect(underlyingSession.addCalls).toBe(1); + expect(warn).toHaveBeenCalledWith( + 'Restored previous session history after compaction replacement failed.', + expect.any(Error), + ); + } finally { + warn.mockRestore(); + } + }); + + it('reraises replacement errors when restore fails', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const history = [ + { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'original' }], + }, + ] as AgentInputItem[]; + const underlyingSession = new FailingRestoreSession({ + initialItems: history, + }); + const compact = vi.fn().mockResolvedValue({ + output: [ + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted one' }], + }, + { + type: 'message', + role: 'assistant', + status: 'completed', + content: [{ type: 'output_text', text: 'compacted two' }], + }, + ], + usage: { + input_tokens: 1, + output_tokens: 1, + total_tokens: 2, + }, + }); + const session = new OpenAIResponsesCompactionSession({ + client: { responses: { compact } } as any, + underlyingSession, + compactionMode: 'input', + }); + + try { + await expect( + session.runCompaction({ force: true, compactionMode: 'input' }), + ).rejects.toThrow('replacement failed'); + expect(warn).toHaveBeenCalledWith( + 'Failed to restore session history after compaction replacement failed.', + expect.any(Error), + ); + expect(underlyingSession.clearCalls).toBe(2); + expect(underlyingSession.addCalls).toBe(2); + } finally { + warn.mockRestore(); + } + }); + it('normalizes compacted user image messages before reusing them as input', async () => { const dataUrl = 'data:image/jpeg;base64,abc123'; const compact = vi