Skip to content
This repository was archived by the owner on Mar 13, 2021. It is now read-only.

Commit aae2bdb

Browse files
author
Dave Syer
committed
Add a flag so that app can close when stream ends
If user sets grpc.exitOnComplete=true then the app will close down after the end of the output flux (which is the same as the end of the input flux for a Consumer). See projectriff/riff#544 for a change in the sidecar that supports this, and makes it possible to declare a k8s Job with a sidecar and a function invoker.
1 parent e76883d commit aae2bdb

File tree

6 files changed

+75
-6
lines changed

6 files changed

+75
-6
lines changed

src/main/java/io/projectriff/invoker/ApplicationRunner.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,15 @@ public Object evaluate(String expression, Object root, Object... attrs) {
152152
return parsed.getValue(context);
153153
}
154154

155+
public boolean isRunning() {
156+
if (this.app == null) {
157+
return false;
158+
}
159+
Expression parsed = new SpelExpressionParser()
160+
.parseExpression("context.isRunning()");
161+
return parsed.getValue(this.app, Boolean.class);
162+
}
163+
155164
@PreDestroy
156165
public void close() {
157166
closeContext();

src/main/java/io/projectriff/invoker/GrpcConfiguration.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.springframework.cloud.function.context.catalog.FunctionInspector;
3939
import org.springframework.cloud.function.core.FluxConsumer;
4040
import org.springframework.cloud.function.core.FluxSupplier;
41+
import org.springframework.context.ApplicationContext;
42+
import org.springframework.context.ConfigurableApplicationContext;
4143
import org.springframework.context.annotation.Configuration;
4244
import org.springframework.context.event.ContextRefreshedEvent;
4345
import org.springframework.context.event.EventListener;
@@ -56,6 +58,7 @@ public class GrpcConfiguration {
5658
private static final Log logger = LogFactory.getLog(GrpcConfiguration.class);
5759
private Server server;
5860
private int port = 10382;
61+
private boolean exitOnComplete;
5962

6063
@Autowired
6164
private Gson mapper;
@@ -65,6 +68,8 @@ public class GrpcConfiguration {
6568
private FunctionInspector inspector;
6669
@Autowired
6770
private FunctionCatalog catalog;
71+
@Autowired
72+
private ConfigurableApplicationContext context;
6873

6974
public int getPort() {
7075
return port;
@@ -75,6 +80,14 @@ public void setPort(int port) {
7580
this.port = port;
7681
}
7782

83+
public boolean isExitOnComplete() {
84+
return exitOnComplete;
85+
}
86+
87+
public void setExitOnComplete(boolean exitOnComplete) {
88+
this.exitOnComplete = exitOnComplete;
89+
}
90+
7891
/** Start serving requests. */
7992
@EventListener(ContextRefreshedEvent.class)
8093
public void start() {
@@ -96,7 +109,8 @@ public void start() {
96109
function(function,
97110
inspector.getRegistration(function)
98111
.getType()),
99-
this.mapper, inspector.getInputType(function),
112+
this::maybeClose, this.mapper,
113+
inspector.getInputType(function),
100114
inspector.getOutputType(function),
101115
inspector.isMessage(function)))
102116
.build();
@@ -176,6 +190,19 @@ private Function<Flux<?>, Flux<?>> function(Object result, FunctionType type) {
176190
return output;
177191
}
178192

193+
private void maybeClose() {
194+
if (this.exitOnComplete) {
195+
ApplicationContext context = this.context;
196+
while (context instanceof ConfigurableApplicationContext) {
197+
ConfigurableApplicationContext closeable = (ConfigurableApplicationContext) context;
198+
if (closeable.isRunning()) {
199+
closeable.close();
200+
}
201+
context = context.getParent();
202+
}
203+
}
204+
}
205+
179206
private static final class ConsumerAdapter
180207
implements Function<Flux<Object>, Flux<Object>> {
181208

@@ -204,9 +231,10 @@ public SupplierAdapter(Supplier<Publisher<?>> result) {
204231
this.result = result;
205232
}
206233

234+
@SuppressWarnings("unchecked")
207235
@Override
208236
public Flux<Object> apply(Flux<Object> input) {
209-
return Flux.from(result.get());
237+
return (Flux<Object>) Flux.from(result.get());
210238
}
211239

212240
}

src/main/java/io/projectriff/invoker/JavaFunctionInvokerServer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@ public class JavaFunctionInvokerServer
5050
private final Class<?> inputType;
5151
private final Class<?> outputType;
5252
private final Gson mapper;
53+
private Runnable callback;
5354

5455
private static final Log logger = LogFactory.getLog(JavaFunctionInvokerServer.class);
5556

56-
public JavaFunctionInvokerServer(Function<Flux<?>, Flux<?>> function, Gson mapper,
57-
Class<?> inputType, Class<?> outputType, boolean isMessage) {
57+
public JavaFunctionInvokerServer(Function<Flux<?>, Flux<?>> function,
58+
Runnable callback, Gson mapper, Class<?> inputType, Class<?> outputType,
59+
boolean isMessage) {
60+
this.callback = callback;
5861
this.mapper = mapper;
5962
this.inputType = inputType;
6063
this.outputType = outputType;
@@ -122,8 +125,11 @@ public StreamObserver<io.projectriff.grpc.function.FunctionProtos.Message> call(
122125
}, () -> {
123126
// Make sure the emitter is disposed (should work even if it already
124127
// was since it's idempotent)
125-
emitter.dispose();
128+
if (!emitter.isDisposed()) {
129+
emitter.dispose();
130+
}
126131
responseObserver.onCompleted();
132+
this.callback.run();
127133
});
128134

129135
return new StreamObserver<io.projectriff.grpc.function.FunctionProtos.Message>() {

src/test/java/io/projectriff/functions/Logger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class Logger implements Consumer<Flux<String>> {
2727

2828
@Override
2929
public void accept(Flux<String> input) {
30-
input.map(value -> "Hello " + value).subscribe(System.out::println);
30+
input.map(value -> "Hello " + value).doOnNext(System.out::println);
3131
}
3232

3333
}

src/test/java/io/projectriff/invoker/GrpcSinkTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,18 @@ public void fluxConsumer() throws Exception {
6565
ApplicationRunner runner = (ApplicationRunner) ReflectionTestUtils
6666
.getField(this.runner, "runner");
6767
assertThat(runner.containsBean("io.projectriff.functions.Logger")).isFalse();
68+
assertThat(runner.isRunning()).isTrue();
69+
}
70+
71+
@Test
72+
public void fluxConsumerCloses() throws Exception {
73+
runner.run("--server.port=0", "--grpc.port=" + port, "--grpc.exitOnComplete=true",
74+
"--function.uri=file:target/test-classes"
75+
+ "?handler=io.projectriff.functions.Logger");
76+
List<String> result = client.send("foo");
77+
assertThat(result).isEmpty();
78+
ApplicationRunner runner = (ApplicationRunner) ReflectionTestUtils
79+
.getField(this.runner, "runner");
80+
assertThat(runner.isRunning()).isFalse();
6881
}
6982
}

src/test/java/io/projectriff/invoker/GrpcSourceTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,18 @@ public void fluxSupplier() throws Exception {
6565
ApplicationRunner runner = (ApplicationRunner) ReflectionTestUtils
6666
.getField(this.runner, "runner");
6767
assertThat(runner.containsBean("io.projectriff.functions.Words")).isFalse();
68+
assertThat(runner.isRunning()).isTrue();
69+
}
70+
71+
@Test
72+
public void fluxSupplierCloses() throws Exception {
73+
runner.run("--server.port=0", "--grpc.port=" + port, "--grpc.exitOnComplete=true",
74+
"--function.uri=file:target/test-classes"
75+
+ "?handler=io.projectriff.functions.Words");
76+
List<String> result = client.send();
77+
assertThat(result).contains("foo");
78+
ApplicationRunner runner = (ApplicationRunner) ReflectionTestUtils
79+
.getField(this.runner, "runner");
80+
assertThat(runner.isRunning()).isFalse();
6881
}
6982
}

0 commit comments

Comments
 (0)