@@ -140,6 +140,37 @@ public void testCompositionWithNonExistingFunction() throws Exception {
140140 assertThat (registration .getNames ().iterator ().next ()).isEqualTo ("echo1" );
141141 }
142142
143+ @ Test
144+ public void testCompositionReactiveSupplierWithImplicitConsumer () throws Exception {
145+ FunctionCatalog catalog = this .configureCatalog (CompositionReactiveSupplierWithConsumer .class );
146+ FunctionInvocationWrapper function = catalog .lookup ("supplyPrimitive|consume" );
147+ function .apply (null );
148+ assertThat (CompositionReactiveSupplierWithConsumer .results .size ()).isEqualTo (2 );
149+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (0 )).isEqualTo (1 );
150+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (1 )).isEqualTo (2 );
151+ CompositionReactiveSupplierWithConsumer .results .clear ();
152+
153+ function = catalog .lookup ("supplyMessage|consume" );
154+ function .apply (null );
155+ assertThat (CompositionReactiveSupplierWithConsumer .results .size ()).isEqualTo (2 );
156+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (0 )).isEqualTo (1 );
157+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (1 )).isEqualTo (2 );
158+ CompositionReactiveSupplierWithConsumer .results .clear ();
159+
160+ function = catalog .lookup ("functionMessage|consume" );
161+ function .apply (Flux .fromArray (new Message [] {MessageBuilder .withPayload ("ricky" ).build (), MessageBuilder .withPayload ("bubbles" ).build ()}));
162+ assertThat (CompositionReactiveSupplierWithConsumer .results .size ()).isEqualTo (2 );
163+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (0 )).isEqualTo ("RICKY" );
164+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (1 )).isEqualTo ("BUBBLES" );
165+ CompositionReactiveSupplierWithConsumer .results .clear ();
166+
167+ function = catalog .lookup ("functionPrimitive|consume" );
168+ function .apply (Flux .fromArray (new String [] {"ricky" , "bubbles" }));
169+ assertThat (CompositionReactiveSupplierWithConsumer .results .size ()).isEqualTo (2 );
170+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (0 )).isEqualTo ("RICKY" );
171+ assertThat (CompositionReactiveSupplierWithConsumer .results .get (1 )).isEqualTo ("BUBBLES" );
172+ }
173+
143174 @ SuppressWarnings ({ "rawtypes" , "unchecked" })
144175 @ Test
145176 public void testMessageWithArrayAsPayload () throws Exception {
@@ -1539,6 +1570,49 @@ public Function<String, String> echo2() {
15391570 }
15401571 }
15411572
1573+ @ EnableAutoConfiguration
1574+ @ Configuration // s-c-f-1141
1575+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
1576+ public static class CompositionReactiveSupplierWithConsumer {
1577+ private static List results = new ArrayList <>();
1578+
1579+ @ Bean
1580+ public Function <Flux <String >, Flux <String >> functionPrimitive () {
1581+ return flux -> flux .map (v -> v .toUpperCase ());
1582+ }
1583+
1584+ @ Bean
1585+ public Function <Flux <Message <String >>, Flux <Message <String >>> functionMessage () {
1586+ return flux -> flux .map (v -> MessageBuilder .withPayload (v .getPayload ().toUpperCase ()).build ());
1587+ }
1588+
1589+ @ Bean
1590+ public Supplier <Flux <Message <Integer >>> supplyMessage () {
1591+ return () -> {
1592+ return Flux .fromArray (
1593+ new Message [] { MessageBuilder .withPayload (1 ).build (), MessageBuilder .withPayload (2 ).build () });
1594+ };
1595+ }
1596+
1597+ @ Bean
1598+ public Supplier <Flux <Integer >> supplyPrimitive () {
1599+ return () -> {
1600+ return Flux .fromArray (
1601+ new Integer [] { 1 , 2 });
1602+ };
1603+ }
1604+
1605+ @ Bean
1606+ public Consumer consume () {
1607+ return v -> {
1608+ if (v instanceof Message vMessage ) {
1609+ v = vMessage .getPayload ();
1610+ }
1611+ results .add (v );
1612+ };
1613+ }
1614+ }
1615+
15421616 @ EnableAutoConfiguration
15431617 @ Configuration
15441618 public static class MessageWithArrayAsPayload {
0 commit comments