Skip to content

Commit 2dcad9a

Browse files
ukellerolegz
authored andcommitted
Fix for reactive function in CustomRuntime
1 parent 3b88030 commit 2dcad9a

File tree

3 files changed

+46
-45
lines changed

3 files changed

+46
-45
lines changed

spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020
import java.io.InputStream;
2121
import java.lang.reflect.Type;
2222
import java.nio.charset.StandardCharsets;
23+
import java.util.ArrayList;
2324
import java.util.HashMap;
25+
import java.util.List;
2426
import java.util.Map;
2527
import java.util.concurrent.atomic.AtomicReference;
2628

2729
import com.amazonaws.services.lambda.runtime.Context;
2830
import org.apache.commons.logging.Log;
2931
import org.apache.commons.logging.LogFactory;
32+
import org.reactivestreams.Publisher;
33+
import reactor.core.publisher.Flux;
3034

3135
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
3236
import org.springframework.cloud.function.json.JsonMapper;
@@ -141,6 +145,43 @@ private static byte[] extractPayload(Message<Object> msg, JsonMapper objectMappe
141145
}
142146
}
143147

148+
@SuppressWarnings("unchecked")
149+
public static byte[] generateOutputFromObject(Message<?> requestMessage, Object output, JsonMapper objectMapper, Type functionOutputType) {
150+
Message<byte[]> responseMessage = null;
151+
if (output instanceof Publisher<?>) {
152+
List<Object> result = new ArrayList<>();
153+
for (Object value : Flux.from((Publisher<?>) output).toIterable()) {
154+
if (logger.isDebugEnabled()) {
155+
logger.debug("Response value: " + value);
156+
}
157+
result.add(value);
158+
}
159+
if (result.size() > 1) {
160+
output = result;
161+
}
162+
else if (result.size() == 1) {
163+
output = result.get(0);
164+
}
165+
else {
166+
output = null;
167+
}
168+
if (output instanceof Message<?> && ((Message<?>) output).getPayload() instanceof byte[]) {
169+
responseMessage = (Message<byte[]>) output;
170+
}
171+
else if (output != null) {
172+
if (logger.isDebugEnabled()) {
173+
logger.debug("OUTPUT: " + output + " - " + output.getClass().getName());
174+
}
175+
byte[] payload = objectMapper.toJson(output);
176+
responseMessage = MessageBuilder.withPayload(payload).build();
177+
}
178+
}
179+
else {
180+
responseMessage = (Message<byte[]>) output;
181+
}
182+
return generateOutput(requestMessage, responseMessage, objectMapper, functionOutputType);
183+
}
184+
144185
@SuppressWarnings({ "rawtypes", "unchecked" })
145186
public static byte[] generateOutput(Message requestMessage, Message<?> responseMessage,
146187
JsonMapper objectMapper, Type functionOutputType) {

spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,13 @@ private void eventLoop(ConfigurableApplicationContext context) {
157157
}
158158
System.setProperty("com.amazonaws.xray.traceHeader", traceId);
159159
}
160+
Object responseObject = function.apply(eventMessage);
160161

161-
Message<byte[]> responseMessage = (Message<byte[]>) function.apply(eventMessage);
162-
163-
if (responseMessage != null && logger.isDebugEnabled()) {
164-
logger.debug("Reply from function: " + responseMessage);
162+
if (responseObject != null && logger.isDebugEnabled()) {
163+
logger.debug("Reply from function: " + responseObject);
165164
}
166165

167-
byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType());
166+
byte[] outputBody = AWSLambdaUtils.generateOutputFromObject(eventMessage, responseObject, mapper, function.getOutputType());
168167
ResponseEntity<Object> result = rest.exchange(RequestEntity.post(URI.create(invocationUrl))
169168
.header(USER_AGENT, USER_AGENT_VALUE)
170169
.body(outputBody), Object.class);

spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,13 @@
1919
import java.io.IOException;
2020
import java.io.InputStream;
2121
import java.io.OutputStream;
22-
import java.util.ArrayList;
23-
import java.util.List;
2422
import java.util.Set;
2523

2624
import com.amazonaws.services.lambda.runtime.Context;
2725
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
2826
import com.fasterxml.jackson.databind.MapperFeature;
2927
import org.apache.commons.logging.Log;
3028
import org.apache.commons.logging.LogFactory;
31-
import org.reactivestreams.Publisher;
32-
import reactor.core.publisher.Flux;
3329

3430
import org.springframework.boot.SpringApplication;
3531
import org.springframework.cloud.function.context.FunctionCatalog;
@@ -44,7 +40,6 @@
4440
import org.springframework.context.ConfigurableApplicationContext;
4541
import org.springframework.core.env.Environment;
4642
import org.springframework.messaging.Message;
47-
import org.springframework.messaging.support.MessageBuilder;
4843
import org.springframework.util.Assert;
4944
import org.springframework.util.StreamUtils;
5045
import org.springframework.util.StringUtils;
@@ -83,45 +78,11 @@ public void handleRequest(InputStream input, OutputStream output, Context contex
8378
.generateMessage(input, this.function.getInputType(), this.function.isSupplier(), jsonMapper, context);
8479

8580
Object response = this.function.apply(requestMessage);
86-
byte[] responseBytes = this.buildResult(requestMessage, response);
81+
byte[] responseBytes = AWSLambdaUtils.generateOutputFromObject(requestMessage, response, this.jsonMapper, function.getOutputType());
8782
StreamUtils.copy(responseBytes, output);
8883
// any exception should propagate
8984
}
9085

91-
@SuppressWarnings("unchecked")
92-
private byte[] buildResult(Message<?> requestMessage, Object output) throws IOException {
93-
Message<byte[]> responseMessage = null;
94-
if (output instanceof Publisher<?>) {
95-
List<Object> result = new ArrayList<>();
96-
for (Object value : Flux.from((Publisher<?>) output).toIterable()) {
97-
if (logger.isDebugEnabled()) {
98-
logger.debug("Response value: " + value);
99-
}
100-
result.add(value);
101-
}
102-
if (result.size() > 1) {
103-
output = result;
104-
}
105-
else if (result.size() == 1) {
106-
output = result.get(0);
107-
}
108-
else {
109-
output = null;
110-
}
111-
if (output != null) {
112-
if (logger.isDebugEnabled()) {
113-
logger.debug("OUTPUT: " + output + " - " + output.getClass().getName());
114-
}
115-
byte[] payload = this.jsonMapper.toJson(output);
116-
responseMessage = MessageBuilder.withPayload(payload).build();
117-
}
118-
}
119-
else {
120-
responseMessage = (Message<byte[]>) output;
121-
}
122-
return AWSLambdaUtils.generateOutput(requestMessage, responseMessage, this.jsonMapper, function.getOutputType());
123-
}
124-
12586
private void start() {
12687
Class<?> startClass = FunctionClassUtils.getStartClass();
12788
String[] properties = new String[] {"--spring.cloud.function.web.export.enabled=false", "--spring.main.web-application-type=none"};

0 commit comments

Comments
 (0)