Skip to content

Commit 94142aa

Browse files
ehdrms2034olegz
authored andcommitted
change synchronized to reentrant lock in streambridge
1 parent 56d86c9 commit 94142aa

File tree

1 file changed

+14
-4
lines changed
  • core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function

1 file changed

+14
-4
lines changed

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,14 @@ public boolean send(String bindingName, @Nullable String binderName, Object data
188188
ProducerProperties producerProperties = this.bindingServiceProperties.getProducerProperties(bindingName);
189189
MessageChannel messageChannel = this.resolveDestination(bindingName, producerProperties, binderName);
190190

191-
Function functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties);
191+
Function functionToInvoke;
192+
lock.lock();
193+
try {
194+
functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties);
195+
} finally {
196+
lock.unlock();
197+
}
198+
192199

193200
if (producerProperties != null && producerProperties.isPartitioned()) {
194201
functionToInvoke = new PartitionAwareFunctionWrapper(functionToInvoke, this.applicationContext, producerProperties);
@@ -202,8 +209,11 @@ public boolean send(String bindingName, @Nullable String binderName, Object data
202209
: new GenericMessage<>(data, Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, targetType));
203210

204211
Message<?> resultMessage;
205-
synchronized (this) {
206-
resultMessage = (Message<byte[]>) functionToInvoke.apply(messageToSend);
212+
lock.lock();
213+
try {
214+
resultMessage = (Message<byte[]>)functionToInvoke.apply(messageToSend);
215+
} finally {
216+
lock.unlock();
207217
}
208218

209219
if (resultMessage == null
@@ -225,7 +235,7 @@ private int hashProducerProperties(ProducerProperties producerProperties, String
225235
return hash;
226236
}
227237

228-
private synchronized FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) {
238+
private FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) {
229239
int streamBridgeFunctionKey = this.hashProducerProperties(producerProperties, outputContentType);
230240
if (this.streamBridgeFunctionCache.containsKey(streamBridgeFunctionKey)) {
231241
return this.streamBridgeFunctionCache.get(streamBridgeFunctionKey);

0 commit comments

Comments
 (0)