@@ -40,6 +40,7 @@ class EventModelBuilder(
4040 private var handler : EventStreamProcessor ? = null ,
4141) : ExtensionAwareBuilder {
4242
43+ private val additionalProcessors: MutableList <EventStreamProcessor > = mutableListOf ()
4344 private val virtualEvents: MutableList <Pair <String , VirtualEventIngest >> = mutableListOf ()
4445 private var checkpointConfig: CheckpointConfiguration ? = null
4546
@@ -74,6 +75,19 @@ class EventModelBuilder(
7475 virtualEvents.add(streamId to stream)
7576 }
7677
78+ /* *
79+ * Registers an additional event stream processor for the model. The provided processor will
80+ * handle specific stream processing tasks in conjunction with the existing processors.
81+ *
82+ * @param processor The event stream processor to register. This should implement the
83+ * [EventStreamProcessor] interface to process individual events.
84+ */
85+ fun registerProcessor (
86+ processor : EventStreamProcessor ,
87+ ) {
88+ additionalProcessors.add(processor)
89+ }
90+
7791 /* *
7892 * Sets the event stream processor that handles events for the model.
7993 */
@@ -109,6 +123,7 @@ class EventModelBuilder(
109123 buildList {
110124 add(handler)
111125 addAll(extensions.filterIsInstance<EventStreamProcessor >())
126+ addAll(additionalProcessors)
112127 }
113128 )
114129
@@ -140,6 +155,7 @@ class EventModelBuilder(
140155 )
141156 }
142157 }
158+
143159 extensions.forEach {
144160 it.modelCreatedCallback(model)
145161 }
0 commit comments