Skip to content

Commit 604649d

Browse files
committed
GH-2961: Concurrent writes and partition header
- When concurrent threads publish to a binding, PartitionAwareFunctionWrapper resets to null between invocations. Addressing this issue by guarding this reset from occurring if the partiton header on the producer is found. Fixes #2961
1 parent 4b2810f commit 604649d

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/PartitionAwareFunctionWrapper.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -90,7 +90,13 @@ public Object apply(Object input) {
9090
this.setEnhancerIfNecessary();
9191
}
9292
Object result = this.function.apply(input);
93-
if (!((FunctionInvocationWrapper) this.function).isInputTypePublisher()) {
93+
boolean messageContainsPartitionHeader = false;
94+
if (result != null && Message.class.isAssignableFrom(result.getClass())) {
95+
if (((Message<?>) result).getHeaders().containsKey(BinderHeaders.PARTITION_HEADER)) {
96+
messageContainsPartitionHeader = true;
97+
}
98+
}
99+
if (!((FunctionInvocationWrapper) this.function).isInputTypePublisher() && !messageContainsPartitionHeader) {
94100
((FunctionInvocationWrapper) this.function).setEnhancer(null);
95101
}
96102
return result;

0 commit comments

Comments
 (0)