|
22 | 22 | import java.util.Objects; |
23 | 23 | import java.util.Set; |
24 | 24 | import java.util.UUID; |
| 25 | +import java.util.concurrent.BrokenBarrierException; |
| 26 | +import java.util.concurrent.CyclicBarrier; |
25 | 27 | import java.util.concurrent.CountDownLatch; |
26 | 28 | import java.util.concurrent.Executors; |
27 | 29 | import java.util.concurrent.ScheduledExecutorService; |
| 30 | +import java.util.concurrent.TimeoutException; |
28 | 31 | import java.util.concurrent.TimeUnit; |
29 | 32 | import java.util.concurrent.atomic.AtomicBoolean; |
| 33 | + |
30 | 34 | import java.util.function.Consumer; |
31 | 35 | import java.util.function.Function; |
32 | 36 | import java.util.function.Supplier; |
@@ -321,6 +325,42 @@ void scstPartitionAlwaysSetEvenInConcurrentScenarios() throws Exception { |
321 | 325 | } |
322 | 326 | } |
323 | 327 |
|
| 328 | + /* |
| 329 | + * This test verifies that when a partition key expression is set, then scst_partition is always set, even in |
| 330 | + * concurrent scenarios using function binding. |
| 331 | + * See https://github.com/spring-cloud/spring-cloud-stream/issues/2961 for more details |
| 332 | + */ |
| 333 | + @Test |
| 334 | + void scstPartitionAlwaysSetEvenInConcurrentScenariosWithFunctions() throws Exception { |
| 335 | + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( |
| 336 | + TestChannelBinderConfiguration.getCompleteConfiguration(FunctionPartitionConfiguration.class)).web( |
| 337 | + WebApplicationType.NONE).run("--spring.cloud.stream.function.definition=concurrentFunction", |
| 338 | + "--spring.cloud.stream.bindings.concurrentFunction-out-0.producer.partition-count=3", |
| 339 | + "--spring.cloud.stream.bindings.concurrentFunction-out-0.producer.partition-key-expression=headers['partitionKey']", |
| 340 | + "--spring.jmx.enabled=false")) { |
| 341 | + CyclicBarrier barrier = context.getBean(CyclicBarrier.class); |
| 342 | + StreamBridge streamBridge = context.getBean(StreamBridge.class); |
| 343 | + |
| 344 | + Thread otherThread = new Thread(() -> streamBridge.send("concurrentFunction-in-0", "wait")); |
| 345 | + otherThread.start(); |
| 346 | + barrier.await(5, TimeUnit.SECONDS); // wait for thread to be started and function called |
| 347 | + streamBridge.send("concurrentFunction-in-0", "passThrough"); |
| 348 | + barrier.await(5, TimeUnit.SECONDS); // notifies thread to continue function |
| 349 | + otherThread.join(); |
| 350 | + |
| 351 | + int messagesWithoutScstPartition = 2; // total messages |
| 352 | + OutputDestination output = context.getBean(OutputDestination.class); |
| 353 | + Message<byte[]> message = output.receive(1000, "concurrentFunction-out-0"); |
| 354 | + while (message != null) { |
| 355 | + if (message.getHeaders().containsKey("scst_partition")) { |
| 356 | + messagesWithoutScstPartition--; |
| 357 | + } |
| 358 | + message = output.receive(1000, "concurrentFunction-out-0"); |
| 359 | + } |
| 360 | + assertThat(messagesWithoutScstPartition).isEqualTo(0); |
| 361 | + } |
| 362 | + } |
| 363 | + |
324 | 364 | @Test |
325 | 365 | void withOutputContentTypeWildCardBindings() { |
326 | 366 | try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration |
@@ -866,6 +906,31 @@ public Function<String, Message<String>> function(StreamBridge bridge, BindingSe |
866 | 906 | } |
867 | 907 | } |
868 | 908 |
|
| 909 | + @EnableAutoConfiguration |
| 910 | + public static class FunctionPartitionConfiguration { |
| 911 | + |
| 912 | + @Bean |
| 913 | + public CyclicBarrier cyclicBarrierFunction() { |
| 914 | + return new CyclicBarrier(2); |
| 915 | + } |
| 916 | + @Bean |
| 917 | + public Function<String, Message<String>> concurrentFunction(StreamBridge bridge, BindingServiceProperties properties, CyclicBarrier cyclicBarrierFunction) { |
| 918 | + return s -> { |
| 919 | + if (s.startsWith("wait")) { |
| 920 | + try { |
| 921 | + cyclicBarrierFunction.await(5, TimeUnit.SECONDS); // wait for notifying main thread to send other event |
| 922 | + cyclicBarrierFunction.await(5, TimeUnit.SECONDS); // wait for other event been sent |
| 923 | + } catch (BrokenBarrierException | InterruptedException | TimeoutException e) { |
| 924 | + throw new RuntimeException(e); |
| 925 | + } |
| 926 | + } |
| 927 | + return MessageBuilder.withPayload(s) |
| 928 | + .setHeader("partitionKey", s.length()) |
| 929 | + .build(); |
| 930 | + }; |
| 931 | + } |
| 932 | + } |
| 933 | + |
869 | 934 | @EnableAutoConfiguration |
870 | 935 | public static class InterceptorConfiguration { |
871 | 936 | @Bean |
|
0 commit comments