Skip to content

Commit 45b6597

Browse files
committed
GH-2222 Add bits related to removal of TARGET_PROTOCOL removal in s-c-function
Resolves #2222
1 parent cbc0354 commit 45b6597

File tree

6 files changed

+18
-25
lines changed

6 files changed

+18
-25
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaMultiBinderCustomConfigurationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class KafkaMultiBinderCustomConfigurationTests {
6565
private DefaultBinderFactory binderFactory;
6666

6767
/**
68-
* Verifies that the custom user configuration is loaded from spring.main.sources
68+
* Verifies that the custom user configuration is loaded from spring.main.sources.
6969
*/
7070
@Test
7171
void binderKafka2UsesCustomConfigurationIsLoadedFromSpringMainSources() throws IllegalAccessException {
@@ -83,7 +83,7 @@ void binderKafka2UsesCustomConfigurationIsLoadedFromSpringMainSources() throws I
8383
}
8484

8585
/**
86-
* Verifies that the default configuration is used when no custom user configuration is provided
86+
* Verifies that the default configuration is used when no custom user configuration is provided.
8787
*/
8888
@Test
8989
void binderKafka1UsesDefaultBeanFromKafkaBinderMetricsConfigurationWithMultiBinder() throws IllegalAccessException {
@@ -101,7 +101,7 @@ void binderKafka1UsesDefaultBeanFromKafkaBinderMetricsConfigurationWithMultiBind
101101
}
102102

103103
/**
104-
* Helper method to get the binder context from the binderInstanceCache field in DefaultBinderFactory
104+
* Helper method to get the binder context from the binderInstanceCache field in DefaultBinderFactory.
105105
*/
106106
private ConfigurableApplicationContext getBinderContext(String binderName) throws IllegalAccessException {
107107
Field binderInstanceCacheField = ReflectionUtils.findField(DefaultBinderFactory.class, "binderInstanceCache");
@@ -114,7 +114,7 @@ private ConfigurableApplicationContext getBinderContext(String binderName) throw
114114
}
115115

116116
/**
117-
* Custom configuration that provides a custom KafkaBinderMetrics
117+
* Custom configuration that provides a custom KafkaBinderMetrics.
118118
*/
119119
static class CustomConfiguration {
120120

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.springframework.boot.WebApplicationType;
2626
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
2727
import org.springframework.boot.builder.SpringApplicationBuilder;
28-
import org.springframework.cloud.function.context.message.MessageUtils;
2928
import org.springframework.cloud.stream.binder.test.InputDestination;
3029
import org.springframework.cloud.stream.binder.test.OutputDestination;
3130
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
@@ -65,7 +64,6 @@ void checkWithEmptyPojo() {
6564
Message<byte[]> messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination");
6665
MessageHeaders headers = messageReceived.getHeaders();
6766
assertThat(headers).isNotNull();
68-
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
6967
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
7068
}
7169
}
@@ -86,7 +84,6 @@ void checkIfHeaderProvidedInData() {
8684
Message<byte[]> result = output.receive(1000, "myBinding-out-0");
8785
MessageHeaders headers = result.getHeaders();
8886
assertThat(headers).isNotNull();
89-
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
9087
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
9188
assertThat(headers.get("anyHeader")).isEqualTo("anyValue");
9289
}
@@ -107,7 +104,6 @@ void checkGenericMessageSent() {
107104
Message<byte[]> result = output.receive(1000, "uppercase-out-0");
108105
MessageHeaders headers = result.getHeaders();
109106
assertThat(headers).isNotNull();
110-
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
111107
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
112108
}
113109
}
@@ -131,7 +127,6 @@ void checkMessageWrappedFunctionalConsumer() {
131127
assertThat(headers).isNotNull();
132128
assertThat(headers).isNotNull();
133129
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
134-
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
135130
}
136131

