@@ -11,12 +11,16 @@ import {
1111} from "@farfield/api" ;
1212import {
1313 ProtocolValidationError ,
14+ parseCommandExecutionRequestApprovalResponse ,
15+ parseFileChangeRequestApprovalResponse ,
16+ parseToolRequestUserInputResponsePayload ,
1417 parseThreadConversationState ,
1518 parseThreadStreamStateChangedBroadcast ,
1619 parseUserInputResponsePayload ,
1720 type IpcFrame ,
1821 type IpcRequestFrame ,
1922 type IpcResponseFrame ,
23+ type ThreadConversationRequest ,
2024 type ThreadConversationState ,
2125 type ThreadStreamStateChangedBroadcast ,
2226 type UserInputRequestId ,
@@ -593,21 +597,121 @@ export class CodexAgentAdapter implements AgentAdapter {
593597 input : AgentSubmitUserInputInput ,
594598 ) : Promise < { ownerClientId : string ; requestId : UserInputRequestId } > {
595599 this . ensureCodexAvailable ( ) ;
596- this . ensureIpcReady ( ) ;
600+ const parsedResponse = parseUserInputResponsePayload ( input . response ) ;
601+ const ownerClientIdForResult = ( ( ) => {
602+ const mapped = this . threadOwnerById . get ( input . threadId ) ;
603+ if ( mapped && mapped . trim ( ) . length > 0 ) {
604+ return mapped . trim ( ) ;
605+ }
606+ if ( input . ownerClientId && input . ownerClientId . trim ( ) . length > 0 ) {
607+ return input . ownerClientId . trim ( ) ;
608+ }
609+ if ( this . lastKnownOwnerClientId && this . lastKnownOwnerClientId . trim ( ) ) {
610+ return this . lastKnownOwnerClientId . trim ( ) ;
611+ }
612+ return "app-server" ;
613+ } ) ( ) ;
614+
615+ const threadForRouting = await this . runThreadOperationWithResumeRetry (
616+ input . threadId ,
617+ ( ) => this . appClient . readThread ( input . threadId , false ) ,
618+ ) ;
619+ const parsedRoutingThread = parseThreadConversationState ( threadForRouting . thread ) ;
620+ const routingPendingRequest = findPendingRequestWithId (
621+ parsedRoutingThread ,
622+ input . requestId ,
623+ ) ;
624+
625+ if ( routingPendingRequest ) {
626+ await this . runAppServerCall ( ( ) =>
627+ this . appClient . submitUserInput ( input . requestId , parsedResponse ) ,
628+ ) ;
629+
630+ const refreshedThread = await this . runThreadOperationWithResumeRetry (
631+ input . threadId ,
632+ ( ) => this . appClient . readThread ( input . threadId , true ) ,
633+ ) ;
634+ const parsedThread = parseThreadConversationState ( refreshedThread . thread ) ;
635+ this . streamSnapshotByThreadId . set ( input . threadId , parsedThread ) ;
636+ this . streamSnapshotOriginByThreadId . set ( input . threadId , "readThreadWithTurns" ) ;
637+ this . setThreadTitle ( input . threadId , parsedThread . title ) ;
597638
639+ const currentEvents = this . streamEventsByThreadId . get ( input . threadId ) ?? [ ] ;
640+ currentEvents . push (
641+ buildSyntheticSnapshotEvent ( input . threadId , ownerClientIdForResult , parsedThread ) ,
642+ ) ;
643+ if ( currentEvents . length > 400 ) {
644+ currentEvents . splice ( 0 , currentEvents . length - 400 ) ;
645+ }
646+ this . streamEventsByThreadId . set ( input . threadId , currentEvents ) ;
647+
648+ return {
649+ ownerClientId : ownerClientIdForResult ,
650+ requestId : input . requestId ,
651+ } ;
652+ }
653+
654+ this . ensureIpcReady ( ) ;
598655 const ownerClientId = resolveOwnerClientId (
599656 this . threadOwnerById ,
600657 input . threadId ,
601658 input . ownerClientId ,
602659 this . lastKnownOwnerClientId ?? undefined ,
603660 ) ;
661+ this . threadOwnerById . set ( input . threadId , ownerClientId ) ;
604662
605- await this . service . submitUserInput ( {
606- threadId : input . threadId ,
607- ownerClientId,
608- requestId : input . requestId ,
609- response : parseUserInputResponsePayload ( input . response ) ,
610- } ) ;
663+ const pendingIpcRequest = await this . resolvePendingIpcRequest (
664+ input . threadId ,
665+ input . requestId ,
666+ ) ;
667+ switch ( pendingIpcRequest . method ) {
668+ case "item/commandExecution/requestApproval" : {
669+ const commandResponse =
670+ parseCommandExecutionRequestApprovalResponse ( parsedResponse ) ;
671+ await this . service . submitCommandApprovalDecision ( {
672+ threadId : input . threadId ,
673+ ownerClientId,
674+ requestId : input . requestId ,
675+ response : commandResponse ,
676+ } ) ;
677+ break ;
678+ }
679+ case "item/fileChange/requestApproval" : {
680+ const fileResponse = parseFileChangeRequestApprovalResponse (
681+ parsedResponse ,
682+ ) ;
683+ await this . service . submitFileApprovalDecision ( {
684+ threadId : input . threadId ,
685+ ownerClientId,
686+ requestId : input . requestId ,
687+ response : fileResponse ,
688+ } ) ;
689+ break ;
690+ }
691+ case "item/tool/requestUserInput" : {
692+ const toolResponse = parseToolRequestUserInputResponsePayload (
693+ parsedResponse ,
694+ ) ;
695+ await this . service . submitUserInput ( {
696+ threadId : input . threadId ,
697+ ownerClientId,
698+ requestId : input . requestId ,
699+ response : toolResponse ,
700+ } ) ;
701+ break ;
702+ }
703+ case "execCommandApproval" :
704+ case "applyPatchApproval" :
705+ throw new Error (
706+ `Legacy approval request method ${ pendingIpcRequest . method } is not supported over desktop IPC for thread ${ input . threadId } ` ,
707+ ) ;
708+ case "account/chatgptAuthTokens/refresh" :
709+ case "item/tool/call" :
710+ case "item/plan/requestImplementation" :
711+ throw new Error (
712+ `Unsupported pending request method ${ pendingIpcRequest . method } for submitUserInput on thread ${ input . threadId } ` ,
713+ ) ;
714+ }
611715
612716 return {
613717 ownerClientId,
@@ -1045,6 +1149,34 @@ export class CodexAgentAdapter implements AgentAdapter {
10451149 return this . runAppServerCall ( operation ) ;
10461150 }
10471151
1152+ private async resolvePendingIpcRequest (
1153+ threadId : string ,
1154+ requestId : UserInputRequestId ,
1155+ ) : Promise < ThreadConversationRequest > {
1156+ const cachedSnapshot = this . streamSnapshotByThreadId . get ( threadId ) ;
1157+ if ( cachedSnapshot ) {
1158+ const pending = findPendingRequestWithId ( cachedSnapshot , requestId ) ;
1159+ if ( pending ) {
1160+ return pending ;
1161+ }
1162+ }
1163+
1164+ const liveState = await this . readLiveState ( threadId ) ;
1165+ if ( liveState . conversationState ) {
1166+ const pending = findPendingRequestWithId (
1167+ liveState . conversationState ,
1168+ requestId ,
1169+ ) ;
1170+ if ( pending ) {
1171+ return pending ;
1172+ }
1173+ }
1174+
1175+ throw new Error (
1176+ `Unable to find pending request ${ String ( requestId ) } in live state for thread ${ threadId } ` ,
1177+ ) ;
1178+ }
1179+
10481180 private resolveThreadTitle (
10491181 threadId : string ,
10501182 directTitle : string | null | undefined ,
@@ -1229,6 +1361,28 @@ function deriveThreadWaitingState(
12291361 } ;
12301362}
12311363
1364+ function requestIdsMatch (
1365+ left : UserInputRequestId ,
1366+ right : UserInputRequestId ,
1367+ ) : boolean {
1368+ return `${ left } ` === `${ right } ` ;
1369+ }
1370+
1371+ function findPendingRequestWithId (
1372+ state : ThreadConversationState ,
1373+ requestId : UserInputRequestId ,
1374+ ) : ThreadConversationRequest | null {
1375+ for ( const request of state . requests ) {
1376+ if ( request . completed === true ) {
1377+ continue ;
1378+ }
1379+ if ( requestIdsMatch ( request . id , requestId ) ) {
1380+ return request ;
1381+ }
1382+ }
1383+ return null ;
1384+ }
1385+
12321386function buildSyntheticSnapshotEvent (
12331387 threadId : string ,
12341388 sourceClientId : string ,
0 commit comments