diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index ddfd0bfd9..401af0f67 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -89,4 +89,30 @@ jobs:
- name: Run tests
working-directory: typescript-sdk
- run: pnpm run test
\ No newline at end of file
+ run: pnpm run test
+
+ java:
+ name: Java SDK Tests
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+
+ - name: Set up JDK
+ uses: actions/setup-java@v4
+ with:
+ java-version: '18'
+ distribution: 'temurin'
+
+ - name: Cache Maven dependencies
+ uses: actions/cache@v4
+ with:
+ path: ~/.m2
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-m2-
+
+ - name: Run tests
+ working-directory: java-sdk
+ run: mvn test
\ No newline at end of file
diff --git a/java-sdk/integrations/spring-ai/pom.xml b/java-sdk/integrations/spring-ai/pom.xml
new file mode 100644
index 000000000..195255bb4
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/pom.xml
@@ -0,0 +1,91 @@
+
+
+ 4.0.0
+
+ com.ag-ui
+ ag-ui
+ 0.0.1-SNAPSHOT
+ ../../pom.xml
+
+
+ spring-ai
+
+
+ 21
+ 21
+ UTF-8
+ 3.2.0
+ 1.0.0
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring.boot.version}
+ pom
+ import
+
+
+ org.springframework.ai
+ spring-ai-bom
+ ${spring-ai.version}
+ pom
+ import
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.ai
+ spring-ai-model
+ 1.0.0
+
+
+ org.springframework.ai
+ spring-ai-starter-model-ollama
+ 1.0.0
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ com.ag-ui
+ client
+ 0.0.1-SNAPSHOT
+ compile
+
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring.boot.version}
+
+
+
+ repackage
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/java-sdk/integrations/spring-ai/src/main/java/com/agui/CorsConfig.java b/java-sdk/integrations/spring-ai/src/main/java/com/agui/CorsConfig.java
new file mode 100644
index 000000000..09ee55009
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/src/main/java/com/agui/CorsConfig.java
@@ -0,0 +1,37 @@
+package com.agui;
+
+import org.springframework.boot.web.servlet.FilterRegistrationBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.Ordered;
+import org.springframework.web.cors.CorsConfiguration;
+import org.springframework.web.cors.CorsConfigurationSource;
+import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
+import org.springframework.web.filter.CorsFilter;
+
+import java.util.Arrays;
+
+@Configuration
+public class CorsConfig {
+
+ @Bean
+ public CorsConfigurationSource corsConfigurationSource() {
+ CorsConfiguration configuration = new CorsConfiguration();
+ configuration.setAllowedOriginPatterns(Arrays.asList("*")); // Or specify domains
+ configuration.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "DELETE", "OPTIONS"));
+ configuration.setAllowedHeaders(Arrays.asList("*"));
+
+ UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
+ source.registerCorsConfiguration("/**", configuration);
+ return source;
+ }
+
+ @Bean
+ public FilterRegistrationBean corsFilter() {
+ FilterRegistrationBean bean = new FilterRegistrationBean<>(
+ new CorsFilter(corsConfigurationSource())
+ );
+ bean.setOrder(Ordered.HIGHEST_PRECEDENCE);
+ return bean;
+ }
+}
\ No newline at end of file
diff --git a/java-sdk/integrations/spring-ai/src/main/java/com/agui/MainApplication.java b/java-sdk/integrations/spring-ai/src/main/java/com/agui/MainApplication.java
new file mode 100644
index 000000000..068021053
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/src/main/java/com/agui/MainApplication.java
@@ -0,0 +1,12 @@
+package com.agui;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class MainApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(MainApplication.class, args);
+ }
+}
diff --git a/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/AgUiController.java b/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/AgUiController.java
new file mode 100644
index 000000000..49286af03
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/AgUiController.java
@@ -0,0 +1,113 @@
+package com.agui.spring;
+
+import com.agui.client.RunAgentParameters;
+import com.agui.client.subscriber.AgentSubscriber;
+import com.agui.client.subscriber.AgentSubscriberParams;
+import com.agui.event.BaseEvent;
+import com.agui.types.State;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.servlet.http.HttpServletResponse;
+import org.springframework.ai.ollama.OllamaChatModel;
+import org.springframework.ai.ollama.api.OllamaApi;
+import org.springframework.ai.ollama.api.OllamaOptions;
+import org.springframework.http.CacheControl;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+@RestController
+public class AgUiController {
+
+ @PostMapping(value = "/sse/{agentId}")
+ public ResponseEntity streamData(@PathVariable("agentId") final String agentId, @RequestBody() final AgUiParameters agUiParameters) {
+ SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
+
+ var chatModel = OllamaChatModel.builder()
+ .defaultOptions(OllamaOptions.builder().model("llama3.2").build())
+ .ollamaApi(OllamaApi.builder().baseUrl("http://localhost:11434").build())
+ .build();
+
+ SpringAgent agent = new SpringAgent(
+ agentId,
+ "description",
+ Objects.nonNull(agUiParameters.getThreadId()) ? agUiParameters.getThreadId() : UUID.randomUUID().toString(),
+ agUiParameters.getMessages().stream().map(m -> {
+ if (Objects.isNull(m.getName())) {
+ m.setName("");
+ }
+ return m;
+ }).toList(),
+ chatModel,
+ new State(),
+ true
+ );
+
+ var parameters = RunAgentParameters.builder()
+ .runId(UUID.randomUUID().toString())
+ .context(agUiParameters.getContext())
+ .forwardedProps(agUiParameters.getForwardedProps())
+ .tools(agUiParameters.getTools())
+ .build();
+
+ var objectMapper = new ObjectMapper();
+
+ agent.runAgent(parameters, new AgentSubscriber() {
+ @Override
+ public void onEvent(BaseEvent event) {
+ try {
+ emitter.send(SseEmitter.event().data(" " + objectMapper.writeValueAsString(event)).build());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ @Override
+ public void onRunFinalized(AgentSubscriberParams params) {
+ emitter.complete();
+ }
+ @Override
+ public void onRunFailed(AgentSubscriberParams params, Throwable throwable) {
+ emitter.completeWithError(throwable);
+ }
+ });
+
+ return ResponseEntity
+ .ok()
+ .cacheControl(CacheControl.noCache())
+ .body(emitter);
+ }
+
+ @GetMapping(value = "/{agentId}", produces = MediaType.TEXT_PLAIN_VALUE)
+ public ResponseBodyEmitter streamData(
+ @PathVariable("agentId") final String agentId,
+ HttpServletResponse response
+ ) {
+ response.setHeader("Cache-Control", "no-cache");
+ response.setHeader("Connection", "keep-alive");
+ response.setContentType("text/plain;charset=UTF-8");
+
+ ResponseBodyEmitter emitter = new ResponseBodyEmitter();
+
+ // Process data in a separate thread
+ CompletableFuture.runAsync(() -> {
+ try {
+ for (int i = 0; i < 10; i++) {
+ emitter.send("Data chunk " + i + "\n");
+ Thread.sleep(1000); // Simulate processing delay
+ }
+ emitter.complete();
+ } catch (Exception e) {
+ emitter.completeWithError(e);
+ }
+ });
+
+ return emitter;
+ }
+
+}
diff --git a/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/AgUiParameters.java b/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/AgUiParameters.java
new file mode 100644
index 000000000..ce9544c51
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/AgUiParameters.java
@@ -0,0 +1,56 @@
+package com.agui.spring;
+
+import com.agui.message.BaseMessage;
+import com.agui.types.Context;
+import com.agui.types.Tool;
+
+import java.util.List;
+
+public class AgUiParameters {
+
+ private String threadId;
+ private List tools;
+ private List context;
+ private Object forwardedProps;
+ private List messages;
+
+ public void setThreadId(final String threadId) {
+ this.threadId = threadId;
+ }
+
+ public String getThreadId() {
+ return this.threadId;
+ }
+
+ public void setTools(final List tools) {
+ this.tools = tools;
+ }
+
+ public List getTools() {
+ return tools;
+ }
+
+ public void setContext(final List context) {
+ this.context = context;
+ }
+
+ public List getContext() {
+ return this.context;
+ }
+
+ public void setForwardedProps(final Object forwardedProps) {
+ this.forwardedProps = forwardedProps;
+ }
+
+ public Object getForwardedProps() {
+ return this.forwardedProps;
+ }
+
+ public void setMessages(final List messages) {
+ this.messages = messages;
+ }
+
+ public List getMessages() {
+ return this.messages;
+ }
+}
diff --git a/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/SpringAgent.java b/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/SpringAgent.java
new file mode 100644
index 000000000..096069596
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/src/main/java/com/agui/spring/SpringAgent.java
@@ -0,0 +1,202 @@
+package com.agui.spring;
+
+import com.agui.client.AbstractAgent;
+import com.agui.event.*;
+import com.agui.message.BaseMessage;
+import com.agui.types.RunAgentInput;
+import com.agui.types.State;
+import org.springframework.ai.chat.messages.*;
+import org.springframework.ai.chat.model.ChatModel;
+
+import java.sql.Array;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+
+public class SpringAgent extends AbstractAgent {
+
+ private final ChatModel chatModel;
+
+ public SpringAgent(
+ final String agentId,
+ final String description,
+ final String threadId,
+ final List messages,
+ final ChatModel chatModel,
+ final State state,
+ final boolean debug
+ ) {
+ super(agentId, description, threadId, messages, state, debug);
+
+ this.chatModel = chatModel;
+ }
+
+ @Override
+ protected CompletableFuture run(RunAgentInput input, Consumer eventHandler) {
+ var threadId = Objects.nonNull(input.threadId()) ? input.threadId() : UUID.randomUUID().toString();
+ var runId = Objects.nonNull(input.runId()) ? input.runId() : UUID.randomUUID().toString();
+
+ eventHandler.accept(generateRunStartedEvent(input, runId, threadId));
+
+ CompletableFuture future = new CompletableFuture<>();
+
+ var messageId = UUID.randomUUID().toString();
+
+ StringBuilder message = new StringBuilder();
+
+ this.chatModel.stream(this.convertToSpringMessages(input.messages()).toArray(new Message[0]))
+ .doFirst(() -> {
+ var event = new TextMessageStartEvent();
+ event.setRole("assistant");
+ event.setMessageId(messageId);
+ event.setTimestamp(LocalDateTime.now().getNano());
+ eventHandler.accept(event);
+ })
+ .doOnNext((res) -> {
+ if (Objects.nonNull(res) && !res.isEmpty()) {
+ var contentEvent = new TextMessageContentEvent();
+ contentEvent.setTimestamp(LocalDateTime.now().getNano());
+ contentEvent.setDelta(res);
+ contentEvent.setMessageId(messageId);
+ eventHandler.accept(contentEvent);
+ message.append(res);
+ }
+ })
+ .doOnError(future::completeExceptionally)
+ .doOnCancel(() -> future.completeExceptionally(new RuntimeException("Cancelled")))
+ .doOnComplete(() -> {
+ var textMessageContentEvent = new TextMessageContentEvent();
+ textMessageContentEvent.setDelta(message.toString());
+ textMessageContentEvent.setMessageId(messageId);
+ textMessageContentEvent.setTimestamp(LocalDateTime.now().getNano());
+
+ eventHandler.accept(textMessageContentEvent);
+
+ var textMessageEndEvent = new TextMessageEndEvent();
+ textMessageEndEvent.setTimestamp(LocalDateTime.now().getNano());
+ textMessageEndEvent.setMessageId(messageId);
+ eventHandler.accept(textMessageEndEvent);
+
+ var assistantMessage = new com.agui.message.AssistantMessage();
+ assistantMessage.setId(messageId);
+ assistantMessage.setContent(message.toString());
+ assistantMessage.setName("");
+ this.addMessage(assistantMessage);
+
+ var snapshotEvent = new MessagesSnapshotEvent();
+ snapshotEvent.setMessages(this.messages);
+ snapshotEvent.setTimestamp(LocalDateTime.now().getNano());
+
+ eventHandler.accept(snapshotEvent);
+
+ var event = new RunFinishedEvent();
+
+ event.setRunId(runId);
+ event.setResult(message.toString());
+ event.setThreadId(threadId);
+
+ event.setTimestamp(LocalDateTime.now().getNano());
+ eventHandler.accept(event);
+
+ future.complete(null);
+
+ })
+ .subscribe();
+
+ return future;
+ }
+
+ private List convertToSpringMessages(final List messages) {
+ return messages.stream().map((message) -> {
+ switch (message.getRole()) {
+ case "assistant":
+ com.agui.message.AssistantMessage mappedAssistantMessage = (com.agui.message.AssistantMessage)message;
+
+ return new AssistantMessage(
+ mappedAssistantMessage.getContent(),
+ Map.of(
+ "id",
+ Objects.nonNull(mappedAssistantMessage.getId()) ? mappedAssistantMessage.getId() : UUID.randomUUID().toString(),
+ "name",
+ Objects.nonNull(mappedAssistantMessage.getName()) ? mappedAssistantMessage.getName() : ""
+ ),
+ Objects.isNull(mappedAssistantMessage.getToolCalls())
+ ? emptyList()
+ : mappedAssistantMessage.getToolCalls().stream().map(toolCall -> new AssistantMessage.ToolCall(
+ Objects.nonNull(toolCall.id()) ? toolCall.id() : UUID.randomUUID().toString(),
+ toolCall.type(),
+ toolCall.function().name(),
+ toolCall.function().arguments()
+ )).toList()
+ );
+ case "user":
+ default:
+ com.agui.message.UserMessage mappedUserMessage = (com.agui.message.UserMessage)message;
+
+ return UserMessage.builder()
+ .text(mappedUserMessage.getContent())
+ .metadata(
+ Map.of(
+ "id",
+ Objects.nonNull(mappedUserMessage.getId()) ? mappedUserMessage.getId() : UUID.randomUUID().toString(),
+ "name",
+ Objects.nonNull(mappedUserMessage.getName()) ? mappedUserMessage.getName() : ""
+ )
+ ).build();
+ case "system":
+ com.agui.message.SystemMessage mappedSystemMessage = (com.agui.message.SystemMessage)message;
+
+ return SystemMessage.builder()
+ .text(mappedSystemMessage.getContent())
+ .metadata(
+ Map.of(
+ "id",
+ Objects.nonNull(mappedSystemMessage.getId()) ? mappedSystemMessage.getId() : UUID.randomUUID().toString(),
+ "name",
+ Objects.nonNull(mappedSystemMessage.getName()) ? mappedSystemMessage.getName() : ""
+ )
+ ).build();
+ case "developer":
+ com.agui.message.DeveloperMessage mappedDeveloperMessage = (com.agui.message.DeveloperMessage)message;
+
+ return UserMessage.builder()
+ .text(mappedDeveloperMessage.getContent())
+ .metadata(
+ Map.of(
+ "id",
+ Objects.nonNull(mappedDeveloperMessage.getId()) ? mappedDeveloperMessage.getId() : UUID.randomUUID().toString(),
+ "name",
+ Objects.nonNull(mappedDeveloperMessage.getName()) ? mappedDeveloperMessage.getName() : ""
+ )
+ ).build();
+ case "tool":
+ com.agui.message.ToolMessage mappedToolMessage = (com.agui.message.ToolMessage)message;
+
+ return new ToolResponseMessage(
+ asList(
+ new ToolResponseMessage.ToolResponse(mappedToolMessage.getToolCallId(), mappedToolMessage.getName(), Objects.nonNull(mappedToolMessage.getError()) ? mappedToolMessage.getError() : mappedToolMessage.getContent())
+ ),
+ Map.of(
+ "id",
+ Objects.nonNull(mappedToolMessage.getId()) ? mappedToolMessage.getId() : UUID.randomUUID().toString(),
+ "name",
+ Objects.nonNull(mappedToolMessage.getName()) ? mappedToolMessage.getName() : ""
+ )
+ );
+ }
+ }).toList();
+ }
+
+ private RunStartedEvent generateRunStartedEvent(final RunAgentInput input, String runId, String threadId) {
+ var event = new RunStartedEvent();
+ event.setThreadId(threadId);
+ event.setRunId(runId);
+ event.setTimestamp(LocalDateTime.now().getNano());
+
+ return event;
+ }
+}
diff --git a/java-sdk/integrations/spring-ai/src/main/resources/application.properties b/java-sdk/integrations/spring-ai/src/main/resources/application.properties
new file mode 100644
index 000000000..2109a440d
--- /dev/null
+++ b/java-sdk/integrations/spring-ai/src/main/resources/application.properties
@@ -0,0 +1 @@
+spring.application.name=demo
diff --git a/java-sdk/packages/.gitignore b/java-sdk/packages/.gitignore
new file mode 100644
index 000000000..fa180dea5
--- /dev/null
+++ b/java-sdk/packages/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+../.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/java-sdk/packages/client/pom.xml b/java-sdk/packages/client/pom.xml
new file mode 100644
index 000000000..e20d4ba6d
--- /dev/null
+++ b/java-sdk/packages/client/pom.xml
@@ -0,0 +1,81 @@
+
+
+ 4.0.0
+
+ com.ag-ui
+ ag-ui
+ 0.0.1-SNAPSHOT
+
+
+ client
+
+
+ 18
+ 18
+ UTF-8
+
+
+
+
+ Pascal Wilbrink
+ pascal.wilbrink@gmail.com
+ https://github.com/pascalwilbrink
+
+ Developer
+
+
+
+
+
+ Pascal Wilbrink
+ pascal.wilbrink@gmail.com
+ https://github.com/pascalwilbrink
+
+ Maintainer
+ Developer
+
+
+
+
+
+
+ com.ag-ui
+ core
+ 0.0.1-SNAPSHOT
+ compile
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.12.2
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.24.2
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.0.0-M9
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.11.0
+
+ 18
+ 18
+
+
+
+
+
\ No newline at end of file
diff --git a/java-sdk/packages/client/src/main/java/com/agui/client/AbstractAgent.java b/java-sdk/packages/client/src/main/java/com/agui/client/AbstractAgent.java
new file mode 100644
index 000000000..d41c967b7
--- /dev/null
+++ b/java-sdk/packages/client/src/main/java/com/agui/client/AbstractAgent.java
@@ -0,0 +1,308 @@
+package com.agui.client;
+
+import com.agui.client.subscriber.AgentSubscriber;
+import com.agui.client.subscriber.AgentSubscriberParams;
+import com.agui.event.*;
+import com.agui.message.BaseMessage;
+import com.agui.types.RunAgentInput;
+import com.agui.types.State;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+public abstract class AbstractAgent {
+
+ protected String agentId;
+ protected String description;
+ protected String threadId;
+ protected List messages;
+ protected State state;
+ protected boolean debug = false;
+
+ private final List agentSubscribers = new ArrayList<>();
+
+ public AbstractAgent(
+ final String agentId,
+ final String description,
+ final String threadId,
+ final List messages,
+ final State state,
+ final boolean debug
+ ) {
+ this.agentId = agentId;
+ this.description = Objects.nonNull(description) ? description : "";
+ this.threadId = Objects.nonNull(threadId) ? threadId : UUID.randomUUID().toString();
+ this.messages = Objects.nonNull(messages) ? messages : new ArrayList<>();
+ this.state = Objects.nonNull(state) ? state : new State();
+ this.debug = debug;
+ }
+
+ public Subscription subscribe(final AgentSubscriber subscriber) {
+ this.agentSubscribers.add(subscriber);
+ return () -> this.agentSubscribers.remove(subscriber);
+ }
+
+ // New signature: CompletableFuture with event handler callback
+ protected abstract CompletableFuture run(final RunAgentInput input, Consumer eventHandler);
+
+ public CompletableFuture runAgent(RunAgentParameters parameters) {
+ return this.runAgent(parameters, null);
+ }
+
+ public CompletableFuture runAgent(
+ RunAgentParameters parameters,
+ AgentSubscriber subscriber
+ ) {
+ this.agentId = Objects.nonNull(this.agentId) ? this.agentId : UUID.randomUUID().toString();
+
+ var input = this.prepareRunAgentInput(parameters);
+ List subscribers = prepareSubscribers(subscriber);
+
+ this.onInitialize(input, subscribers);
+
+ // Create the event handler that processes each event
+ Consumer eventHandler = event -> {
+ try {
+ // Notify all subscribers of the general event
+ subscribers.forEach(s -> {
+ try {
+ s.onEvent(event);
+ } catch (Exception e) {
+ System.err.println("Error in subscriber.onEvent: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Handle specific event types if subscriber is provided
+ if (Objects.nonNull(subscriber)) {
+ handleEventByType(event, subscriber);
+ }
+ } catch (Exception e) {
+ System.err.println("Error handling event: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ // Run the agent and handle completion/errors
+ return this.run(input, eventHandler)
+ .whenComplete((result, throwable) -> {
+ try {
+ // Equivalent to RxJava's doFinally - always executed
+ subscribers.forEach(s -> {
+ try {
+ var params = new AgentSubscriberParams(
+ this.messages,
+ this.state,
+ this,
+ input
+ );
+ s.onRunFinalized(params);
+ } catch (Exception e) {
+ System.err.println("Error in subscriber.onRunFinalized: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ if (debug) {
+ System.out.println("Agent run completed - parameters = " + parameters +
+ ", subscriber = " + subscriber);
+ }
+
+ if (throwable != null) {
+ System.err.println("Agent run completed with error: " + throwable.getMessage());
+ if (debug) {
+ throwable.printStackTrace();
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("Error in completion handler: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ private List prepareSubscribers(AgentSubscriber subscriber) {
+ List subscribers = new ArrayList<>();
+
+ // Add default subscriber for handling RunFinishedEvent
+ subscribers.add(new AgentSubscriber() {
+ @Override
+ public void onRunFinishedEvent(RunFinishedEvent event) {
+ // Handle result if needed
+ // Object result = event.getResult();
+ }
+ });
+
+ if (Objects.nonNull(subscriber)) {
+ subscribers.add(subscriber);
+ }
+
+ subscribers.addAll(this.agentSubscribers);
+ return subscribers;
+ }
+
+ private void handleEventByType(BaseEvent event, AgentSubscriber subscriber) {
+ try {
+ switch (event.getType()) {
+ case RUN_STARTED -> subscriber.onRunStartedEvent((RunStartedEvent) event);
+ case RUN_ERROR -> subscriber.onRunErrorEvent((RunErrorEvent) event);
+ case RUN_FINISHED -> subscriber.onRunFinishedEvent((RunFinishedEvent) event);
+ case STEP_STARTED -> subscriber.onStepStartedEvent((StepStartedEvent) event);
+ case STEP_FINISHED -> subscriber.onStepFinishedEvent((StepFinishedEvent) event);
+ case TEXT_MESSAGE_START -> subscriber.onTextMessageStartEvent((TextMessageStartEvent) event);
+ case TEXT_MESSAGE_CONTENT -> subscriber.onTextMessageContentEvent((TextMessageContentEvent) event);
+ case TEXT_MESSAGE_CHUNK -> {
+ var contentEvent = new TextMessageContentEvent();
+ contentEvent.setMessageId(((TextMessageChunkEvent)event).getMessageId());
+ contentEvent.setDelta(((TextMessageChunkEvent)event).getDelta());
+ contentEvent.setTimestamp(event.getTimestamp());
+ subscriber.onTextMessageContentEvent(contentEvent);
+ }
+ case TEXT_MESSAGE_END -> subscriber.onTextMessageEndEvent((TextMessageEndEvent) event);
+ case TOOL_CALL_START -> subscriber.onToolCallStartEvent((ToolCallStartEvent) event);
+ case TOOL_CALL_ARGS -> subscriber.onToolCallArgsEvent((ToolCallArgsEvent) event);
+ case TOOL_CALL_RESULT -> subscriber.onToolCallResultEvent((ToolCallResultEvent) event);
+ case TOOL_CALL_END -> subscriber.onToolCallEndEvent((ToolCallEndEvent) event);
+ case RAW -> subscriber.onRawEvent((RawEvent) event);
+ case CUSTOM -> subscriber.onCustomEvent((CustomEvent) event);
+ case MESSAGES_SNAPSHOT -> subscriber.onMessagesSnapshotEvent((MessagesSnapshotEvent) event);
+ case STATE_SNAPSHOT -> subscriber.onStateSnapshotEvent((StateSnapshotEvent) event);
+ case STATE_DELTA -> subscriber.onStateDeltaEvent((StateDeltaEvent) event);
+ default -> {
+ if (debug) {
+ System.out.println("Unhandled event type: " + event.getType());
+ }
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("Error handling event type " + event.getType() + ": " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ protected void onInitialize(
+ final RunAgentInput input,
+ final List subscribers
+ ) {
+ subscribers.forEach(subscriber -> {
+ try {
+ subscriber.onRunInitialized(
+ new AgentSubscriberParams(
+ this.messages,
+ this.state,
+ this,
+ input
+ )
+ );
+ } catch (Exception e) {
+ System.err.println("Error in subscriber.onRunInitialized: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ public void addMessage(final BaseMessage message) {
+ if (Objects.isNull(message.getId())) {
+ message.setId(UUID.randomUUID().toString());
+ }
+ if (Objects.isNull(message.getName())) {
+ message.setName("");
+ }
+ this.messages.add(message);
+
+ this.agentSubscribers.forEach(subscriber -> {
+ try {
+ subscriber.onNewMessage(message);
+ } catch (Exception e) {
+ System.err.println("Error in message subscriber: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // TODO: Fire onNewToolCall if the message is from assistant and contains tool calls
+ // TODO: Fire onMessagesChanged sequentially
+ }
+
+ public void addMessages(final List messages) {
+ messages.forEach(this::addMessage); // Fixed: was using this.messages instead of parameter
+ }
+
+ public void setMessages(final List messages) {
+ this.messages = messages;
+
+ this.agentSubscribers.forEach(subscriber -> {
+ try {
+ // TODO: Fire onMessagesChanged
+ // subscriber.onMessagesChanged(messages);
+ } catch (Exception e) {
+ System.err.println("Error in messages changed subscriber: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ public void setState(final State state) {
+ this.state = state;
+
+ this.agentSubscribers.forEach(subscriber -> {
+ try {
+ // TODO: Fire onStateChanged
+ // subscriber.onStateChanged(state);
+ } catch (Exception e) {
+ System.err.println("Error in state changed subscriber: " + e.getMessage());
+ if (debug) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ protected RunAgentInput prepareRunAgentInput(RunAgentParameters parameters) {
+ return new RunAgentInput(
+ this.threadId,
+ parameters.getRunId().orElse(UUID.randomUUID().toString()),
+ this.state,
+ this.messages,
+ parameters.getTools().orElse(Collections.emptyList()),
+ parameters.getContext().orElse(Collections.emptyList()),
+ parameters.getForwardedProps().orElse(null)
+ );
+ }
+
+ public State getState() {
+ return this.state;
+ }
+
+ // Utility method for subclasses to easily emit events
+ protected void emitEvent(BaseEvent event, Consumer eventHandler) {
+ if (eventHandler != null) {
+ eventHandler.accept(event);
+ }
+ }
+
+ // Utility method for subclasses to handle errors in event emission
+ protected CompletableFuture handleEventEmissionError(Throwable throwable) {
+ System.err.println("Error during event emission: " + throwable.getMessage());
+ if (debug) {
+ throwable.printStackTrace();
+ }
+ return CompletableFuture.failedFuture(throwable);
+ }
+}
\ No newline at end of file
diff --git a/java-sdk/packages/client/src/main/java/com/agui/client/RunAgentParameters.java b/java-sdk/packages/client/src/main/java/com/agui/client/RunAgentParameters.java
new file mode 100644
index 000000000..aca26d630
--- /dev/null
+++ b/java-sdk/packages/client/src/main/java/com/agui/client/RunAgentParameters.java
@@ -0,0 +1,86 @@
+package com.agui.client;
+
+import com.agui.types.Context;
+import com.agui.types.Tool;
+
+import java.util.List;
+import java.util.Optional;
+
+public class RunAgentParameters {
+
+ private final Optional runId;
+ private final Optional> tools;
+ private final Optional> context;
+ private final Optional