Skip to content

Commit 1c1d541

Browse files
committed
feat: non-blocking structured output in stream events
1 parent a583968 commit 1c1d541

File tree

2 files changed

+164
-54
lines changed

2 files changed

+164
-54
lines changed

packages/mcp-use/examples/client/structured_output.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ async function main() {
5858
// 2. Attempt structured output at finish points
5959
// 3. Continue execution if required information is missing
6060
// 4. Only finish when all required fields can be populated
61-
const result: CityInfo = await agent.run(
61+
const eventStream = agent.streamEvents(
6262
`
6363
Research comprehensive information about the city of Padova (also known as Padua) in Italy.
6464
@@ -72,6 +72,26 @@ async function main() {
7272
CityInfoSchema, // outputSchema - this enables structured output
7373
)
7474

75+
let result: CityInfo | null = null
76+
77+
for await (const event of eventStream) {
78+
// Look for structured output in the final result
79+
if (event.event === 'on_chain_end' && event.data?.output) {
80+
try {
81+
// Try to parse the output as structured data
82+
const parsed = CityInfoSchema.parse(event.data.output)
83+
result = parsed
84+
break
85+
} catch (e) {
86+
// If parsing fails, continue streaming
87+
console.log('Waiting for structured output...')
88+
}
89+
}
90+
}
91+
if (!result) {
92+
throw new Error('Failed to obtain structured output')
93+
}
94+
7595
// Now you have strongly-typed, validated data!
7696
console.log(`Name: ${result.name}`)
7797
console.log(`Country: ${result.country}`)

packages/mcp-use/src/agents/mcp_agent.ts

Lines changed: 143 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,8 @@ export class MCPAgent {
634634
const startTime = Date.now()
635635
const toolsUsedNames: string[] = []
636636
let stepsTaken = 0
637-
let success = false
637+
const structuredOutputSuccess = false
638+
const structuredOutputSuccessRef = { value: structuredOutputSuccess }
638639

639640
// Schema-aware setup for structured output
640641
let structuredLlm: BaseLanguageModelInterface | null = null
@@ -755,33 +756,28 @@ export class MCPAgent {
755756

756757
// If structured output is requested, attempt to create it
757758
if (outputSchema && structuredLlm) {
758-
try {
759759
logger.info('🔧 Attempting structured output...')
760-
const structuredResult = await this._attemptStructuredOutput<T>(
761-
result,
762-
structuredLlm,
760+
const currentResult = result
761+
this._attemptStructuredOutput<T>(
762+
currentResult,
763+
this.llm!,
763764
outputSchema,
764-
schemaDescription,
765-
)
766-
logger.debug(`🔄 Structured result: ${JSON.stringify(structuredResult)}`)
767-
768-
// Add the final response to conversation history if memory is enabled
769-
if (this.memoryEnabled) {
770-
this.addToHistory(new AIMessage(`Structured result: ${JSON.stringify(structuredResult)}`))
771-
}
772-
773-
logger.info('✅ Structured output successful')
774-
success = true
775-
return structuredResult as string | T
776-
}
777-
catch (e) {
765+
).then(structuredResult => {
766+
if (this.memoryEnabled) {
767+
this.addToHistory(new AIMessage(`Structured result: ${JSON.stringify(structuredResult)}`))
768+
}
769+
770+
logger.info('✅ Structured output successful')
771+
structuredOutputSuccessRef.value = true
772+
return structuredResult as string | T
773+
}).catch(e => {
778774
logger.warn(`⚠️ Structured output failed: ${e}`)
779775
// Continue execution to gather missing information
780776
const failedStructuredOutputPrompt = `
781777
The current result cannot be formatted into the required structure.
782778
Error: ${String(e)}
783779
784-
Current information: ${result}
780+
Current information: ${currentResult}
785781
786782
If information is missing, please continue working to gather the missing information needed for:
787783
${schemaDescription}
@@ -796,19 +792,16 @@ export class MCPAgent {
796792
}
797793

798794
logger.info('🔄 Continuing execution to gather missing information...')
799-
continue
800-
}
801-
}
802-
else {
803-
// Regular execution without structured output
804-
break
795+
})
805796
}
806797
}
807798