137132
@EnableAutoConfiguration

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,8 @@ void headerPropagation() {
12081208
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
12091209
TestChannelBinderConfiguration.getCompleteConfiguration(SingleFunctionConfiguration.class))
12101210
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
1211-
"--spring.cloud.function.definition=func")) {
1211+
"--spring.cloud.function.definition=func",
1212+
"--spring.cloud.function.configuration.func.copy-input-headers=true")) {
12121213

12131214
InputDestination inputDestination = context.getBean(InputDestination.class);
12141215
OutputDestination outputDestination = context.getBean(OutputDestination.class);

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
4848
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
4949
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
50-
import org.springframework.cloud.function.context.message.MessageUtils;
5150
import org.springframework.cloud.stream.binder.test.InputDestination;
5251
import org.springframework.cloud.stream.binder.test.OutputDestination;
5352
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
@@ -166,7 +165,7 @@ void test_SCF_856() {
166165
streamBridge.send("myBinding-out-0",
167166
CloudEventMessageBuilder.withData("hello").setSource("my-source")
168167
.setId(UUID.randomUUID().toString()).setSpecVersion("1.0").setType("myType")
169-
.setHeader(MessageUtils.TARGET_PROTOCOL, "kafka").build(),
168+
.setHeader("kafka_foo", "kafka").build(),
170169
MimeTypeUtils.APPLICATION_JSON);
171170
OutputDestination output = context.getBean(OutputDestination.class);
172171
Message<byte[]> result = output.receive();

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,8 @@ private static <P> Message<P> sanitize(Message<P> inputMessage) {
404404
return MessageBuilder
405405
.fromMessage(inputMessage)
406406
.removeHeader("spring.cloud.stream.sendto.destination")
407-
.setHeader(MessageUtils.SOURCE_TYPE, inputMessage.getHeaders().get(MessageUtils.TARGET_PROTOCOL))
408-
.removeHeader(MessageUtils.TARGET_PROTOCOL)
407+
// .setHeader(MessageUtils.SOURCE_TYPE, inputMessage.getHeaders().get(MessageUtils.TARGET_PROTOCOL))
408+
// .removeHeader(MessageUtils.TARGET_PROTOCOL)
409409
.build();
410410
}
411411

@@ -503,13 +503,13 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor
503503
bindableType = FluxMessageChannel.class;
504504
}
505505
Object binder = binderFactory.getBinder(binderConfigurationName, bindableType);
506-
String targetProtocol = binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
506+
//String targetProtocol = binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
507507
Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
508508
headersField.setAccessible(true);
509509
targetProtocolEnhancer.set(message -> {
510510
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
511511
.getField(headersField, ((Message) message).getHeaders());
512-
headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
512+
//headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
513513
if (CloudEventMessageUtils.isCloudEvent((message))) {
514514
headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
515515
}
@@ -841,9 +841,9 @@ public Object apply(Message<byte[]> message) {
841841
private void setHeadersIfNeeded(Message message) {
842842
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
843843
.getField(this.headersField, message.getHeaders());
844-
if (StringUtils.hasText(targetProtocol)) {
845-
headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
846-
}
844+
// if (StringUtils.hasText(targetProtocol)) {
845+
// headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
846+
// }
847847
if (CloudEventMessageUtils.isCloudEvent(message)) {
848848
headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
849849
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.cloud.stream.function;
1818

1919
import java.lang.reflect.Type;
20-
import java.util.Collections;
2120
import java.util.LinkedHashMap;
2221
import java.util.Map;
2322
import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +40,6 @@
4140
import org.springframework.cloud.function.context.FunctionRegistry;
4241
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
4342
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.PassThruFunction;
44-
import org.springframework.cloud.function.context.message.MessageUtils;
4543
import org.springframework.cloud.function.core.FunctionInvocationHelper;
4644
import org.springframework.cloud.stream.binder.Binder;
4745
import org.springframework.cloud.stream.binder.BinderFactory;
@@ -204,12 +202,12 @@ public boolean send(String bindingName, @Nullable String binderName, Object data
204202
functionToInvoke = new PartitionAwareFunctionWrapper(functionToInvoke, this.applicationContext, producerProperties);
205203
}
206204

207-
String targetType = this.resolveBinderTargetType(bindingName, binderName, MessageChannel.class,
208-
this.applicationContext.getBean(BinderFactory.class));
205+
// String targetType = this.resolveBinderTargetType(bindingName, binderName, MessageChannel.class,
206+
// this.applicationContext.getBean(BinderFactory.class));
209207

210208
Message<?> messageToSend = data instanceof Message messageData
211-
? MessageBuilder.fromMessage(messageData).setHeaderIfAbsent(MessageUtils.TARGET_PROTOCOL, targetType).build()
212-
: new GenericMessage<>(data, Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, targetType));
209+
? MessageBuilder.fromMessage(messageData).build()
210+
: new GenericMessage<>(data);
213211

214212
Message<?> resultMessage;
215213
lock.lock();

0 commit comments

Comments
 (0)