Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class ReActAgent extends AgentBase {
private final StructuredOutputReminder structuredOutputReminder;
private final PlanNotebook planNotebook;
private final ToolExecutionContext toolExecutionContext;
private final List<Hook> sortedHooks;

// ==================== Internal Components ====================

Expand Down Expand Up @@ -160,6 +162,9 @@ private ReActAgent(Builder builder) {
this.hookNotifier = new HookNotifier();
this.messagePreparer = new MessagePreparer();

this.sortedHooks =
getHooks().stream().sorted(Comparator.comparingInt(Hook::priority)).toList();

addNestedModule("memory", this.memory);
}

Expand Down Expand Up @@ -681,52 +686,56 @@ private class HookNotifier {
Mono<List<Msg>> notifyPreReasoning(AgentBase agent, List<Msg> msgs) {
PreReasoningEvent event =
new PreReasoningEvent(agent, model.getModelName(), null, msgs);
Mono<PreReasoningEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
}
return result.map(PreReasoningEvent::getInputMessages);
return Flux.fromIterable(sortedHooks)
.reduce(
Mono.just(event),
(currentMono, hook) -> currentMono.flatMap(hook::onEvent))
.flatMap(Function.identity())
.map(PreReasoningEvent::getInputMessages);
}

Mono<Msg> notifyPostReasoning(Msg reasoningMsg) {
PostReasoningEvent event =
new PostReasoningEvent(
ReActAgent.this, model.getModelName(), null, reasoningMsg);
Mono<PostReasoningEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
}
return result.map(PostReasoningEvent::getReasoningMessage);
return Flux.fromIterable(sortedHooks)
.reduce(
Mono.just(event),
(currentMono, hook) -> currentMono.flatMap(hook::onEvent))
.flatMap(Function.identity())
.map(PostReasoningEvent::getReasoningMessage);
}

Mono<Void> notifyReasoningChunk(Msg chunk, Msg accumulated) {
ReasoningChunkEvent event =
new ReasoningChunkEvent(
ReActAgent.this, model.getModelName(), null, chunk, accumulated);
return Flux.fromIterable(getSortedHooks()).flatMap(hook -> hook.onEvent(event)).then();
return Flux.fromIterable(sortedHooks).concatMap(hook -> hook.onEvent(event)).then();
}

Mono<ToolUseBlock> notifyPreActing(ToolUseBlock toolUse) {
PreActingEvent event = new PreActingEvent(ReActAgent.this, toolkit, toolUse);
Mono<PreActingEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
}
return result.map(PreActingEvent::getToolUse);
return Flux.fromIterable(sortedHooks)
.reduce(
Mono.just(event),
(currentMono, hook) -> currentMono.flatMap(hook::onEvent))
.flatMap(Function.identity())
.map(PreActingEvent::getToolUse);
}

Mono<Void> notifyActingChunk(ToolUseBlock toolUse, ToolResultBlock chunk) {
ActingChunkEvent event = new ActingChunkEvent(ReActAgent.this, toolkit, toolUse, chunk);
return Flux.fromIterable(getSortedHooks()).flatMap(hook -> hook.onEvent(event)).then();
return Flux.fromIterable(sortedHooks).concatMap(hook -> hook.onEvent(event)).then();
}

Mono<ToolResultBlock> notifyPostActing(ToolUseBlock toolUse, ToolResultBlock toolResult) {
var event = new PostActingEvent(ReActAgent.this, toolkit, toolUse, toolResult);
Mono<PostActingEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
}
return result.map(PostActingEvent::getToolResult);
return Flux.fromIterable(sortedHooks)
.reduce(
Mono.just(event),
(currentMono, hook) -> currentMono.flatMap(hook::onEvent))
.flatMap(Function.identity())
.map(PostActingEvent::getToolResult);
}

Mono<Void> notifyStreamingMsg(Msg msg, ReasoningContext context) {
Expand Down
Loading