Skip to content

Commit 9293a18

Browse files
committed
GH-1294 Remove usage of TARGET_PROTOCOL header
Resolves #1294
1 parent fcef53c commit 9293a18

File tree

5 files changed

+23
-15
lines changed

5 files changed

+23
-15
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.time.OffsetDateTime;
2222
import java.util.Collections;
2323
import java.util.HashMap;
24+
import java.util.Iterator;
2425
import java.util.Map;
2526
import java.util.Objects;
2627
import java.util.stream.Collectors;
@@ -361,8 +362,22 @@ static String determinePrefixToUse(Map<String, Object> messageHeaders) {
361362
return determinePrefixToUse(messageHeaders, false);
362363
}
363364

365+
static String extractTargetProtocol(Map<String, Object> messageHeaders) {
366+
Iterator<String> keyIterator = messageHeaders.keySet().iterator();
367+
for (; keyIterator.hasNext();) {
368+
String key = keyIterator.next();
369+
if (key.startsWith("kafka_")) {
370+
return Protocols.KAFKA;
371+
}
372+
else if (key.startsWith("amqp")) {
373+
return Protocols.AMQP;
374+
}
375+
}
376+
return null;
377+
}
378+
364379
static String determinePrefixToUse(Map<String, Object> messageHeaders, boolean strict) {
365-
String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL);
380+
String targetProtocol = extractTargetProtocol(messageHeaders);
366381
String prefix = determinePrefixToUse(targetProtocol);
367382
if (StringUtils.hasText(prefix) && (strict || StringUtils.hasText((String) messageHeaders.get(prefix + _SPECVERSION)))) {
368383
return prefix;

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class CloudEventsFunctionInvocationHelper implements FunctionInvocationHe
7171

7272
@Override
7373
public boolean isRetainOutputAsMessage(Message<?> message) {
74-
return message.getHeaders().containsKey(MessageUtils.TARGET_PROTOCOL) || (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
74+
return (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
7575
&& message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE));
7676
}
7777

@@ -101,8 +101,8 @@ public Message<?> postProcessResult(Object result, Message<?> input) {
101101
if (input != null) {
102102
targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders(), true);
103103
}
104-
else if (result instanceof Message) {
105-
targetPrefix = CloudEventMessageUtils.determinePrefixToUse(((Message<?>) result).getHeaders(), true);
104+
else if (result instanceof Message resultMessage) {
105+
targetPrefix = CloudEventMessageUtils.determinePrefixToUse(resultMessage.getHeaders(), true);
106106
}
107107

108108
Assert.hasText(targetPrefix, "Unable to determine prefix for Cloud Event atttributes, "

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,6 @@ public abstract class MessageUtils {
3131
* Value for 'message-type' typically use as header key.
3232
*/
3333
public static String MESSAGE_TYPE = "message-type";
34-
35-
/**
36-
* Value for 'target-protocol' typically use as header key.
37-
*/
38-
public static String TARGET_PROTOCOL = "target-protocol";
39-
4034
/**
4135
* Value for 'target-protocol' typically use as header key.
4236
*/

spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
3030
import org.springframework.boot.builder.SpringApplicationBuilder;
3131
import org.springframework.cloud.function.context.FunctionCatalog;
32-
import org.springframework.cloud.function.context.message.MessageUtils;
3332
import org.springframework.context.ApplicationContext;
3433
import org.springframework.context.annotation.Bean;
3534
import org.springframework.context.annotation.Configuration;
@@ -99,7 +98,7 @@ public void testBinaryPojoToPojoDefaultOutputHeaderProviderImperative() {
9998
.setId(id)
10099
.setSource("https://spring.io/")
101100
.setType("org.springframework")
102-
.setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA)
101+
.setHeader("kafka_foo", "blah")
103102
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
104103

105104
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
@@ -136,7 +135,7 @@ public void testBinaryPojoToPojoDefaultOutputHeaderProviderReactive() {
136135
.setId(id)
137136
.setSource("https://spring.io/")
138137
.setType("org.springframework")
139-
.setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA)
138+
.setHeader("kafka_foo", "blah")
140139
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
141140

142141
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
@@ -173,7 +172,7 @@ public void testBinaryPojoToPojoDefaultOutputHeaderProviderReactiveMono() {
173172
.setId(id)
174173
.setSource("https://spring.io/")
175174
.setType("org.springframework")
176-
.setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA)
175+
.setHeader("kafka_foo", "blah")
177176
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
178177

179178
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();

spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void testHeaderKeyInsensitivity() {
4747
@Test// see https://github.com/spring-cloud/spring-cloud-function/issues/680
4848
public void testProperAttributeExtractionRegardlessOfTargetProtocol() {
4949
Message<String> ceMessage = CloudEventMessageBuilder.withData("foo").build();
50-
ceMessage = MessageBuilder.fromMessage(ceMessage).setHeader("target-protocol", "kafka").build();
50+
ceMessage = MessageBuilder.fromMessage(ceMessage).setHeader("kafka_foo", "blah").build();
5151

5252
String prefix = CloudEventMessageUtils.determinePrefixToUse(ceMessage.getHeaders());
5353
assertThat(prefix).isEqualTo("ce-");

0 commit comments

Comments
 (0)