808-
const stepArray = nextStepOutput as AgentStep[]
809-
intermediateSteps.push(...stepArray)
799+
// Check if it's an array of steps or a finish result
800+
if (Array.isArray(nextStepOutput)) {
801+
const stepArray = nextStepOutput as AgentStep[]
802+
intermediateSteps.push(...stepArray)
810803

811-
for (const step of stepArray) {
804+
for (const step of stepArray) {
812805
yield step
813806
const { action, observation } = step
814807
const toolName = action.tool
@@ -827,14 +820,15 @@ export class MCPAgent {
827820
logger.info(`📄 Tool result: ${outputStr}`)
828821
}
829822

830-
// Detect direct return
831-
if (stepArray.length) {
832-
const lastStep = stepArray[stepArray.length - 1]
833-
const toolReturn: AgentFinish | null = await this._agentExecutor._getToolReturn(lastStep)
834-
if (toolReturn) {
835-
logger.info(`🏆 Tool returned directly at step ${stepNum + 1}`)
836-
result = toolReturn.returnValues?.output ?? 'No output generated'
837-
break
823+
// Detect direct return
824+
if (stepArray.length) {
825+
const lastStep = stepArray[stepArray.length - 1]
826+
const toolReturn: AgentFinish | null = await this._agentExecutor._getToolReturn(lastStep)
827+
if (toolReturn) {
828+
logger.info(`🏆 Tool returned directly at step ${stepNum + 1}`)
829+
result = toolReturn.returnValues?.output ?? 'No output generated'
830+
break
831+
}
838832
}
839833
}
840834
}
@@ -861,7 +855,7 @@ export class MCPAgent {
861855
}
862856

863857
logger.info('🎉 Agent execution complete')
864-
success = true
858+
structuredOutputSuccessRef.value = true
865859

866860
// Return regular result
867861
return result as string | T
@@ -891,7 +885,7 @@ export class MCPAgent {
891885
await this.telemetry.trackAgentExecution({
892886
executionMethod: 'stream',
893887
query,
894-
success,
888+
success: structuredOutputSuccess,
895889
modelProvider: this.modelProvider,
896890
modelName: this.modelName,
897891
serverCount,
@@ -909,7 +903,7 @@ export class MCPAgent {
909903
toolsUsedNames,
910904
response: result,
911905
executionTimeMs,
912-
errorType: success ? null : 'execution_error',
906+
errorType: structuredOutputSuccess ? null : 'execution_error',
913907
conversationHistoryLength,
914908
})
915909

@@ -959,11 +953,12 @@ export class MCPAgent {
959953
* Yields LangChain StreamEvent objects from the underlying streamEvents() method.
960954
* This provides token-level streaming and fine-grained event updates.
961955
*/
962-
public async* streamEvents(
956+
public async* streamEvents<T = string>(
963957
query: string,
964958
maxSteps?: number,
965959
manageConnector = true,
966960
externalHistory?: BaseMessage[],
961+
outputSchema?: ZodSchema<T>,
967962
): AsyncGenerator<StreamEvent, void, void> {
968963
let initializedHere = false
969964
const startTime = Date.now()
@@ -972,6 +967,11 @@ export class MCPAgent {
972967
let totalResponseLength = 0
973968
let finalResponse = ''
974969

970+
// Enhance query with schema information if structured output is requested
971+
if (outputSchema) {
972+
query = this._enhanceQueryWithSchema(query, outputSchema)
973+
}
974+
975975
try {
976976
// Initialize if needed
977977
if (manageConnector && !this._initialized) {
@@ -1050,11 +1050,87 @@ export class MCPAgent {
10501050
if (Array.isArray(output) && output.length > 0 && output[0]?.text) {
10511051
finalResponse = output[0].text
10521052
}
1053+
else if (typeof output === 'string') {
1054+
finalResponse = output
1055+
}
1056+
else if (output && typeof output === 'object' && 'output' in output) {
1057+
finalResponse = output.output
1058+
}
10531059
}
10541060
}
10551061

1056-
// Add the final AI response to conversation history if memory is enabled
1057-
if (this.memoryEnabled && finalResponse) {
1062+
// Convert to structured output if requested
1063+
if (outputSchema && finalResponse) {
1064+
logger.info('🔧 Attempting structured output conversion...')
1065+
1066+
try {
1067+
// Start the conversion (non-blocking)
1068+
let conversionCompleted = false
1069+
let conversionResult: T | null = null
1070+
let conversionError: Error | null = null
1071+
1072+
const _conversionPromise = this._attemptStructuredOutput<T>(
1073+
finalResponse,
1074+
this.llm!,
1075+
outputSchema,
1076+
).then(result => {
1077+
conversionCompleted = true
1078+
conversionResult = result
1079+
return result
1080+
}).catch(error => {
1081+
conversionCompleted = true
1082+
conversionError = error
1083+
throw error
1084+
})
1085+
1086+
// Yield progress events while conversion is running
1087+
let progressCount = 0
1088+
1089+
while (!conversionCompleted) {
1090+
// Wait 2 seconds
1091+
await new Promise(resolve => setTimeout(resolve, 2000))
1092+
1093+
if (!conversionCompleted) {
1094+
// Still running - yield progress event
1095+
progressCount++
1096+
yield {
1097+
event: 'on_structured_output_progress',
1098+
data: {
1099+
message: `Converting to structured output... (${progressCount * 2}s)`,
1100+
elapsed: progressCount * 2
1101+
},
1102+
} as unknown as StreamEvent
1103+
}
1104+
}
1105+
1106+
// Check if conversion succeeded or failed
1107+
if (conversionError) {
1108+
throw conversionError
1109+
}
1110+
1111+
if (conversionResult) {
1112+
// Yield structured result as a custom event
1113+
yield {
1114+
event: 'on_structured_output',
1115+
data: { output: conversionResult },
1116+
} as unknown as StreamEvent
1117+
1118+
if (this.memoryEnabled) {
1119+
this.addToHistory(new AIMessage(`Structured result: ${JSON.stringify(conversionResult)}`))
1120+
}
1121+
1122+
logger.info('✅ Structured output successful')
1123+
}
1124+
} catch (e) {
1125+
logger.warn(`⚠️ Structured output failed: ${e}`)
1126+
// Yield error event
1127+
yield {
1128+
event: 'on_structured_output_error',
1129+
data: { error: e instanceof Error ? e.message : String(e) },
1130+
} as unknown as StreamEvent
1131+
}
1132+
} else if (this.memoryEnabled && finalResponse) {
1133+
// Add the final AI response to conversation history if memory is enabled
10581134
this.addToHistory(new AIMessage(finalResponse))
10591135
}
10601136

@@ -1118,14 +1194,31 @@ export class MCPAgent {
11181194
*/
11191195
private async _attemptStructuredOutput<T>(
11201196
rawResult: string | any,
1121-
structuredLlm: BaseLanguageModelInterface,
1197+
llm: BaseLanguageModelInterface,
11221198
outputSchema: ZodSchema<T>,
1123-
schemaDescription: string,
11241199
): Promise<T> {
11251200
logger.info(`🔄 Attempting structured output with schema: ${outputSchema}`)
1126-
logger.info(`🔄 Schema description: ${schemaDescription}`)
11271201
logger.info(`🔄 Raw result: ${JSON.stringify(rawResult, null, 2)}`)
11281202

1203+
// Schema-aware setup for structured output
1204+
let structuredLlm: BaseLanguageModelInterface | null = null
1205+
let schemaDescription = ''
1206+
1207+
logger.debug(`🔄 Structured output requested, schema: ${JSON.stringify(zodToJsonSchema(outputSchema), null, 2)}`)
1208+
// Check if withStructuredOutput method exists
1209+
if (llm && 'withStructuredOutput' in llm && typeof (llm as any).withStructuredOutput === 'function') {
1210+
structuredLlm = (llm as any).withStructuredOutput(outputSchema)
1211+
}
1212+
else if (llm) {
1213+
// Fallback: use the same LLM but we'll handle structure in our helper method
1214+
structuredLlm = llm
1215+
}
1216+
else {
1217+
throw new Error('LLM is required for structured output')
1218+
}
1219+
schemaDescription = JSON.stringify(zodToJsonSchema(outputSchema), null, 2)
1220+
logger.info(`🔄 Schema description: ${schemaDescription}`)
1221+
11291222
// Handle different input formats - rawResult might be an array or object from the agent
11301223
let textContent: string = ''
11311224
if (typeof rawResult === 'string') {
@@ -1178,13 +1271,16 @@ export class MCPAgent {
11781271
logger.info(`🔄 Structured output attempt ${attempt} - using streaming approach`)
11791272

11801273
// Use streaming to avoid blocking the event loop
1181-
const stream = await structuredLlm.stream(formatPrompt)
1274+
const stream = await structuredLlm!.stream(formatPrompt)
11821275
let structuredResult = null
11831276
let chunkCount = 0
11841277

11851278
for await (const chunk of stream) {
11861279
chunkCount++
11871280

1281+
// Print the chunk for debugging
1282+
logger.info(`Chunk ${chunkCount}: ${JSON.stringify(chunk, null, 2)}`)
1283+
11881284
// Handle different chunk types
11891285
if (typeof chunk === 'string') {
11901286
// If it's a string, try to parse it as JSON
@@ -1205,12 +1301,6 @@ export class MCPAgent {
12051301
}
12061302
}
12071303

1208-
// Yield control to allow keepalive events during streaming
1209-
// This prevents the event loop from being blocked during long LLM processing,
1210-
// allowing keepalive events to fire and preventing inactivity timeouts
1211-
await new Promise(resolve => setTimeout(resolve, 0))
1212-
1213-
// Log progress every 10 chunks
12141304
if (chunkCount % 10 === 0) {
12151305
logger.info(`🔄 Structured output streaming: ${chunkCount} chunks`)
12161306
}

0 commit comments

Comments
 (0)