Skip to content

Commit 3a3f8a2

Browse files
committed
emul beforeAgentInvocation/afterAgentInvocation
Signed-off-by: Dmitrii Tikhomirov <[email protected]>
1 parent 7b5ebfa commit 3a3f8a2

File tree

6 files changed

+204
-19
lines changed

6 files changed

+204
-19
lines changed

experimental/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ConditionalAgentServiceImpl.java

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,23 @@
2020
import dev.langchain4j.agentic.internal.AgentExecutor;
2121
import dev.langchain4j.agentic.scope.AgenticScope;
2222
import dev.langchain4j.agentic.workflow.ConditionalAgentService;
23+
import io.serverlessworkflow.fluent.agentic.AgenticScopedRequest;
24+
import io.serverlessworkflow.fluent.agentic.AgenticScopedResponse;
2325
import java.util.Arrays;
2426
import java.util.List;
27+
import java.util.concurrent.atomic.AtomicReference;
2528
import java.util.function.Consumer;
2629
import java.util.function.Predicate;
2730

2831
public class ConditionalAgentServiceImpl<T>
2932
extends AbstractAgentService<T, ConditionalAgentService<T>>
3033
implements ConditionalAgentService<T> {
3134

35+
private final AtomicReference<Consumer<AgenticScopedRequest>> beforeAgentInvocation =
36+
new AtomicReference<>();
37+
private final AtomicReference<Consumer<AgenticScopedResponse>> afterAgentInvocation =
38+
new AtomicReference<>();
39+
3240
private ConditionalAgentServiceImpl(Class<T> agentServiceClass) {
3341
super(agentServiceClass);
3442
}
@@ -39,7 +47,21 @@ public static <T> ConditionalAgentService<T> builder(Class<T> agentServiceClass)
3947

4048
@Override
4149
public ConditionalAgentService<T> subAgents(Object... agents) {
42-
this.workflowBuilder.tasks(t -> t.sequence(agents));
50+
this.workflowBuilder.tasks(
51+
t ->
52+
t.sequence(agents)
53+
.inputFrom(
54+
scope -> {
55+
if (beforeAgentInvocation.get() != null) {
56+
beforeAgentInvocation.get().accept((AgenticScopedRequest) scope);
57+
}
58+
})
59+
.outputAs(
60+
scope -> {
61+
if (afterAgentInvocation.get() != null) {
62+
afterAgentInvocation.get().accept((AgenticScopedResponse) scope);
63+
}
64+
}));
4365
return this;
4466
}
4567

@@ -50,20 +72,43 @@ public ConditionalAgentService<T> subAgents(List<AgentExecutor> agentExecutors)
5072

5173
@Override
5274
public ConditionalAgentService<T> beforeAgentInvocation(Consumer<AgentRequest> consumer) {
53-
throw new UnsupportedOperationException(
54-
"Feature not implemented yet. See: https://github.com/serverlessworkflow/sdk-java/issues/836");
75+
beforeAgentInvocation.set(
76+
agenticScopedRequest -> consumer.accept(agenticScopedRequest.asAgentRequest()));
77+
return this;
5578
}
5679

5780
@Override
5881
public ConditionalAgentService<T> afterAgentInvocation(Consumer<AgentResponse> consumer) {
59-
throw new UnsupportedOperationException(
60-
"Feature not implemented yet. See: https://github.com/serverlessworkflow/sdk-java/issues/836");
82+
afterAgentInvocation.set(
83+
agenticScopedResponse -> consumer.accept(agenticScopedResponse.asAgentResponse()));
84+
return this;
6185
}
6286

6387
@Override
6488
public ConditionalAgentService<T> subAgents(Predicate<AgenticScope> condition, Object... agents) {
6589
this.workflowBuilder.tasks(
66-
t -> Arrays.stream(agents).forEach(agent -> t.when(condition).agent(agent)));
90+
t ->
91+
Arrays.stream(agents)
92+
.forEach(
93+
agent ->
94+
t.when(condition)
95+
.agent(agent)
96+
.inputFrom(
97+
scope -> {
98+
if (beforeAgentInvocation.get() != null) {
99+
beforeAgentInvocation
100+
.get()
101+
.accept((AgenticScopedRequest) scope);
102+
}
103+
})
104+
.outputAs(
105+
scope -> {
106+
if (afterAgentInvocation.get() != null) {
107+
afterAgentInvocation
108+
.get()
109+
.accept((AgenticScopedResponse) scope);
110+
}
111+
})));
67112
return this;
68113
}
69114

