@@ -411,37 +411,14 @@ async def _stream_task_process(
411411 ):
412412 try :
413413 chunk_data = json .loads (chunk )
414- except Exception as e :
415- logger .warning (f"Invalid chunk received: { chunk } - { e } " )
416- continue
417414
418- if (
419- isinstance (chunk_data , dict )
420- and "type" in chunk_data
421- and chunk_data ["type" ]
422- in [
423- "history" ,
424- "history_update" ,
425- "history_complete" ,
426- ]
427- ):
428- continue
429-
430- if isinstance (chunk_data , dict ):
431- if "type" not in chunk_data and "text" in chunk_data :
432- chunk_data ["type" ] = "text"
415+ if isinstance (chunk_data , dict ) and "content" in chunk_data :
416+ content = chunk_data .get ("content" , {})
417+ role = content .get ("role" , "agent" )
418+ parts = content .get ("parts" , [])
433419
434- if "type" in chunk_data :
435- try :
436- update_message = Message (role = "agent" , parts = [chunk_data ])
437-
438- await self .update_store (
439- request .params .id ,
440- TaskStatus (
441- state = TaskState .WORKING , message = update_message
442- ),
443- update_history = False ,
444- )
420+ if parts :
421+ update_message = Message (role = role , parts = parts )
445422
446423 yield SendTaskStreamingResponse (
447424 id = request .id ,
@@ -455,14 +432,13 @@ async def _stream_task_process(
455432 ),
456433 )
457434
458- if chunk_data .get ("type" ) == "text" :
459- full_response += chunk_data .get ("text" , "" )
460- final_message = update_message
461-
462- except Exception as e :
463- logger .error (
464- f"Error processing chunk: { e } , chunk: { chunk_data } "
465- )
435+ for part in parts :
436+ if part .get ("type" ) == "text" :
437+ full_response += part .get ("text" , "" )
438+ final_message = update_message
439+ except Exception as e :
440+ logger .error (f"Error processing chunk: { e } , chunk: { chunk } " )
441+ continue
466442
467443 # Determine the final state of the task
468444 task_state = (
0 commit comments