Description
Describe the issue
Issue #3086 introduced the ability to dynamically create bindings at runtime via the defineInputBinding() and defineOutputBinding() methods in BindingsLifecycleController. However, we believe the feature has some design issues that, if addressed, would make it better...
What I mean is that this syntax essentially says:
- Initialize the binding.
- Start the binding.
- Configure the consumer properties extension after binding initialization.
BindingsLifecycleController controller = context.getBean(BindingsLifecycleController.class);
KafkaConsumerProperties consumerProperties = controller.defineInputBinding("test-input-binding");
This can result in 2 problems that we've noticed...
Cannot Set Core Spring Cloud Stream Binding Configuration Options
The current defineInputBinding() only returns the extension properties, which are just the vendor-specific properties (i.e. the Solace, Kafka, Rabbit, GCP, etc. specific properties). The core binding configuration options provided by SCSt itself don't appear to be configurable.
For example, it doesn't seem like we're able to configure:
- The destination property
- The group property
- Which binder instance this binding should use. Or more specifically, the binder name (not binder type).
- The ConsumerProperties object itself (such as auto-startup or consumer concurrency)
Initialization Process is Backwards from Standard Spring Cloud Stream Initialization Process
The standard initialization process for a binding when managed by Spring Cloud Stream is:
- Configure the consumer/producer properties and its extension
- Initialize the binding.
- Start the binding.
Notice how this is backwards? The defineInputBinding() method reverses this process, and only allows for configuration of bindings that are fully reconfigurable during binding start()/stop(). It doesn't allow for the configuration of (and can even cause issues with) bindings that are configured and store state at binding initialization time.
The Solace binder is one such example: reconfiguring a binding through the ConsumerProperties/ProducerProperties object after binding initialization was neither an expected nor a tested use case. At worst, using defineInputBinding() and configuring the resultant ConsumerProperties/ProducerProperties object can cause a mismatch between the properties object and the internal state of the binding itself.
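To illustrate the kind of mismatch I mean, here is a minimal plain-Java sketch. It does not use any Spring Cloud Stream or Solace classes: SketchConsumerProperties and SketchBinding are hypothetical stand-ins, and the "snapshot at construction" behavior is only an assumption about how a binder might store state at initialization time.

```java
public class InitOrderSketch {

    // Hypothetical stand-in for a binder's consumer properties object.
    static class SketchConsumerProperties {
        private String endpointType = "QUEUE"; // assumed default, for illustration only

        String getEndpointType() {
            return endpointType;
        }

        void setEndpointType(String endpointType) {
            this.endpointType = endpointType;
        }
    }

    // Hypothetical binding that captures configuration when it is initialized.
    static class SketchBinding {
        private final SketchConsumerProperties properties;
        private final String provisionedEndpointType; // state stored at initialization

        SketchBinding(SketchConsumerProperties properties) {
            this.properties = properties;
            this.provisionedEndpointType = properties.getEndpointType();
        }

        // True when the properties object still matches what was provisioned.
        boolean isConsistent() {
            return provisionedEndpointType.equals(properties.getEndpointType());
        }
    }

    // Mimics the current order: initialize first, configure afterwards.
    static SketchBinding defineThenConfigure() {
        SketchConsumerProperties props = new SketchConsumerProperties();
        SketchBinding binding = new SketchBinding(props); // initialized with defaults
        props.setEndpointType("TOPIC_ENDPOINT");           // too late for the binding
        return binding;
    }

    // Mimics the standard order: configure first, then initialize.
    static SketchBinding configureThenCreate() {
        SketchConsumerProperties props = new SketchConsumerProperties();
        props.setEndpointType("TOPIC_ENDPOINT");
        return new SketchBinding(props);
    }

    public static void main(String[] args) {
        System.out.println("define-then-configure consistent: " + defineThenConfigure().isConsistent());
        System.out.println("configure-then-create consistent: " + configureThenCreate().isConsistent());
    }
}
```

In this sketch the first path leaves the properties object saying TOPIC_ENDPOINT while the binding was provisioned as QUEUE, and the second path keeps both in agreement.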
In my opinion, the contract for configuring a binding after initialization should, at most, have been limited to the Advanced Consumer Configuration and Advanced Producer Configuration features. These 2 interfaces provide clear boundaries within which binder developers can expect their bindings to be configured post-initialization. The ConsumerProperties/ProducerProperties objects, on the other hand, would then apply only to binding initialization.
Version of the framework
We are looking at Spring Cloud 2025.0.1, but we assume that 2025.1.0 functions similarly, as the dynamic binding feature hasn't changed significantly since #3086.
Expected behavior
Provide a new interface which accepts the full configuration of the binding, which would then be used for dynamic runtime binding initialization. For example:
public class BindingsLifecycleController {
    public Binding<?> createInputBinding(String bindingName, String binderName, String destination, @Nullable String group, ConsumerProperties consumerProperties);
    public Binding<?> createOutputBinding(String bindingName, String binderName, String destination, ProducerProperties producerProperties);
}
NOTE: The ConsumerProperties and ProducerProperties parameters should also accept their subclasses, ExtendedConsumerProperties and ExtendedProducerProperties respectively, so that binder-specific extension properties can also be configured at binding initialization time.
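As a rough sketch of how a method declared against the base properties type can still see binder-specific extension properties, here is a plain-Java model. BaseConsumerProperties, ExtendedProps and VendorProps are hypothetical stand-ins for ConsumerProperties, ExtendedConsumerProperties and a vendor properties class; none of this is the actual Spring Cloud Stream implementation.

```java
public class ExtensionAwareSketch {

    // Hypothetical stand-in for the core consumer properties.
    static class BaseConsumerProperties {
        int concurrency = 1;
    }

    // Hypothetical stand-in for the extended variant carrying a vendor extension.
    static class ExtendedProps<T> extends BaseConsumerProperties {
        final T extension;

        ExtendedProps(T extension) {
            this.extension = extension;
        }
    }

    // Hypothetical vendor-specific properties.
    static class VendorProps {
        String endpointType = "QUEUE";
    }

    // Declared against the base type, so callers may pass the base class or any subclass.
    static String describe(BaseConsumerProperties props) {
        String core = "concurrency=" + props.concurrency;
        if (props instanceof ExtendedProps) {
            Object extension = ((ExtendedProps<?>) props).extension;
            if (extension instanceof VendorProps) {
                return core + ", endpointType=" + ((VendorProps) extension).endpointType;
            }
        }
        return core;
    }

    public static void main(String[] args) {
        // Core properties only.
        System.out.println(describe(new BaseConsumerProperties()));

        // Core properties plus a vendor extension, both set before "initialization".
        VendorProps vendor = new VendorProps();
        vendor.endpointType = "TOPIC_ENDPOINT";
        ExtendedProps<VendorProps> extended = new ExtendedProps<>(vendor);
        extended.concurrency = 3;
        System.out.println(describe(extended));
    }
}
```

The point is only that a single parameter typed as the base class is enough for the caller to hand over both core and extension settings up front.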
This would then allow for developers to be able to do something like:
public void createSolaceConsumerBinding(BindingsLifecycleController bindingsLifecycleController) {
    String bindingName = "binding-name";
    String binderName = "solace"; // or whatever binder name if user is using a "Connecting to Multiple Systems" setup (https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/multiple-systems.html#page-title)
    String destination = "my-new-topic";
    String group = "group-0";

    ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new SolaceConsumerProperties());
    consumerProperties.getExtension().setEndpointType(EndpointType.TOPIC_ENDPOINT);
    // configure other consumerProperties as necessary

    bindingsLifecycleController.createInputBinding(bindingName, binderName, destination, group, consumerProperties);
}
At this point, after the call to the proposed createInputBinding method, the binding would be initialized, i.e. the same as what Explicit Binding Creation does.
Note: Optionally, the feature could also start the binding according to the consumerProperties.isAutoStartup() setting (matching the behavior of statically-configured bindings). But honestly, I'm not really sure if this is useful, since there would be no message handlers bound yet to the message channels.
After initializing the binding, the user would then need to connect the input/output channels to their MessageHandlers.
There are many different ways users could wire up the channels (especially the output channel), which is out of scope for this proposal, but for illustrative purposes, here are 2 potential ways users could do it:
Example 1: Subscribe Spring Cloud Function to Input MessageChannel
@Bean
public Consumer<Message<?>> myFunction() {
    return message -> System.out.println(message.getPayload());
}

@EventListener
void handleBindingCreatedEvent(BindingCreatedEvent event) {
    // subscribing the message handler by looking it up from Spring Cloud Function
    // Synonymous to the typical SCSt setup
    Binding<?> binding = (Binding<?>) event.getSource();
    System.out.println("Binding created: " + binding.getBindingName());
    if (binding.isInput()) {
        FunctionCatalog functionCatalog = applicationContext.getBean(FunctionCatalog.class);
        FunctionInvocationWrapper fnc = functionCatalog.lookup("myFunction");
        SubscribableChannel inputChannel = applicationContext.getBean(binding.getBindingName(), SubscribableChannel.class);
        inputChannel.subscribe(fnc::apply);
        System.out.println("Subscribed function: " + fnc + " to channel: " + binding.getBindingName());
    }
}
Example 2: Subscribe Custom MessageHandler instance to Input MessageChannel
private static class MyMessageHandler implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}

@EventListener
void handleBindingCreatedEvent(BindingCreatedEvent event) {
    // initializing and subscribing a custom MessageHandler instance
    // Synonymous to an "Explicit Binding Creation" use case (https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/explicit-binding-creation.html#page-title)
    Binding<?> binding = (Binding<?>) event.getSource();
    System.out.println("Binding created: " + binding.getBindingName());
    if (binding.isInput()) {
        SubscribableChannel inputChannel = applicationContext.getBean(binding.getBindingName(), SubscribableChannel.class);
        inputChannel.subscribe(new MyMessageHandler());
        System.out.println("Subscribed message handler to channel: " + binding.getBindingName());
    }
}
Additional context
@olegz this issue is a write-up of the proposal that we were talking about the other week.
In particular, the "Example 2: Subscribe Custom MessageHandler instance to Input MessageChannel" use case with Explicit Binding Creation is the one applicable to our own projects at Solace, where we needed a MessageHandler instance per message processor that is configured in Java code rather than through configuration properties.