Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,9 @@ class A2AStreamingHandler(
activeStreams.remove(streamId)
}

// Send initial connection established event
try {
emitter.send(SseEmitter.event()
.name("connected")
.data(mapOf("streamId" to streamId))
)
} catch (e: Exception) {
logger.error("Error sending initial event", e)
emitter.completeWithError(e)
emitter.onError { throwable ->
logger.error("Stream error for streamId: {}", streamId, throwable)
activeStreams.remove(streamId)
}

return emitter
Expand All @@ -88,49 +82,15 @@ class A2AStreamingHandler(

try {
val eventData = when (event) {
is Message -> {
SseEmitter.event()
.name("message")
.data(objectMapper.writeValueAsString(event), MediaType.APPLICATION_JSON)
}
is Task -> {
SseEmitter.event()
.name("task")
.data(objectMapper.writeValueAsString(event), MediaType.APPLICATION_JSON)
}
is TaskStatusUpdateEvent -> {
SseEmitter.event()
.name("task-update")
.data(
objectMapper.writeValueAsString(
SendStreamingMessageResponse(
"2.0",
streamId,
event,
null
)
), MediaType.APPLICATION_JSON
)
}
is TaskArtifactUpdateEvent -> {
SseEmitter.event()
.name("task-update")
.data(
objectMapper.writeValueAsString(
SendStreamingMessageResponse(
"2.0",
streamId,
event,
null
)
), MediaType.APPLICATION_JSON
)
is Message, is Task, is TaskArtifactUpdateEvent, is TaskStatusUpdateEvent -> {
createEventData(streamId, event)
}
}
emitter.send(eventData)
} catch (e: Exception) {
logger.error("Error sending stream event", e)
emitter.completeWithError(e)
logger.error("Error sending stream event to streamId: {}", streamId, e)
val emitter = activeStreams.remove(streamId)
emitter?.completeWithError(e)
}
}

Expand All @@ -142,16 +102,48 @@ class A2AStreamingHandler(
emitter?.complete()
}

private fun createEventData(streamId: String, event: StreamingEventKind): SseEmitter.SseEventBuilder {
val eventName = when (event) {
is Message -> "message"
is Task -> "task"
is TaskStatusUpdateEvent, is TaskArtifactUpdateEvent -> "task-update"
}

val response = SendStreamingMessageResponse(
"2.0",
streamId,
event,
null
)

return SseEmitter.event()
.name(eventName)
.data(objectMapper.writeValueAsString(response), MediaType.APPLICATION_JSON)
}

/**
* Shuts down the streaming handler
*/
fun shutdown() {
logger.info("Shutting down, closing {} active streams", activeStreams.size)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code here was suggested by AI but, imho, looks good


activeStreams.values.forEach { emitter ->
try {
emitter.complete()
} catch (e: Exception) {
logger.warn("Error completing emitter during shutdown", e)
}
}
activeStreams.clear()

scheduler.shutdown()
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warn("Scheduler did not terminate gracefully, forcing shutdown")
scheduler.shutdownNow()
}
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
scheduler.shutdownNow()
}
}
Expand Down
Loading