@@ -18,6 +18,7 @@ import { AnalyticHandler } from '../../../src/handler'
1818import { Moderation , checkInputs , streamResponse } from '../../moderation/Moderation'
1919import { formatResponse } from '../../outputparsers/OutputParserHelpers'
2020import { addSingleFileToStorage } from '../../../src/storageUtils'
21+ import { DynamicStructuredTool } from '../../tools/OpenAPIToolkit/core'
2122
2223const lenticularBracketRegex = / 【 [ ^ 】 ] * 】 / g
2324const imageRegex = / < i m g [ ^ > ] * \/ > / g
@@ -504,7 +505,6 @@ class OpenAIAssistant_Agents implements INode {
504505 toolCallId : item . id
505506 } )
506507 } )
507-
508508 const submitToolOutputs = [ ]
509509 for ( let i = 0 ; i < actions . length ; i += 1 ) {
510510 const tool = tools . find ( ( tool : any ) => tool . name === actions [ i ] . tool )
@@ -539,30 +539,23 @@ class OpenAIAssistant_Agents implements INode {
539539 }
540540
541541 try {
542- const stream = openai . beta . threads . runs . submitToolOutputsStream ( threadId , runThreadId , {
543- tool_outputs : submitToolOutputs
542+ await handleToolSubmission ( {
543+ openai,
544+ threadId,
545+ runThreadId,
546+ submitToolOutputs,
547+ tools,
548+ analyticHandlers,
549+ parentIds,
550+ llmIds,
551+ sseStreamer,
552+ chatId,
553+ options,
554+ input,
555+ usedTools,
556+ text,
557+ isStreamingStarted
544558 } )
545-
546- for await ( const event of stream ) {
547- if ( event . event === 'thread.message.delta' ) {
548- const chunk = event . data . delta . content ?. [ 0 ]
549- if ( chunk && 'text' in chunk && chunk . text ?. value ) {
550- text += chunk . text . value
551- if ( ! isStreamingStarted ) {
552- isStreamingStarted = true
553- if ( sseStreamer ) {
554- sseStreamer . streamStartEvent ( chatId , chunk . text . value )
555- }
556- }
557- if ( sseStreamer ) {
558- sseStreamer . streamTokenEvent ( chatId , chunk . text . value )
559- }
560- }
561- }
562- }
563- if ( sseStreamer ) {
564- sseStreamer . streamUsedToolsEvent ( chatId , usedTools )
565- }
566559 } catch ( error ) {
567560 console . error ( 'Error submitting tool outputs:' , error )
568561 await openai . beta . threads . runs . cancel ( threadId , runThreadId )
@@ -634,7 +627,6 @@ class OpenAIAssistant_Agents implements INode {
634627 toolCallId : item . id
635628 } )
636629 } )
637-
638630 const submitToolOutputs = [ ]
639631 for ( let i = 0 ; i < actions . length ; i += 1 ) {
640632 const tool = tools . find ( ( tool : any ) => tool . name === actions [ i ] . tool )
@@ -895,15 +887,212 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string
895887 }
896888}
897889
890+ interface ToolSubmissionParams {
891+ openai : OpenAI
892+ threadId : string
893+ runThreadId : string
894+ submitToolOutputs : any [ ]
895+ tools : any [ ]
896+ analyticHandlers : AnalyticHandler
897+ parentIds : ICommonObject
898+ llmIds : ICommonObject
899+ sseStreamer : IServerSideEventStreamer
900+ chatId : string
901+ options : ICommonObject
902+ input : string
903+ usedTools : IUsedTool [ ]
904+ text : string
905+ isStreamingStarted : boolean
906+ }
907+
908+ interface ToolSubmissionResult {
909+ text : string
910+ isStreamingStarted : boolean
911+ }
912+
913+ async function handleToolSubmission ( params : ToolSubmissionParams ) : Promise < ToolSubmissionResult > {
914+ const {
915+ openai,
916+ threadId,
917+ runThreadId,
918+ submitToolOutputs,
919+ tools,
920+ analyticHandlers,
921+ parentIds,
922+ llmIds,
923+ sseStreamer,
924+ chatId,
925+ options,
926+ input,
927+ usedTools
928+ } = params
929+
930+ let updatedText = params . text
931+ let updatedIsStreamingStarted = params . isStreamingStarted
932+
933+ const stream = openai . beta . threads . runs . submitToolOutputsStream ( threadId , runThreadId , {
934+ tool_outputs : submitToolOutputs
935+ } )
936+
937+ try {
938+ for await ( const event of stream ) {
939+ if ( event . event === 'thread.message.delta' ) {
940+ const chunk = event . data . delta . content ?. [ 0 ]
941+ if ( chunk && 'text' in chunk && chunk . text ?. value ) {
942+ updatedText += chunk . text . value
943+ if ( ! updatedIsStreamingStarted ) {
944+ updatedIsStreamingStarted = true
945+ if ( sseStreamer ) {
946+ sseStreamer . streamStartEvent ( chatId , chunk . text . value )
947+ }
948+ }
949+ if ( sseStreamer ) {
950+ sseStreamer . streamTokenEvent ( chatId , chunk . text . value )
951+ }
952+ }
953+ } else if ( event . event === 'thread.run.requires_action' ) {
954+ if ( event . data . required_action ?. submit_tool_outputs . tool_calls ) {
955+ const actions : ICommonObject [ ] = [ ]
956+
957+ event . data . required_action . submit_tool_outputs . tool_calls . forEach ( ( item ) => {
958+ const functionCall = item . function
959+ let args = { }
960+ try {
961+ args = JSON . parse ( functionCall . arguments )
962+ } catch ( e ) {
963+ console . error ( 'Error parsing arguments, default to empty object' )
964+ }
965+ actions . push ( {
966+ tool : functionCall . name ,
967+ toolInput : args ,
968+ toolCallId : item . id
969+ } )
970+ } )
971+
972+ const nestedToolOutputs = [ ]
973+ for ( let i = 0 ; i < actions . length ; i += 1 ) {
974+ const tool = tools . find ( ( tool : any ) => tool . name === actions [ i ] . tool )
975+ if ( ! tool ) continue
976+
977+ const toolIds = await analyticHandlers . onToolStart ( tool . name , actions [ i ] . toolInput , parentIds )
978+
979+ try {
980+ const toolOutput = await tool . call ( actions [ i ] . toolInput , undefined , undefined , {
981+ sessionId : threadId ,
982+ chatId : options . chatId ,
983+ input
984+ } )
985+ await analyticHandlers . onToolEnd ( toolIds , toolOutput )
986+ nestedToolOutputs . push ( {
987+ tool_call_id : actions [ i ] . toolCallId ,
988+ output : toolOutput
989+ } )
990+ usedTools . push ( {
991+ tool : tool . name ,
992+ toolInput : actions [ i ] . toolInput ,
993+ toolOutput
994+ } )
995+ } catch ( e ) {
996+ await analyticHandlers . onToolEnd ( toolIds , e )
997+ console . error ( 'Error executing tool' , e )
998+ throw new Error ( `Error executing tool. Tool: ${ tool . name } . Thread ID: ${ threadId } . Run ID: ${ runThreadId } ` )
999+ }
1000+ }
1001+
1002+ // Recursively handle nested tool submissions
1003+ const result = await handleToolSubmission ( {
1004+ openai,
1005+ threadId,
1006+ runThreadId,
1007+ submitToolOutputs : nestedToolOutputs ,
1008+ tools,
1009+ analyticHandlers,
1010+ parentIds,
1011+ llmIds,
1012+ sseStreamer,
1013+ chatId,
1014+ options,
1015+ input,
1016+ usedTools,
1017+ text : updatedText ,
1018+ isStreamingStarted : updatedIsStreamingStarted
1019+ } )
1020+ updatedText = result . text
1021+ updatedIsStreamingStarted = result . isStreamingStarted
1022+ }
1023+ }
1024+ }
1025+
1026+ if ( sseStreamer ) {
1027+ sseStreamer . streamUsedToolsEvent ( chatId , usedTools )
1028+ }
1029+
1030+ return {
1031+ text : updatedText ,
1032+ isStreamingStarted : updatedIsStreamingStarted
1033+ }
1034+ } catch ( error ) {
1035+ console . error ( 'Error submitting tool outputs:' , error )
1036+ await openai . beta . threads . runs . cancel ( threadId , runThreadId )
1037+
1038+ const errMsg = `Error submitting tool outputs. Thread ID: ${ threadId } . Run ID: ${ runThreadId } `
1039+
1040+ await analyticHandlers . onLLMError ( llmIds , errMsg )
1041+ await analyticHandlers . onChainError ( parentIds , errMsg , true )
1042+
1043+ throw new Error ( errMsg )
1044+ }
1045+ }
1046+
1047+ interface JSONSchema {
1048+ type ?: string
1049+ properties ?: Record < string , JSONSchema >
1050+ additionalProperties ?: boolean
1051+ required ?: string [ ]
1052+ [ key : string ] : any
1053+ }
1054+
8981055const formatToOpenAIAssistantTool = ( tool : any ) : OpenAI . Beta . FunctionTool => {
899- return {
1056+ const parameters = zodToJsonSchema ( tool . schema ) as JSONSchema
1057+
1058+ // For strict tools, we need to:
1059+ // 1. Set additionalProperties to false
1060+ // 2. Make all parameters required
1061+ // 3. Set the strict flag
1062+ if ( tool instanceof DynamicStructuredTool && tool . isStrict ( ) ) {
1063+ // Get all property names from the schema
1064+ const properties = parameters . properties || { }
1065+ const allPropertyNames = Object . keys ( properties )
1066+
1067+ parameters . additionalProperties = false
1068+ parameters . required = allPropertyNames
1069+
1070+ // Handle nested objects
1071+ for ( const [ _ , prop ] of Object . entries ( properties ) ) {
1072+ if ( prop . type === 'object' ) {
1073+ prop . additionalProperties = false
1074+ if ( prop . properties ) {
1075+ prop . required = Object . keys ( prop . properties )
1076+ }
1077+ }
1078+ }
1079+ }
1080+
1081+ const functionTool : OpenAI . Beta . FunctionTool = {
9001082 type : 'function' ,
9011083 function : {
9021084 name : tool . name ,
9031085 description : tool . description ,
904- parameters : zodToJsonSchema ( tool . schema )
1086+ parameters
9051087 }
9061088 }
1089+
1090+ // Add strict property if the tool is marked as strict
1091+ if ( tool instanceof DynamicStructuredTool && tool . isStrict ( ) ) {
1092+ ; ( functionTool . function as any ) . strict = true
1093+ }
1094+
1095+ return functionTool
9071096}
9081097
9091098module . exports = { nodeClass : OpenAIAssistant_Agents }
0 commit comments