Skip to content

Commit 0f1a870

Browse files
committed
added observability to aggregates
1 parent 7740b4a commit 0f1a870

File tree

4 files changed

+81
-24
lines changed

4 files changed

+81
-24
lines changed

sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/aggregates/AggregateContextImpl.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,36 @@
1717
*/
1818
package org.sliceworkz.eventmodeling.module.aggregates;
1919

20+
import java.time.Instant;
2021
import java.util.List;
2122

2223
import org.sliceworkz.eventmodeling.aggregates.Aggregate;
2324
import org.sliceworkz.eventmodeling.aggregates.AggregateContext;
2425
import org.sliceworkz.eventmodeling.aggregates.AggregateEventAppender;
26+
import org.sliceworkz.eventmodeling.events.Instance;
27+
import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger;
28+
import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger.Metrics;
2529
import org.sliceworkz.eventstore.events.EventReference;
2630
import org.sliceworkz.eventstore.events.Tags;
2731
import org.sliceworkz.eventstore.projection.Projection;
2832
import org.sliceworkz.eventstore.projection.Projector;
33+
import org.sliceworkz.eventstore.projection.Projector.ProjectorMetrics;
2934
import org.sliceworkz.eventstore.stream.EventStream;
3035

3136
public class AggregateContextImpl<DOMAIN_EVENT_TYPE> implements AggregateContext<DOMAIN_EVENT_TYPE> {
3237

38+
private String boundedContext;
39+
private Instance instance;
3340
private Tags identity;
3441
private Aggregate<DOMAIN_EVENT_TYPE> aggregate;
3542
private EventStream<DOMAIN_EVENT_TYPE> eventStream;
3643
private Projection<DOMAIN_EVENT_TYPE> projectionTowardsAggregate;
3744
private EventReference lastEventReference;
3845
private AggregateEventAppender<DOMAIN_EVENT_TYPE> aggregateEventAppender;
3946

40-
public AggregateContextImpl ( Tags identity, Aggregate<DOMAIN_EVENT_TYPE> aggregate, EventStream<DOMAIN_EVENT_TYPE> eventStream ) {
47+
public AggregateContextImpl ( String boundedContext, Instance instance, Tags identity, Aggregate<DOMAIN_EVENT_TYPE> aggregate, EventStream<DOMAIN_EVENT_TYPE> eventStream ) {
48+
this.boundedContext = boundedContext;
49+
this.instance = instance;
4150
this.identity = identity;
4251
this.aggregate = aggregate;
4352
this.eventStream = eventStream;
@@ -74,8 +83,18 @@ public AggregateEventAppender<DOMAIN_EVENT_TYPE> eventAppender() {
7483

7584
@Override
7685
public void updateFromStream() {
77-
this.lastEventReference = Projector.from(eventStream).towards(projectionTowardsAggregate).startingAfter(lastEventReference).build().run().lastEventReference();
86+
Instant start = Instant.now();
87+
88+
89+
ProjectorMetrics projectorMetrics = Projector.from(eventStream).towards(projectionTowardsAggregate).startingAfter(lastEventReference).build().run();
90+
this.lastEventReference = projectorMetrics.lastEventReference();
7891
this.aggregateEventAppender = new AggregateEventAppenderImpl<>(eventStream, aggregate, identity, lastEventReference);
92+
Instant finish = Instant.now();
93+
94+
long duration = finish.toEpochMilli() - start.toEpochMilli();
95+
Metrics metrics = new Metrics(duration, projectorMetrics.queriesDone(), projectorMetrics.eventsStreamed(), projectorMetrics.eventsHandled(), projectorMetrics.lastEventReference());
96+
97+
PerformanceLogger.entry().context(boundedContext).instance(instance).metrics(metrics).type("aggregate.load").aggregate(aggregate.getClass().getSimpleName()).log();
7998
}
8099

81100
}

sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/aggregates/AggregateModule.java

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,63 +27,96 @@
2727
import org.slf4j.LoggerFactory;
2828
import org.sliceworkz.eventmodeling.aggregates.Aggregate;
2929
import org.sliceworkz.eventmodeling.aggregates.AggregateCapability;
30+
import org.sliceworkz.eventmodeling.events.Instance;
3031
import org.sliceworkz.eventstore.events.Tags;
3132
import org.sliceworkz.eventstore.stream.EventStream;
3233

33-
// TODO add micrometer for observability
34+
import io.micrometer.core.instrument.Counter;
35+
import io.micrometer.core.instrument.MeterRegistry;
36+
import io.micrometer.core.instrument.Timer;
37+
3438
public class AggregateModule<DOMAIN_EVENT_TYPE> implements AggregateCapability<DOMAIN_EVENT_TYPE> {
3539

3640
private static final Logger LOGGER = LoggerFactory.getLogger(AggregateModule.class);
3741

38-
private Map<Class<? extends Aggregate<DOMAIN_EVENT_TYPE>>,Constructor<? extends Aggregate<DOMAIN_EVENT_TYPE>>> aggregateClassesWithConstructor = new HashMap<>();
42+
private Map<Class<? extends Aggregate<DOMAIN_EVENT_TYPE>>,AggregateInfo<DOMAIN_EVENT_TYPE>> aggregateInfoByClass = new HashMap<>();
3943
private EventStream<DOMAIN_EVENT_TYPE> domainEventStream;
4044
private String boundedContext;
45+
private Instance instance;
46+
private MeterRegistry meterRegistry;
4147

42-
public AggregateModule ( String boundedContext, List<? extends AggregateSpecificationImpl<DOMAIN_EVENT_TYPE,?,?>> aggregateSpecifications, EventStream<DOMAIN_EVENT_TYPE> domainEventStream ) {
48+
public AggregateModule ( String boundedContext, Instance instance, List<? extends AggregateSpecificationImpl<DOMAIN_EVENT_TYPE,?,?>> aggregateSpecifications, EventStream<DOMAIN_EVENT_TYPE> domainEventStream, MeterRegistry meterRegistry ) {
4349
this.boundedContext = boundedContext;
50+
this.instance = instance;
4451
this.domainEventStream = domainEventStream;
52+
this.meterRegistry = meterRegistry;
53+
54+
io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags
55+
.of("context", boundedContext);
56+
4557
aggregateSpecifications.forEach(spec->{
46-
if ( aggregateClassesWithConstructor.containsKey(spec.aggregateClass()) ) {
58+
if ( aggregateInfoByClass.containsKey(spec.aggregateClass()) ) {
4759
throw new IllegalArgumentException("duplicate aggregate registration for '%s'".formatted(spec.aggregateClass()));
4860
}
4961
try {
50-
aggregateClassesWithConstructor.put(spec.aggregateClass(), spec.aggregateClass().getDeclaredConstructor(new Class[] {}));
62+
63+
var aggregateTags = tags.and(io.micrometer.core.instrument.Tags.of("aggregate", spec.aggregateClass().getSimpleName()));
64+
65+
Counter counter = meterRegistry.counter("sliceworkz.eventmodeling.aggregate.load.count", aggregateTags);
66+
Timer timer = meterRegistry.timer("sliceworkz.eventmodeling.aggregate.load.duration", aggregateTags);
67+
68+
AggregateInfo<DOMAIN_EVENT_TYPE> aggregateInfo = new AggregateInfo<>(spec.aggregateClass().getDeclaredConstructor(new Class[] {}), counter, timer);
69+
70+
aggregateInfoByClass.put(spec.aggregateClass(), aggregateInfo);
5171
} catch (NoSuchMethodException | SecurityException e) {
5272
LOGGER.error(e.getMessage(), e);
5373
throw new RuntimeException(e);
5474
}
5575
});
5676

57-
LOGGER.info("aggregates: %s".formatted(aggregateClassesWithConstructor.keySet()));
77+
LOGGER.info("aggregates: %s".formatted(aggregateInfoByClass.keySet()));
5878
}
5979

6080
@SuppressWarnings("unchecked")
6181
@Override
6282
public <T extends Aggregate<DOMAIN_EVENT_TYPE>> T aggregate(Class<T> aggregateClass, Tags identity) {
6383

64-
if ( aggregateClassesWithConstructor.containsKey(aggregateClass)) {
84+
if ( aggregateInfoByClass.containsKey(aggregateClass)) {
6585

6686
if ( identity == null || identity.tags().size() < 1 ) {
6787
throw new IllegalArgumentException("aggregate identity needs at least one tag, got '%s'".formatted(identity));
6888
}
6989

70-
T result;
71-
try {
72-
result = (T) aggregateClassesWithConstructor.get(aggregateClass).newInstance(new Object[] {});
73-
74-
AggregateContextImpl<DOMAIN_EVENT_TYPE> aci = new AggregateContextImpl<DOMAIN_EVENT_TYPE> (identity, result, domainEventStream);
75-
result.setContext(aci);
76-
aci.updateFromStream();
77-
78-
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
79-
| InvocationTargetException | SecurityException e) {
80-
LOGGER.error(e.getMessage(), e);
81-
throw new RuntimeException(e);
82-
}
83-
return result;
90+
AggregateInfo<DOMAIN_EVENT_TYPE> aggregateInfo = aggregateInfoByClass.get(aggregateClass);
91+
92+
aggregateInfo.counter().increment();
93+
94+
return aggregateInfo.timer().record(()->{
95+
T result;
96+
try {
97+
result = (T) aggregateInfo.constructor().newInstance(new Object[] {});
98+
99+
AggregateContextImpl<DOMAIN_EVENT_TYPE> aci = new AggregateContextImpl<DOMAIN_EVENT_TYPE> (boundedContext, instance, identity, result, domainEventStream);
100+
result.setContext(aci);
101+
aci.updateFromStream();
102+
103+
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
104+
| InvocationTargetException | SecurityException e) {
105+
LOGGER.error(e.getMessage(), e);
106+
throw new RuntimeException(e);
107+
}
108+
return result;
109+
});
110+
84111
} else {
85112
throw new IllegalArgumentException("aggregate class '%s' not registered in bounded context '%s'".formatted(aggregateClass, boundedContext));
86113
}
87114
}
88115

116+
public record AggregateInfo<DOMAIN_EVENT_TYPE> (
117+
Constructor<? extends Aggregate<DOMAIN_EVENT_TYPE>> constructor,
118+
Counter counter,
119+
Timer timer
120+
) { }
121+
89122
}

sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public <T extends BoundedContext<DOMAIN_EVENT_TYPE, INBOUND_EVENT_TYPE, OUTBOUND
284284
ReadModelModule<DOMAIN_EVENT_TYPE> rmm = new ReadModelModule<DOMAIN_EVENT_TYPE>(name, domainEventStream, readAllInStoreEventStream, liveModelClasses, consistentReadModels, eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels, instance, meterRegistry);
285285
DCBModule<DOMAIN_EVENT_TYPE, OUTBOUND_EVENT_TYPE> dcb = new DCBModule<DOMAIN_EVENT_TYPE, OUTBOUND_EVENT_TYPE>(name, instance, rmm, domainEventStream, outboundEventStream, false, meterRegistry);
286286

287-
AggregateModule<DOMAIN_EVENT_TYPE> aggregateModule = new AggregateModule<DOMAIN_EVENT_TYPE>(name, aggregateSpecifications, domainEventStream);
287+
AggregateModule<DOMAIN_EVENT_TYPE> aggregateModule = new AggregateModule<DOMAIN_EVENT_TYPE>(name, instance, aggregateSpecifications, domainEventStream, meterRegistry);
288288

289289
BoundedContextImpl<DOMAIN_EVENT_TYPE, INBOUND_EVENT_TYPE, OUTBOUND_EVENT_TYPE> bc =
290290
new BoundedContextImpl<DOMAIN_EVENT_TYPE, INBOUND_EVENT_TYPE, OUTBOUND_EVENT_TYPE>(name, deployedFeatureSlices, undeployedFeatureSlices, domainEventStream, inboundEventStream, outboundEventStream, observabilityEventStream, dcb, aggregateModule, rmm, am, im, om, instance, meterRegistry);

sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/PerformanceLogger.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public PerformanceLogger readmodel ( String value ) {
4646
return this;
4747
}
4848

49+
public PerformanceLogger aggregate ( String aggregate ) {
50+
add("aggregate", aggregate);
51+
return this;
52+
}
53+
4954
public PerformanceLogger command ( String value ) {
5055
add("command", value);
5156
return this;

0 commit comments

Comments
 (0)