@@ -76,7 +121,22 @@ public ConditionalAgentService<T> subAgents(
76121
@Override
77122
public ConditionalAgentService<T> subAgent(
78123
Predicate<AgenticScope> condition, AgentExecutor agentExecutor) {
79-
this.workflowBuilder.tasks(t -> t.when(condition).agent(agentExecutor));
124+
this.workflowBuilder.tasks(
125+
t ->
126+
t.when(condition)
127+
.agent(agentExecutor)
128+
.inputFrom(
129+
scope -> {
130+
if (beforeAgentInvocation.get() != null) {
131+
beforeAgentInvocation.get().accept((AgenticScopedRequest) scope);
132+
}
133+
})
134+
.outputAs(
135+
scope -> {
136+
if (afterAgentInvocation.get() != null) {
137+
afterAgentInvocation.get().accept((AgenticScopedResponse) scope);
138+
}
139+
}));
80140
return this;
81141
}
82142
}

experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentAdapters.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,21 @@ public static List<AgentExecutor> toExecutors(Object... agents) {
3838

3939
public static Function<DefaultAgenticScope, Object> toFunction(
4040
AgentExecutor exec,
41-
AtomicReference<Consumer<AgenticScope>> beforeAgentInvocation,
42-
AtomicReference<Consumer<AgenticScope>> afterAgentInvocation) {
41+
AtomicReference<Consumer<AgenticScopedRequest>> beforeAgentInvocation,
42+
AtomicReference<Consumer<AgenticScopedResponse>> afterAgentInvocation) {
43+
String agentName = exec.agentInvoker().name();
4344
return defaultAgenticScope -> {
4445
if (beforeAgentInvocation.get() != null) {
45-
beforeAgentInvocation.get().accept(defaultAgenticScope);
46+
beforeAgentInvocation
47+
.get()
48+
.accept(new AgenticScopedRequest(defaultAgenticScope, agentName));
4649
}
4750
Object result = exec.execute(defaultAgenticScope);
4851
if (afterAgentInvocation.get() != null) {
4952
defaultAgenticScope.writeState("input", result);
50-
afterAgentInvocation.get().accept(defaultAgenticScope);
53+
afterAgentInvocation
54+
.get()
55+
.accept(new AgenticScopedResponse(defaultAgenticScope, agentName, result));
5156
}
5257
return result;
5358
};

experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public class AgentTaskItemListBuilder extends BaseTaskItemListBuilder<AgentTaskI
3737
implements AgentDoFluent<AgentTaskItemListBuilder> {
3838

3939
private final FuncTaskItemListBuilder delegate;
40-
private final AtomicReference<Consumer<AgenticScope>> beforeAgentInvocation =
40+
private final AtomicReference<Consumer<AgenticScopedRequest>> beforeAgentInvocation =
4141
new AtomicReference<>();
42-
private final AtomicReference<Consumer<AgenticScope>> afterAgentInvocation =
42+
private final AtomicReference<Consumer<AgenticScopedResponse>> afterAgentInvocation =
4343
new AtomicReference<>();
4444

4545
public AgentTaskItemListBuilder() {
@@ -169,12 +169,12 @@ public AgentTaskItemListBuilder switchCase(
169169
}
170170

171171
public AgentTaskItemListBuilder inputFrom(Consumer<AgenticScope> beforeAgentInvocation) {
172-
this.beforeAgentInvocation.set(beforeAgentInvocation);
172+
this.beforeAgentInvocation.set(beforeAgentInvocation::accept);
173173
return self();
174174
}
175175

176176
public AgentTaskItemListBuilder outputAs(Consumer<AgenticScope> afterAgentInvocationConsumer) {
177-
this.afterAgentInvocation.set(afterAgentInvocationConsumer);
177+
this.afterAgentInvocation.set(afterAgentInvocationConsumer::accept);
178178
return self();
179179
}
180180
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.agentic;
17+
18+
import dev.langchain4j.agentic.agent.AgentRequest;
19+
import dev.langchain4j.agentic.internal.AgentInvocation;
20+
import dev.langchain4j.agentic.scope.AgenticScope;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
public class AgenticScopedRequest implements AgenticScope {
25+
26+
protected final String agentName;
27+
private final AgenticScope wrapped;
28+
29+
AgenticScopedRequest(AgenticScope wrapped, String agentName) {
30+
this.wrapped = wrapped;
31+
this.agentName = agentName;
32+
}
33+
34+
@Override
35+
public Object memoryId() {
36+
return wrapped.memoryId();
37+
}
38+
39+
@Override
40+
public void writeState(String s, Object o) {
41+
wrapped.writeState(s, o);
42+
}
43+
44+
@Override
45+
public void writeStates(Map<String, Object> map) {
46+
wrapped.writeStates(map);
47+
}
48+
49+
@Override
50+
public boolean hasState(String s) {
51+
return wrapped.hasState(s);
52+
}
53+
54+
@Override
55+
public Object readState(String s) {
56+
return wrapped.readState(s);
57+
}
58+
59+
@Override
60+
public <T> T readState(String s, T t) {
61+
return wrapped.readState(s, t);
62+
}
63+
64+
@Override
65+
public Map<String, Object> state() {
66+
return wrapped.state();
67+
}
68+
69+
@Override
70+
public String contextAsConversation(String... strings) {
71+
return wrapped.contextAsConversation(strings);
72+
}
73+
74+
@Override
75+
public String contextAsConversation(Object... objects) {
76+
return wrapped.contextAsConversation(objects);
77+
}
78+
79+
@Override
80+
public List<AgentInvocation> agentInvocations(String s) {
81+
return wrapped.agentInvocations(s);
82+
}
83+
84+
public AgentRequest asAgentRequest() {
85+
return new AgentRequest(this, agentName, state());
86+
}
87+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.agentic;
17+
18+
import dev.langchain4j.agentic.agent.AgentResponse;
19+
import dev.langchain4j.agentic.scope.AgenticScope;
20+
21+
public class AgenticScopedResponse extends AgenticScopedRequest {
22+
23+
private final Object response;
24+
25+
AgenticScopedResponse(AgenticScope wrapped, String agentName, Object response) {
26+
super(wrapped, agentName);
27+
this.response = response;
28+
}
29+
30+
public AgentResponse asAgentResponse() {
31+
return new AgentResponse(this, agentName, state(), response);
32+
}
33+
}

experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/LoopAgentsBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public class LoopAgentsBuilder {
3333

3434
private final FuncTaskItemListBuilder funcDelegate;
3535
private final ForTaskFunction forTask;
36-
private final AtomicReference<Consumer<AgenticScope>> beforeAgentInvocation =
36+
private final AtomicReference<Consumer<AgenticScopedRequest>> beforeAgentInvocation =
3737
new AtomicReference<>();
38-
private final AtomicReference<Consumer<AgenticScope>> afterAgentInvocation =
38+
private final AtomicReference<Consumer<AgenticScopedResponse>> afterAgentInvocation =
3939
new AtomicReference<>();
4040

4141
private int maxIterations = 1024;
@@ -84,12 +84,12 @@ public LoopAgentsBuilder exitCondition(BiPredicate<AgenticScope, Integer> exitCo
8484
}
8585

8686
public LoopAgentsBuilder inputFrom(Consumer<AgenticScope> beforeAgentInvocationConsumer) {
87-
this.beforeAgentInvocation.set(beforeAgentInvocationConsumer);
87+
this.beforeAgentInvocation.set(beforeAgentInvocationConsumer::accept);
8888
return this;
8989
}
9090

9191
public LoopAgentsBuilder outputAs(Consumer<AgenticScope> afterAgentInvocationConsumer) {
92-
this.afterAgentInvocation.set(afterAgentInvocationConsumer);
92+
this.afterAgentInvocation.set(afterAgentInvocationConsumer::accept);
9393
return this;
9494
}
9595

0 commit comments

Comments
 (0)