2525import org .sliceworkz .eventmodeling .boundedcontext .LifecycleCapability ;
2626import org .sliceworkz .eventmodeling .commands .AbstractCommand ;
2727import org .sliceworkz .eventmodeling .commands .Command ;
28- import org .sliceworkz .eventmodeling .commands .CommandContext ;
2928import org .sliceworkz .eventmodeling .commands .OutboundCommand ;
3029import org .sliceworkz .eventmodeling .events .Instance ;
3130import org .sliceworkz .eventmodeling .events .Tracing ;
32- import org .sliceworkz .eventmodeling .module .boundedcontext .BoundedContextFunctions ;
31+ import org .sliceworkz .eventmodeling .module .boundedcontext .PerformanceLogger ;
3332import org .sliceworkz .eventmodeling .module .readmodels .ReadModelModule ;
33+ import org .sliceworkz .eventstore .events .EphemeralEvent ;
3434import org .sliceworkz .eventstore .events .Event ;
3535import org .sliceworkz .eventstore .events .EventReference ;
36- import org .sliceworkz .eventstore .stream . AppendCriteria ;
36+ import org .sliceworkz .eventstore .projection . Projector . ProjectorMetrics ;
3737import org .sliceworkz .eventstore .stream .EventStream ;
3838
3939import io .micrometer .core .instrument .Counter ;
@@ -46,26 +46,22 @@ public class DCBModule<DOMAIN_EVENT_TYPE,OUTBOUND_EVENT_TYPE> implements Lifecyc
4646
4747 private String boundedContext ;
4848 private Instance instance ;
49-
49+
5050 private ReadModelModule <DOMAIN_EVENT_TYPE > readModelModule ;
5151 private EventStream <DOMAIN_EVENT_TYPE > domainEventStream ;
5252 private EventStream <OUTBOUND_EVENT_TYPE > outboundEventStream ;
53- private boolean kernelMode ;
54-
55- private BoundedContextFunctions kernelFunctions ;
5653
5754 private MeterRegistry meterRegistry ;
5855 private ConcurrentHashMap <String , Counter > commandCounters = new ConcurrentHashMap <>();
5956 private ConcurrentHashMap <String , Timer > commandTimers = new ConcurrentHashMap <>();
6057 private ConcurrentHashMap <String , Counter > domainEventCounters = new ConcurrentHashMap <>();
61-
62- public DCBModule ( String boundedContext , Instance instance , ReadModelModule <DOMAIN_EVENT_TYPE > readModelModule , EventStream <DOMAIN_EVENT_TYPE > domainEventStream , EventStream <OUTBOUND_EVENT_TYPE > outboundEventStream , boolean kernelMode , MeterRegistry meterRegistry ) {
58+
59+ public DCBModule ( String boundedContext , Instance instance , ReadModelModule <DOMAIN_EVENT_TYPE > readModelModule , EventStream <DOMAIN_EVENT_TYPE > domainEventStream , EventStream <OUTBOUND_EVENT_TYPE > outboundEventStream , MeterRegistry meterRegistry ) {
6360 this .boundedContext = boundedContext ;
6461 this .instance = instance ;
6562 this .readModelModule = readModelModule ;
6663 this .domainEventStream = domainEventStream ;
6764 this .outboundEventStream = outboundEventStream ;
68- this .kernelMode = kernelMode ;
6965 this .meterRegistry = meterRegistry ;
7066 }
7167
@@ -78,55 +74,55 @@ public Optional<EventReference> execute ( OutboundCommand<DOMAIN_EVENT_TYPE,OUT
7874 }
7975
8076 private <PRODUCED_EVENT_TYPE > Optional <EventReference > executeInternal ( AbstractCommand <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE > command , Tracing tracing , EventStream <PRODUCED_EVENT_TYPE > targetEventStream ) {
81- Optional <EventReference > result ;
82-
83- // to avoid infinite recursing ...
84- if ( kernelMode ) {
85- // ... after all this code needs to execute in the end ...
86-
77+ String commandName = command .getClass ().getSimpleName ();
78+
79+ Counter counter = commandCounters .computeIfAbsent (commandName , name ->
80+ meterRegistry .counter ("sliceworkz.eventmodeling.command.execute" ,
81+ io .micrometer .core .instrument .Tags .of ("context" , boundedContext , "command" , name )));
82+ counter .increment ();
83+
84+ Timer timer = commandTimers .computeIfAbsent (commandName , name ->
85+ meterRegistry .timer ("sliceworkz.eventmodeling.command.duration" ,
86+ io .micrometer .core .instrument .Tags .of ("context" , boundedContext , "command" , name )));
87+
88+ return timer .record (() -> {
89+ long start = System .currentTimeMillis ();
90+
8791 // execute command and get resulting events
88- CommandContext <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE > commandContext = new DCBCommandContextImpl <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE >(boundedContext , readModelModule , domainEventStream , targetEventStream , tracing );
89- CommandResultImpl <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE > commandResult = (CommandResultImpl <DOMAIN_EVENT_TYPE , PRODUCED_EVENT_TYPE >)command .execute (commandContext );
90-
92+ DCBCommandContextImpl <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE > commandContext = new DCBCommandContextImpl <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE >(boundedContext , readModelModule , domainEventStream , targetEventStream , tracing );
93+ CommandResultImpl <DOMAIN_EVENT_TYPE ,PRODUCED_EVENT_TYPE > commandResult = (CommandResultImpl <DOMAIN_EVENT_TYPE , PRODUCED_EVENT_TYPE >) command .execute (commandContext );
94+
95+ Optional <EventReference > result ;
96+
9197 if ( !commandResult .raisedEvents ().isEmpty () ) {
9298 // append to the event store (with optimistic locking the DCB way)
9399 // and return the last event reference produced (for bookmarking purposes etc ...)
94- result = targetEventStream .append (AppendCriteria .none (), commandResult .raisedEvents ()).stream ().reduce ((first ,second )->second ).map (Event ::reference );
95-
100+ result = targetEventStream .append (commandResult .appendCriteria (), commandResult .raisedEvents ())
101+ .stream ().reduce ((first ,second )->second ).map (Event ::reference );
102+
103+ // Record metrics for each raised domain event
104+ String channel = tracing .channel () != null ? tracing .channel () : "unknown" ;
105+ for (EphemeralEvent <? extends PRODUCED_EVENT_TYPE > event : commandResult .raisedEvents ()) {
106+ String eventName = event .data ().getClass ().getSimpleName ();
107+ String cacheKey = eventName + ":" + channel ;
108+ Counter eventCounter = domainEventCounters .computeIfAbsent (cacheKey , key ->
109+ meterRegistry .counter ("sliceworkz.eventmodeling.domain.event" ,
110+ io .micrometer .core .instrument .Tags .of ("context" , boundedContext , "event" , eventName , "channel" , channel , "source" , "dcb" )));
111+ eventCounter .increment ();
112+ }
96113 } else {
97114 LOGGER .debug ("no events raised by command {}" , command .getClass ());
98115 result = Optional .empty ();
99116 }
100-
101- return result ;
102-
103- } else {
104-
105- String commandName = command .getClass ().getSimpleName ();
106-
107- Counter counter = commandCounters .computeIfAbsent (commandName , name ->
108- meterRegistry .counter ("sliceworkz.eventmodeling.command.execute" ,
109- io .micrometer .core .instrument .Tags .of ("context" , boundedContext , "command" , name )));
110- counter .increment ();
111-
112- Timer timer = commandTimers .computeIfAbsent (commandName , name ->
113- meterRegistry .timer ("sliceworkz.eventmodeling.command.duration" ,
114- io .micrometer .core .instrument .Tags .of ("context" , boundedContext , "command" , name )));
115-
116- return timer .record (()->{
117-
118- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
119- ExecuteCommandCommand <DOMAIN_EVENT_TYPE ,OUTBOUND_EVENT_TYPE > cmd = new ExecuteCommandCommand (boundedContext , instance , readModelModule , domainEventStream , targetEventStream , command , meterRegistry , domainEventCounters );
120- kernelFunctions .executeKernelCommand (cmd , tracing );
121-
122- // return the last application event reference rather than the observability event
123- return cmd .getLastAppendedEventReference ();
124- });
125- }
126- }
127117
128- public void kernelFunctions ( BoundedContextFunctions kernelFunctions ) {
129- this .kernelFunctions = kernelFunctions ;
118+ long finish = System .currentTimeMillis ();
119+ long duration = finish - start ;
120+ ProjectorMetrics projectorMetrics = commandContext .projectorMetrics ();
121+ PerformanceLogger .Metrics metrics = new PerformanceLogger .Metrics (duration , projectorMetrics .queriesDone (), projectorMetrics .eventsStreamed (), projectorMetrics .eventsHandled (), projectorMetrics .lastEventReference ());
122+ PerformanceLogger .entry ().context (boundedContext ).instance (instance ).metrics (metrics ).type ("command.execute" ).command (command .commandName ()).log ();
123+
124+ return result ;
125+ });
130126 }
131127
132128 @ Override
0 commit comments