diff --git a/plan.md b/plan.md new file mode 100644 index 0000000..41f694e --- /dev/null +++ b/plan.md @@ -0,0 +1,336 @@ +# Implementation Plan: Optional Live Model Snapshotting + +## Problem + +Live models are rebuilt from scratch on every `read()` call by replaying all matching events through a `Projector`. For read models with large event histories, this becomes a performance bottleneck. Aggregates already solve this with snapshotting — we apply the same techniques to live models. + +## Design Principles (mirroring aggregate snapshotting) + +| Aggregate Snapshotting | Live Model Snapshotting (proposed) | +|---|---| +| `SnapshotCapable` on aggregate | `SnapshotCapable` on read model (reused) | +| `SnapshotStorage` SPI | Same `SnapshotStorage` SPI (reused, no changes) | +| `SnapshotSpecification` builder API | `LiveModelSnapshotSpecification` builder API (new, mirrors it) | +| `AggregateSpecification.snapshots(storage)` | `LiveModelSpecification.snapshots(storage)` | +| Key from `name + Tags` | Key from `readModelName + constructorParams` | +| `AggregateContextImpl` does load/save | `ProjectLiveModelCommand` does load/save | +| `AggregateModule` orchestrates | `ReadModelModule` orchestrates | +| Event count threshold triggers save | Event count threshold triggers save | +| `readAndWrite() / readOnly() / writeOnly()` | Same modes | +| Micrometer metrics | Same pattern of metrics | + +## Changes by Module + +--- + +### 1. `sliceworkz-eventmodeling-api` (public API) + +#### 1a. Modify `SnapshotCapable` — add overloaded `key()` for constructor-param-based identity + +**File:** `snapshots/SnapshotCapable.java` + +Add a second `default` key method alongside the existing Tags-based one: + +```java +default String key(String name, Object... constructorParams) { + StringBuilder keyBuilder = new StringBuilder(name != null ? name : ""); + if (constructorParams != null) { + for (Object param : constructorParams) { + keyBuilder.append("/"); + keyBuilder.append(param != null ? param.toString() : ""); + } + } + return keyBuilder.toString(); +} +``` + +This keeps `SnapshotCapable` as the single marker interface for both aggregates and live models. The Tags-based `key(String, Tags)` remains untouched. Read models use the `key(String, Object...)` overload. Both can be overridden for custom key logic. + +**Rationale:** Reusing `SnapshotCapable` rather than creating a new interface means: +- Read models and aggregates share the same contract (`takeSnapshot`, `fromSnapshot`, `version`) +- The `SnapshotStorage` SPI needs zero changes +- Existing `SnapshotSpecificationImpl` logic can be reused + +#### 1b. New `LiveModelSnapshotSpecification` interface + +**File:** `snapshots/LiveModelSnapshotSpecification.java` (new) + +```java +public interface LiveModelSnapshotSpecification { + + LiveModelSnapshotSpecification eventCountThreshold(int threshold); + + BoundedContextBuilder readAndWrite(); + BoundedContextBuilder readOnly(); + BoundedContextBuilder writeOnly(); +} +``` + +Mirrors `SnapshotSpecification` exactly, just returns to the builder via a different path. This is the fluent API returned after calling `.snapshots(storage)` on a live model registration. + +#### 1c. Modify `LiveModelSpecification` — add `snapshots()` entry point + +**File:** `readmodels/LiveModelSpecification.java` + +Add: +```java + LiveModelSnapshotSpecification snapshots(SnapshotStorage snapshotStorage); +``` + +This creates the same entry point pattern as `AggregateSpecification.snapshots(storage)`. + +--- + +### 2. `sliceworkz-eventmodeling-impl` (implementation) + +#### 2a. New `LiveModelSnapshotSpecificationImpl` + +**File:** `module/snapshots/LiveModelSnapshotSpecificationImpl.java` (new) + +Direct mirror of `SnapshotSpecificationImpl`. Holds: +- `SnapshotStorage snapshotStorage` +- `READ_AND_OR_WRITE readAndOrWrite` (reuse the existing enum from `SnapshotSpecificationImpl`) +- `int eventCountThreshold` (default 100) + +Returns to `BoundedContextBuilder` on terminal methods. + +#### 2b. Modify `BoundedContextBuilderImpl.LiveModelSpecificationImpl` + +**File:** `module/boundedcontext/BoundedContextBuilderImpl.java` + +Add snapshot state to the inner class: + +```java +class LiveModelSpecificationImpl implements LiveModelSpecification { + // existing fields... + private LiveModelSnapshotSpecificationImpl snapshotSpecification; + + @Override + public LiveModelSnapshotSpecification snapshots( + SnapshotStorage snapshotStorage) { + this.snapshotSpecification = new LiveModelSnapshotSpecificationImpl<>(builder, snapshotStorage); + return snapshotSpecification; + } + + // Accessor methods for ReadModelModule to use: + public SnapshotStorage snapshotStorage() { ... } + public boolean readSnapshots() { ... } + public boolean writeSnapshots() { ... } + public int snapshotEventCountThreshold() { ... } +} +``` + +#### 2c. Modify `ReadModelModule` — pass snapshot config, store per-readmodel info + +**File:** `module/readmodels/ReadModelModule.java` + +The constructor and `liveModel()` method need to know about snapshot configuration per live model class. Instead of just holding `Collection>` for live models, hold a map of `LiveModelInfo` records (same pattern as `AggregateModule.AggregateInfo`): + +```java +record LiveModelInfo( + Class> readModelClass, + SnapshotStorage snapshotStorage, + boolean readSnapshots, + boolean writeSnapshots, + int snapshotEventCountThreshold, + Counter counterSnapshotRead, + Counter counterSnapshotWrite +) { } +``` + +The `liveModel()` method passes snapshot config to `ProjectLiveModelCommand`. + +#### 2d. Modify `ProjectLiveModelCommand` — the core change + +**File:** `module/readmodels/ProjectLiveModelCommand.java` + +This is the equivalent of `AggregateContextImpl` + `AggregateModule.aggregate()` for live models. The `execute()` method changes from: + +```java +// BEFORE: +readModel = instantiate(readModelClass, constructorParams); +Projector.from(eventSource).towards(readModel).build().run(); +``` + +To: + +```java +// AFTER: +readModel = instantiate(readModelClass, constructorParams); + +EventReference lastEventReference = null; + +// 1. LOAD snapshot (if configured and read model implements SnapshotCapable) +if (readSnapshots && readModel instanceof SnapshotCapable snapshotCapable) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + String version = snapshotCapable.version(); + var loaded = snapshotStorage.load(key, version); + if (loaded.isPresent()) { + snapshotCapable.fromSnapshot(loaded.get().snapshot()); + lastEventReference = loaded.get().lastEventReference(); + counterSnapshotRead.increment(); + } +} + +// 2. Replay remaining events from after snapshot +Projector projector = Projector.from(eventSource) + .towards(readModel) + .startingAfter(lastEventReference) // <-- key: resume from snapshot point + .build(); +ProjectorMetrics metrics = projector.run(); + +// 3. SAVE snapshot (if configured and threshold met) +if (writeSnapshots && readModel instanceof SnapshotCapable snapshotCapable) { + long totalEvents = (lastEventReference != null ? 1 : 0) + metrics.eventsStreamed(); + if (metrics.eventsStreamed() >= snapshotEventCountThreshold) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + snapshotStorage.save(key, snapshotCapable.version(), + snapshotCapable.takeSnapshot(), metrics.lastEventReference()); + counterSnapshotWrite.increment(); + } +} +``` + +The same logic applies to `ProjectLiveModelUnboundedCommand`. + +#### 2e. Wire snapshot config through `BoundedContextBuilderImpl.build()` + +**File:** `module/boundedcontext/BoundedContextBuilderImpl.java` + +Change the `build()` method to pass `LiveModelSpecificationImpl` objects (not just classes) to `ReadModelModule`, so it has access to snapshot configuration: + +```java +// BEFORE: +Collection> liveModelClasses = liveModelSpecs.stream() + .map(LiveModelSpecificationImpl::readModelClass) + .collect(...); +new ReadModelModule<>(..., liveModelClasses, ...); + +// AFTER: +new ReadModelModule<>(..., liveModelSpecs, ...); +``` + +--- + +### 3. `sliceworkz-eventmodeling-testing` — no changes needed + +The existing test infrastructure supports the pattern. Mock snapshot storages are test-local. + +--- + +### 4. `sliceworkz-eventmodeling-tests-inmem` — new test class + +#### New: `LiveModelSnapshotTest` + +**File:** `module/readmodels/LiveModelSnapshotTest.java` (new) + +Mirrors `AggregateCapabilityTest` for live models: + +1. **`testLiveModelSnapshots()`** — Create a live read model that implements `SnapshotCapable`. Append N events, read the model. Verify snapshot is saved. Read again, verify snapshot is loaded and only delta events are replayed. + +2. **`testLiveModelSnapshotsChangingVersion()`** — Same as aggregate test: changing version causes snapshot miss, full replay. + +3. **`testLiveModelWithoutSnapshots()`** — Verify that live models without `SnapshotCapable` work exactly as before (no regression). + +4. **`testLiveModelSnapshotReadOnly()`** / **`testLiveModelSnapshotWriteOnly()`** — Test the modes. + +A mock read model + mock snapshot storage following the exact same pattern as `MockAggregate` + `MockSnapshotStorage` in the aggregate tests. + +--- + +## Builder API Usage (end-user perspective) + +### Without snapshots (unchanged): +```java +BoundedContext.newBuilder(...) + .readmodel(AccountDetailsReadModel.class).live() + .build(); +``` + +### With snapshots (new): +```java +SnapshotStorage snapshotStorage = new InMemorySnapshotStorage<>(); +// or: new PostgresSnapshotStorage<>(dataSource); +// or: any custom SnapshotStorage implementation + +BoundedContext.newBuilder(...) + .readmodel(AccountDetailsReadModel.class) + .snapshots(snapshotStorage) + .eventCountThreshold(200) + .readAndWrite() // returns to builder (terminal) + .build(); +``` + +### Read model implementation: +```java +public class AccountDetailsReadModel + implements ReadModel, SnapshotCapable { + + private DomainConceptId accountId; + private AccountDetails account; + + public AccountDetailsReadModel(DomainConceptId accountId) { + this.accountId = accountId; + } + + // --- ReadModel methods (unchanged) --- + @Override public EventQuery eventQuery() { ... } + @Override public void when(BankingDomainEvent event) { ... } + public Optional getAccountDetails() { ... } + + // --- SnapshotCapable methods (new) --- + @Override + public AccountDetailsSnapshot takeSnapshot() { + return new AccountDetailsSnapshot(account); + } + + @Override + public void fromSnapshot(AccountDetailsSnapshot snapshot) { + this.account = snapshot.accountDetails(); + } + + @Override + public String version() { return "v1"; } + + // key() uses default implementation: "AccountDetailsReadModel/" +} +``` + +## Snapshot Key Strategy + +- Aggregates: `key(name, Tags)` → `"AggregateName/tagKey-tagValue/..."` +- Live models: `key(name, Object...)` → `"ReadModelName/param1.toString()/param2.toString()/..."` +- Both overridable by the user for custom key schemes +- Version matching identical: exact match required, mismatch = full replay + +## Metrics (mirroring aggregates) + +| Metric | Tags | Purpose | +|--------|------|---------| +| `sliceworkz.eventmodeling.readmodel.live.snapshot.read.count` | context, readmodel | Snapshot loads | +| `sliceworkz.eventmodeling.readmodel.live.snapshot.write.count` | context, readmodel | Snapshot saves | + +Existing `sliceworkz.eventmodeling.readmodel.live.render` and `sliceworkz.eventmodeling.readmodel.live.duration` metrics remain unchanged. + +## Files Changed Summary + +| Module | File | Change | +|--------|------|--------| +| api | `snapshots/SnapshotCapable.java` | Add `key(String, Object...)` default method | +| api | `snapshots/LiveModelSnapshotSpecification.java` | **New** — builder spec interface | +| api | `readmodels/LiveModelSpecification.java` | Add `snapshots()` method | +| impl | `module/snapshots/LiveModelSnapshotSpecificationImpl.java` | **New** — mirrors `SnapshotSpecificationImpl` | +| impl | `module/boundedcontext/BoundedContextBuilderImpl.java` | Add snapshot fields to `LiveModelSpecificationImpl`, wire through `build()` | +| impl | `module/readmodels/ReadModelModule.java` | Hold `LiveModelInfo` instead of just classes, pass to commands | +| impl | `module/readmodels/ProjectLiveModelCommand.java` | Load/save snapshot around projection | +| impl | `module/readmodels/ProjectLiveModelUnboundedCommand.java` | Same as above | +| tests-inmem | `module/readmodels/LiveModelSnapshotTest.java` | **New** — test class mirroring `AggregateCapabilityTest` | + +## What Stays Unchanged + +- `SnapshotStorage` interface — fully reused, zero changes +- `SnapshotSpecification` — aggregate-specific, untouched +- `SnapshotSpecificationImpl` — aggregate-specific, untouched (but `READ_AND_OR_WRITE` enum reused) +- `AggregateModule`, `AggregateContextImpl` — untouched +- Existing read model behavior without snapshots — fully backward compatible +- `BoundedContext.read()` / `readUnbounded()` — signature unchanged diff --git a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java index ef6f124..32688b6 100644 --- a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java +++ b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java @@ -18,6 +18,8 @@ package org.sliceworkz.eventmodeling.readmodels; import org.sliceworkz.eventmodeling.boundedcontext.BoundedContextBuilder; +import org.sliceworkz.eventmodeling.snapshots.LiveModelSnapshotSpecification; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; public interface LiveModelSpecification { @@ -27,4 +29,17 @@ public interface LiveModelSpecification eventuallyConsistent(); + /** + * Configures snapshotting for this live model to optimize projection performance. + *

+ * Snapshots store the complete state of a live model at a point in time, allowing + * faster projection by avoiding replay of the entire event history. The live model + * must implement {@link org.sliceworkz.eventmodeling.snapshots.SnapshotCapable} to use this feature. + * + * @param the type representing the live model's snapshot state + * @param snapshotStorage the storage mechanism for persisting and loading snapshots + * @return a snapshot specification for further configuration + */ + LiveModelSnapshotSpecification snapshots ( SnapshotStorage snapshotStorage ); + } \ No newline at end of file diff --git a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java new file mode 100644 index 0000000..8e94149 --- /dev/null +++ b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java @@ -0,0 +1,82 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.snapshots; + +import org.sliceworkz.eventmodeling.boundedcontext.BoundedContextBuilder; + +/** + * Provides a fluent API for configuring snapshot behavior for a live model. + *

+ * This specification controls when snapshots are taken and whether they should + * be read during live model projection, written after projection, or both. + * Snapshot configuration allows fine-tuning of the performance/storage tradeoff. + * + * @param the base type of domain events in the bounded context + * @param the base type of inbound events from external systems + * @param the base type of outbound events to external systems + */ +public interface LiveModelSnapshotSpecification { + + /** + * Sets the event count threshold for triggering snapshot creation. + *

+ * A snapshot is automatically created when the number of events replayed since the last + * snapshot (or from the beginning if no snapshot exists) reaches this threshold. + * Lower values create snapshots more frequently, trading storage space for faster + * live model projection. Higher values reduce storage but may slow projection of live models + * with long event histories. + * + * @param eventCountThreshold number of events before creating a new snapshot, must be greater than 0 + * @return this specification for method chaining + * @throws IllegalArgumentException if threshold is less than or equal to 0 + */ + LiveModelSnapshotSpecification eventCountThreshold ( int eventCountThreshold ); + + /** + * Enables both reading and writing of snapshots (default behavior). + *

+ * The live model will attempt to load from a snapshot when projecting state, + * and will create new snapshots when the event count threshold is reached. + * + * @return the bounded context builder for further configuration + */ + BoundedContextBuilder readAndWrite ( ); + + /** + * Enables reading snapshots but disables writing new snapshots. + *

+ * Useful when transitioning away from snapshotting or when you want to use + * existing snapshots but not create new ones. The live model will load from + * snapshots but will not create new ones regardless of event count. + * + * @return the bounded context builder for further configuration + */ + BoundedContextBuilder readOnly ( ); + + /** + * Enables writing new snapshots but disables reading existing snapshots. + *

+ * Useful when transitioning to snapshotting or testing snapshot creation. + * The live model will always be projected from the full event history, + * but new snapshots will be created for future use. + * + * @return the bounded context builder for further configuration + */ + BoundedContextBuilder writeOnly ( ); + +} diff --git a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java index 909c705..9e62bc3 100644 --- a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java +++ b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java @@ -20,47 +20,46 @@ import org.sliceworkz.eventstore.events.Tags; /** - * Marks an aggregate as capable of snapshotting for performance optimization. + * Marks an aggregate or live model as capable of snapshotting for performance optimization. *

- * Aggregates with long event histories can implement this interface to enable periodic - * snapshots of their state. When loading, the aggregate can be restored from the most + * Aggregates and live models with long event histories can implement this interface to enable + * periodic snapshots of their state. When loading, the component can be restored from the most * recent snapshot and then replay only subsequent events, rather than replaying the * entire event history from the beginning. *

* Snapshot versioning ensures that snapshots are only loaded if they match the current - * aggregate implementation version, preventing incompatibility issues when the aggregate - * structure changes. + * implementation version, preventing incompatibility issues when the structure changes. * * @param the type representing the serialized snapshot state */ public interface SnapshotCapable { /** - * Captures the current state of the aggregate as a snapshot. + * Captures the current state as a snapshot. *

- * The snapshot should contain all state necessary to restore the aggregate + * The snapshot should contain all state necessary to restore the component * to its current condition without replaying events. * - * @return an immutable representation of the aggregate's current state + * @return an immutable representation of the current state */ SNAPSHOT_TYPE takeSnapshot ( ); /** - * Restores the aggregate's state from a previously captured snapshot. + * Restores state from a previously captured snapshot. *

- * This method is called before event replay begins, allowing the aggregate + * This method is called before event replay begins, allowing the component * to skip replaying events that occurred before the snapshot was taken. * - * @param snapshot the snapshot containing the aggregate's previous state + * @param snapshot the snapshot containing the previous state */ void fromSnapshot ( SNAPSHOT_TYPE snapshot ); /** - * Returns the version identifier for this aggregate's snapshot format. + * Returns the version identifier for the snapshot format. *

* The version should be changed whenever the snapshot structure is modified * in a way that makes old snapshots incompatible. Only snapshots with matching - * versions will be loaded; mismatched snapshots are ignored and the aggregate + * versions will be loaded; mismatched snapshots are ignored and the component * is rebuilt from events. * * @return a version identifier for snapshot compatibility checking @@ -107,4 +106,29 @@ default String key ( String name, Tags identity ) { return keyBuilder.toString(); } + /** + * Generates a unique storage key for a live model's snapshot. + *

+ * The key is generated in the format: "name/param1/param2/..." where each constructor + * parameter's string representation is appended as a path segment. Null parameters are + * represented as empty strings. + * Implementations may override this to provide custom key generation logic. + * + * @param name the name of the read model class + * @param constructorParams the constructor parameters uniquely identifying this live model instance + * @return a unique key for storing and retrieving this live model's snapshot + */ + default String key ( String name, Object... constructorParams ) { + StringBuilder keyBuilder = new StringBuilder(name != null ? name : ""); + + if ( constructorParams != null ) { + for ( Object param : constructorParams ) { + keyBuilder.append("/"); + keyBuilder.append(param != null ? param.toString() : ""); + } + } + + return keyBuilder.toString(); + } + } diff --git a/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java b/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java index f38ac3d..70ad1fc 100644 --- a/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java +++ b/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java @@ -93,7 +93,7 @@ void testKeyWithNullName() { @Test void testKeyWithNullIdentity() { - String key = snapshotCapable.key("Account", null); + String key = snapshotCapable.key("Account", (Tags) null); assertEquals("Account", key); } @@ -162,7 +162,7 @@ void testKeyWithComplexScenario() { @Test void testKeyWithNullNameAndNullIdentity() { - String key = snapshotCapable.key(null, null); + String key = snapshotCapable.key(null, (Tags) null); assertEquals("", key); } diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java index 66d241d..8986907 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java @@ -45,11 +45,15 @@ import org.sliceworkz.eventmodeling.module.dcb.DCBModule; import org.sliceworkz.eventmodeling.module.inbound.InboundModule; import org.sliceworkz.eventmodeling.module.outbound.OutboundModule; +import org.sliceworkz.eventmodeling.module.readmodels.LiveModelSpecificationAccessor; import org.sliceworkz.eventmodeling.module.readmodels.ReadModelModule; +import org.sliceworkz.eventmodeling.module.snapshots.LiveModelSnapshotSpecificationImpl; import org.sliceworkz.eventmodeling.outbound.Dispatcher; import org.sliceworkz.eventmodeling.readmodels.LiveModelSpecification; import org.sliceworkz.eventmodeling.readmodels.LongLivedReadModelSpecification; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.LiveModelSnapshotSpecification; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; import org.sliceworkz.eventmodeling.slices.AnnotationBasedDiscoveryAndConfiguration; import org.sliceworkz.eventmodeling.slices.FeatureSlice; import org.sliceworkz.eventmodeling.slices.FeatureSliceConfiguration; @@ -278,22 +282,20 @@ public >> liveModelClasses = liveModelSpecs.stream().map(LiveModelSpecificationImpl::readModelClass).collect(Collectors.toCollection(ArrayList::new)); - Collection> consistentReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.CONSISTENT).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); Collection> eventuallyConsistentSharedReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.EVENTUALLY_CONSISTENT&&s.isShared()).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); Collection> eventuallyConsistentLocalReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.EVENTUALLY_CONSISTENT&&s.isLocal()&&!s.isEphemeral()).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); Collection> eventuallyConsistentEphemeralReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.EVENTUALLY_CONSISTENT&&s.isLocal()&&s.isEphemeral()).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); - + Collection> translators = translatorSpecs.stream().map(i->(Translator)i).collect(Collectors.toCollection(ArrayList::new)); InboundModule im = new InboundModule<>(name, inboundEventStream, translators, instance, meterRegistry); Collection> dispatchers = dispatcherSpecs.stream().map(i->(Dispatcher)i).collect(Collectors.toCollection(ArrayList::new)); OutboundModule om = new OutboundModule(name, outboundEventStream, dispatchers, instance, meterRegistry); - + AutomationModule am = new AutomationModule<>(name, domainEventStream, automations, instance, meterRegistry); - - ReadModelModule rmm = new ReadModelModule(name, domainEventStream, readAllInStoreEventStream, liveModelClasses, consistentReadModels, eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels, instance, meterRegistry); + + ReadModelModule rmm = new ReadModelModule(name, domainEventStream, readAllInStoreEventStream, liveModelSpecs, consistentReadModels, eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels, instance, meterRegistry); DCBModule dcb = new DCBModule(name, instance, rmm, domainEventStream, outboundEventStream, false, meterRegistry); AggregateModule aggregateModule = new AggregateModule(name, instance, aggregateSpecifications, domainEventStream, meterRegistry); @@ -322,12 +324,13 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl }); } - public class LiveModelSpecificationImpl implements LiveModelSpecification { - + public class LiveModelSpecificationImpl implements LiveModelSpecification, LiveModelSpecificationAccessor { + private BoundedContextBuilder builder; private Class> readModelClass; private Consistency consistency = Consistency.LIVE; - + private LiveModelSnapshotSpecificationImpl snapshotSpecification; + public LiveModelSpecificationImpl ( BoundedContextBuilder builder, Class> readModelClass ) { this.builder = builder; this.readModelClass = readModelClass; @@ -349,14 +352,41 @@ public BoundedContextBuilder LiveModelSnapshotSpecification snapshots ( + SnapshotStorage snapshotStorage ) { + if ( snapshotStorage == null ) { + throw new IllegalArgumentException("snapshotStorage can not be null"); + } + this.snapshotSpecification = new LiveModelSnapshotSpecificationImpl(builder, snapshotStorage); + return snapshotSpecification; + } + public Consistency consistency ( ) { return consistency; } - + public Class> readModelClass ( ) { return readModelClass; } + @SuppressWarnings("unchecked") + public SnapshotStorage snapshotStorage ( ) { + return snapshotSpecification == null ? null : (SnapshotStorage) snapshotSpecification.snapshotStorage(); + } + + public boolean readSnapshots ( ) { + return snapshotSpecification != null && snapshotSpecification.readAndOrWrite().mustRead(); + } + + public boolean writeSnapshots ( ) { + return snapshotSpecification != null && snapshotSpecification.readAndOrWrite().mustWrite(); + } + + public int snapshotEventCountThreshold ( ) { + return snapshotSpecification == null ? 0 : snapshotSpecification.eventCountThreshold(); + } + } public class LongLivedReadModelSpecificationImpl implements LongLivedReadModelSpecification { diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java new file mode 100644 index 0000000..33ce332 --- /dev/null +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java @@ -0,0 +1,38 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.module.readmodels; + +import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; + +/** + * Internal accessor interface for live model specification data needed by ReadModelModule. + */ +public interface LiveModelSpecificationAccessor { + + Class> readModelClass ( ); + + SnapshotStorage snapshotStorage ( ); + + boolean readSnapshots ( ); + + boolean writeSnapshots ( ); + + int snapshotEventCountThreshold ( ); + +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java index b516468..ae0d611 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java @@ -29,8 +29,12 @@ import org.sliceworkz.eventmodeling.module.boundedcontext.KernelEvent; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger.Metrics; +import org.sliceworkz.eventmodeling.module.readmodels.ReadModelModule.LiveModelInfo; import org.sliceworkz.eventmodeling.readmodels.ReadModel; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotCapable; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; +import org.sliceworkz.eventstore.events.EventReference; import org.sliceworkz.eventstore.projection.Projector; import org.sliceworkz.eventstore.projection.Projector.ProjectorMetrics; import org.sliceworkz.eventstore.stream.EventSource; @@ -38,27 +42,29 @@ class ProjectLiveModelCommand implements Command { private Logger LOGGER = LoggerFactory.getLogger(ProjectLiveModelCommand.class); - + private Class> readModelClass; private Object[] constructorParams; private EventSource eventSource; private String boundedContext; private Instance instance; - + private LiveModelInfo liveModelInfo; + private ReadModelWithMetaData readModel; - - public ProjectLiveModelCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, Object... constructorParams ) { + + public ProjectLiveModelCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, LiveModelInfo liveModelInfo, Object... constructorParams ) { this.boundedContext = boundedContext; this.instance = instance; this.eventSource = eventSource; this.readModelClass = readModelClass; + this.liveModelInfo = liveModelInfo; this.constructorParams = constructorParams; } - + public ReadModelWithMetaData readModel ( ) { return readModel; } - + @SuppressWarnings("unchecked") @Override public CommandResult execute(CommandContext context) { @@ -66,19 +72,51 @@ public CommandResult execute(CommandContext result = context.noDecisionModels(); try { readModel = (ReadModelWithMetaData) selectConstructor(readModelClass, constructorParams).newInstance(constructorParams); - Projector projector = Projector.from(eventSource).towards(readModel).build(); + + EventReference lastEventReference = null; + + // Load snapshot if configured and read model implements SnapshotCapable + if ( liveModelInfo.readSnapshots() && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + String version = snapshotCapable.version(); + var loadedSnapshot = liveModelInfo.snapshotStorage().load(key, version); + if ( loadedSnapshot.isPresent() ) { + liveModelInfo.counterSnapshotRead().increment(); + ((SnapshotCapable) snapshotCapable).fromSnapshot(loadedSnapshot.get().snapshot()); + lastEventReference = loadedSnapshot.get().lastEventReference(); + } + } + + // Replay events — starting after snapshot's last event reference if available + Projector projector = Projector.from(eventSource).towards(readModel).startingAfter(lastEventReference).build(); ProjectorMetrics projectorMetrics = projector.run(); + + // Save snapshot if configured and threshold met + saveSnapshotIfNeeded(projectorMetrics); + long finish = System.currentTimeMillis(); long duration = finish - start; Metrics metrics = map(duration, projectorMetrics); PerformanceLogger.entry().context(boundedContext).instance(instance).metrics(metrics).type("readmodel.live").readmodel(readModel.readmodelName()).log(); - return result; + return result; } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } } - + + @SuppressWarnings("unchecked") + private void saveSnapshotIfNeeded ( ProjectorMetrics projectorMetrics ) { + SnapshotStorage snapshotStorageForWrite = liveModelInfo.snapshotStorageForWrite(); + if ( snapshotStorageForWrite != null + && projectorMetrics.eventsStreamed() >= liveModelInfo.snapshotEventCountThreshold() + && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + snapshotStorageForWrite.save(key, snapshotCapable.version(), ((SnapshotCapable) snapshotCapable).takeSnapshot(), projectorMetrics.lastEventReference()); + liveModelInfo.counterSnapshotWrite().increment(); + } + } + Constructor selectConstructor ( Class readModelClass, Object[] constructorParams ) { for ( Constructor ctr : readModelClass.getDeclaredConstructors() ) { // TODO improve constructor selection, not only on parameter count but also on type! @@ -88,9 +126,9 @@ Constructor selectConstructor ( Class readModelClass, Object[] constructor } throw new IllegalArgumentException("no public constructur found on " + readModelClass + " for parameters " + constructorParams); } - + private Metrics map ( long duration, ProjectorMetrics projectorMetrics ) { return new Metrics(duration, projectorMetrics.queriesDone(), projectorMetrics.eventsStreamed(), projectorMetrics.eventsHandled(), projectorMetrics.lastEventReference()); } - -} \ No newline at end of file + +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java index 65731b6..e3af846 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java @@ -26,8 +26,12 @@ import org.sliceworkz.eventmodeling.module.boundedcontext.KernelEvent; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger.Metrics; +import org.sliceworkz.eventmodeling.module.readmodels.ReadModelModule.LiveModelInfo; import org.sliceworkz.eventmodeling.readmodels.ReadModel; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotCapable; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; +import org.sliceworkz.eventstore.events.EventReference; import org.sliceworkz.eventstore.projection.Projector; import org.sliceworkz.eventstore.projection.Projector.ProjectorMetrics; import org.sliceworkz.eventstore.stream.EventSource; @@ -39,21 +43,23 @@ public class ProjectLiveModelUnboundedCommand implements Com private EventSource eventSource; private String boundedContext; private Instance instance; - + private LiveModelInfo liveModelInfo; + private ReadModelWithMetaData readModel; - - public ProjectLiveModelUnboundedCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, Object... constructorParams ) { + + public ProjectLiveModelUnboundedCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, LiveModelInfo liveModelInfo, Object... constructorParams ) { this.boundedContext = boundedContext; this.instance = instance; this.eventSource = eventSource; this.readModelClass = readModelClass; + this.liveModelInfo = liveModelInfo; this.constructorParams = constructorParams; } - + public ReadModelWithMetaData readModel ( ) { return readModel; } - + @SuppressWarnings("unchecked") @Override public CommandResult execute(CommandContext context) { @@ -61,16 +67,48 @@ public CommandResult execute(CommandContext result = context.noDecisionModels(); try { readModel = (ReadModelWithMetaData) readModelClass.getDeclaredConstructors()[0].newInstance(constructorParams); - Projector projector = Projector.from(eventSource).towards(readModel).build(); + + EventReference lastEventReference = null; + + // Load snapshot if configured and read model implements SnapshotCapable + if ( liveModelInfo.readSnapshots() && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + String version = snapshotCapable.version(); + var loadedSnapshot = liveModelInfo.snapshotStorage().load(key, version); + if ( loadedSnapshot.isPresent() ) { + liveModelInfo.counterSnapshotRead().increment(); + ((SnapshotCapable) snapshotCapable).fromSnapshot(loadedSnapshot.get().snapshot()); + lastEventReference = loadedSnapshot.get().lastEventReference(); + } + } + + // Replay events — starting after snapshot's last event reference if available + Projector projector = Projector.from(eventSource).towards(readModel).startingAfter(lastEventReference).build(); ProjectorMetrics projectorMetrics = projector.run(); + + // Save snapshot if configured and threshold met + saveSnapshotIfNeeded(projectorMetrics); + long finish = System.currentTimeMillis(); long duration = finish - start; Metrics metrics = new Metrics(duration, projectorMetrics.queriesDone(), projectorMetrics.eventsStreamed(), projectorMetrics.eventsHandled(), projectorMetrics.lastEventReference()); PerformanceLogger.entry().context(boundedContext).instance(instance).metrics(metrics).type("readmodel.live").readmodel(readModel.readmodelName()).log(); - return result; + return result; } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } - -} \ No newline at end of file + + @SuppressWarnings("unchecked") + private void saveSnapshotIfNeeded ( ProjectorMetrics projectorMetrics ) { + SnapshotStorage snapshotStorageForWrite = liveModelInfo.snapshotStorageForWrite(); + if ( snapshotStorageForWrite != null + && projectorMetrics.eventsStreamed() >= liveModelInfo.snapshotEventCountThreshold() + && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + snapshotStorageForWrite.save(key, snapshotCapable.version(), ((SnapshotCapable) snapshotCapable).takeSnapshot(), projectorMetrics.lastEventReference()); + liveModelInfo.counterSnapshotWrite().increment(); + } + } + +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java index b17db30..7633858 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Stream; import org.slf4j.Logger; @@ -33,56 +36,89 @@ import org.sliceworkz.eventmodeling.module.threading.EventuallyConsistentProcessorIdentification.Storage; import org.sliceworkz.eventmodeling.module.threading.ProcessorThreadManager; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; import org.sliceworkz.eventstore.events.Event; import org.sliceworkz.eventstore.stream.EventSource; import org.sliceworkz.eventstore.stream.EventStream; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; public class ReadModelModule implements LifecycleCapability { - + private static Logger LOGGER = LoggerFactory.getLogger(ReadModelModule.class); private Collection> eventuallyConsistentReadModelThreadManagers; private ProcessorThreadManager processorThreadManager; - + private BoundedContextFunctions kernelFunctions; private EventSource domainEventStream; private EventSource allInStorageEventStream; - private Collection>> liveModels = new ArrayList<>(); + private Map>, LiveModelInfo> liveModels = new HashMap<>(); private Collection> consistentReadModels = new ArrayList<>(); private Collection> eventuallyConsistentSharedReadModels = new ArrayList<>(); private Collection> eventuallyConsistentLocalReadModels = new ArrayList<>(); private String boundedContext; private Instance instance; - + private MeterRegistry meterRegistry; - - - public ReadModelModule ( + + public record LiveModelInfo ( + Class> readModelClass, + SnapshotStorage snapshotStorage, + boolean readSnapshots, + boolean writeSnapshots, + int snapshotEventCountThreshold, + Counter counterSnapshotRead, + Counter counterSnapshotWrite + ) { + + public SnapshotStorage snapshotStorageForWrite ( ) { + return writeSnapshots ? snapshotStorage : null; + } + + } + + public > ReadModelModule ( String boundedContext, EventStream domainEventStream, EventStream allInStorageEventStream, - Collection>> liveModelClasses, - Collection> consistentReadModels, - Collection> eventuallyConsistentSharedReadModels, + List liveModelSpecs, + Collection> consistentReadModels, + Collection> eventuallyConsistentSharedReadModels, Collection> eventuallyConsistentLocalReadModels, Collection> eventuallyConsistentEphemeralReadModels, Instance instance, MeterRegistry meterRegistry ) { - + this.domainEventStream = domainEventStream; this.allInStorageEventStream = allInStorageEventStream; this.boundedContext = boundedContext; this.instance = instance; - for ( Class> liveModelClass : liveModelClasses ) { - if ( this.liveModels.contains(liveModelClass)) { - LOGGER.error("multiple live readmodels of type '%s' registered".formatted(liveModelClass)); - throw new IllegalArgumentException("duplicate live readmodel %s".formatted(liveModelClass)); + for ( LMSI spec : liveModelSpecs ) { + Class> readModelClass = spec.readModelClass(); + if ( this.liveModels.containsKey(readModelClass) ) { + LOGGER.error("multiple live readmodels of type '%s' registered".formatted(readModelClass)); + throw new IllegalArgumentException("duplicate live readmodel %s".formatted(readModelClass)); } - this.liveModels.add(liveModelClass); + + io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags + .of("context", boundedContext) + .and("readmodel", readModelClass.getSimpleName()); + + Counter counterSnapshotRead = meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.snapshot.read.count", tags); + Counter counterSnapshotWrite = meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.snapshot.write.count", tags); + + this.liveModels.put(readModelClass, new LiveModelInfo<>( + readModelClass, + spec.snapshotStorage(), + spec.readSnapshots(), + spec.writeSnapshots(), + spec.snapshotEventCountThreshold(), + counterSnapshotRead, + counterSnapshotWrite)); } for ( ReadModelWithMetaData consistentReadModel : consistentReadModels ) { @@ -104,7 +140,7 @@ public ReadModelModule ( this.eventuallyConsistentReadModelThreadManagers = createEventuallyConsistentEventProcessors(eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels); this.processorThreadManager = new ProcessorThreadManager("readmodel", this.eventuallyConsistentReadModelThreadManagers); - LOGGER.info("live readmodels: %s".formatted(liveModelClasses)); + LOGGER.info("live readmodels: %s".formatted(liveModels.keySet())); } Collection> createEventuallyConsistentEventProcessors ( Collection> shared, Collection> local, Collection> ephemeral ) { @@ -116,21 +152,22 @@ Collection> createEventual return result; } - + public void kernelFunctions ( BoundedContextFunctions kernelFunctions ) { this.kernelFunctions = kernelFunctions; } @SuppressWarnings({ "unchecked", "rawtypes" }) public T liveModel ( Class> readModelClass, Tracing tracing, Object... constructorParams) { - if ( liveModels.contains(readModelClass)) { + LiveModelInfo info = liveModels.get(readModelClass); + if ( info != null ) { io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags .of("context", boundedContext) .and("readmodel", readModelClass.getSimpleName()); meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.render", tags).increment(); return meterRegistry.timer("sliceworkz.eventmodeling.readmodel.live.duration", tags).record(()->{ - ProjectLiveModelCommand cmd = new ProjectLiveModelCommand(boundedContext, instance, domainEventStream, readModelClass, constructorParams); + ProjectLiveModelCommand cmd = new ProjectLiveModelCommand(boundedContext, instance, domainEventStream, readModelClass, info, constructorParams); kernelFunctions.executeKernelCommand(cmd, tracing); return (T) cmd.readModel(); }); @@ -139,17 +176,18 @@ public T liveModel ( Class T liveModelUnbounded ( Class> readModelClass, Tracing tracing, Object... constructorParams) { - if ( liveModels.contains(readModelClass)) { + LiveModelInfo info = liveModels.get(readModelClass); + if ( info != null ) { io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags .of("context", boundedContext) .and("readmodel", readModelClass.getSimpleName()); meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.render", tags).increment(); return meterRegistry.timer("sliceworkz.eventmodeling.readmodel.live.duration", tags).record(()->{ - ProjectLiveModelUnboundedCommand cmd = new ProjectLiveModelUnboundedCommand(boundedContext, instance, allInStorageEventStream, readModelClass, constructorParams); + ProjectLiveModelUnboundedCommand cmd = new ProjectLiveModelUnboundedCommand(boundedContext, instance, allInStorageEventStream, readModelClass, info, constructorParams); kernelFunctions.executeKernelCommand(cmd, tracing); return (T) cmd.readModel(); }); @@ -176,7 +214,7 @@ public void updateSharedConsistentModels ( Stream> events ) { // TODO implement preloading of (inmemory) models upon kernel start } - + @Override public void start ( ) { this.processorThreadManager.start(); @@ -186,10 +224,10 @@ public void start ( ) { public void stop ( ) { this.processorThreadManager.stop(); } - + @Override public void terminate ( ) { this.processorThreadManager.terminate(); } -} \ No newline at end of file +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java new file mode 100644 index 0000000..96ed6e7 --- /dev/null +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java @@ -0,0 +1,93 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.module.snapshots; + +import org.sliceworkz.eventmodeling.boundedcontext.BoundedContextBuilder; +import org.sliceworkz.eventmodeling.module.snapshots.SnapshotSpecificationImpl.READ_AND_OR_WRITE; +import org.sliceworkz.eventmodeling.snapshots.LiveModelSnapshotSpecification; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; + +public class LiveModelSnapshotSpecificationImpl implements LiveModelSnapshotSpecification { + + public static final int DEFAULT_EVENT_COUNT_THRESHOLD = 100; + + private SnapshotStorage snapshotStorage; + private READ_AND_OR_WRITE readAndOrWrite = READ_AND_OR_WRITE.READ_WRITE; + private int eventCountThreshold = DEFAULT_EVENT_COUNT_THRESHOLD; + private BoundedContextBuilder parent; + + public LiveModelSnapshotSpecificationImpl ( BoundedContextBuilder parent, SnapshotStorage snapshotStorage ) { + this.parent = parent; + this.snapshotStorage = snapshotStorage; + if ( snapshotStorage != null ) { + this.readAndOrWrite = READ_AND_OR_WRITE.READ_WRITE; + } else { + this.readAndOrWrite = READ_AND_OR_WRITE.NO_READ_NO_WRITE; + } + } + + @Override + public LiveModelSnapshotSpecification eventCountThreshold ( int eventCountThreshold ) { + validateSnapshotStorage(); + if ( eventCountThreshold <= 0 ) { + throw new IllegalArgumentException("eventCountThreshold must be above 0"); + } + this.eventCountThreshold = eventCountThreshold; + return this; + } + + @Override + public BoundedContextBuilder readAndWrite ( ) { + validateSnapshotStorage(); + this.readAndOrWrite = READ_AND_OR_WRITE.READ_WRITE; + return parent; + } + + @Override + public BoundedContextBuilder readOnly ( ) { + validateSnapshotStorage(); + this.readAndOrWrite = READ_AND_OR_WRITE.READ_ONLY; + return parent; + } + + @Override + public BoundedContextBuilder writeOnly ( ) { + validateSnapshotStorage(); + this.readAndOrWrite = READ_AND_OR_WRITE.WRITE_ONLY; + return parent; + } + + private void validateSnapshotStorage ( ) { + if ( snapshotStorage == null ) { + throw new IllegalArgumentException("snapshot storage cannot be null"); + } + } + + public READ_AND_OR_WRITE readAndOrWrite ( ) { + return readAndOrWrite; + } + + public SnapshotStorage snapshotStorage ( ) { + return snapshotStorage; + } + + public int eventCountThreshold ( ) { + return eventCountThreshold; + } + +} diff --git a/sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java b/sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java new file mode 100644 index 0000000..a5e4308 --- /dev/null +++ b/sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java @@ -0,0 +1,317 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.module.readmodels; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.sliceworkz.eventmodeling.boundedcontext.BoundedContext; +import org.sliceworkz.eventmodeling.events.InstanceFactory; +import org.sliceworkz.eventmodeling.mock.boundedcontext.AbstractMockDomainTest; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockBoundedContext; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockDomainEvent; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockDomainEvent.FirstDomainEvent; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockInboundEvent; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockOutboundEvent; +import org.sliceworkz.eventmodeling.readmodels.ReadModel; +import org.sliceworkz.eventmodeling.snapshots.SnapshotCapable; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; +import org.sliceworkz.eventstore.events.EventReference; +import org.sliceworkz.eventstore.infra.inmem.InMemoryEventStorage; +import org.sliceworkz.eventstore.query.EventQuery; +import org.sliceworkz.eventstore.query.EventTypesFilter; +import org.sliceworkz.eventstore.events.Tags; +import org.sliceworkz.eventstore.spi.EventStorage; + +public class LiveModelSnapshotTest extends AbstractMockDomainTest { + + private EventStorage eventStorage; + + private MockLiveModelSnapshotStorage snapshotStorage = new MockLiveModelSnapshotStorage(); + + @BeforeEach + protected void setUp ( ) { + super.setUp(); + SnapshotLiveModel.VERSION = "v1"; + this.eventStorage = createEventStorage(); + } + + @AfterEach + protected void tearDown ( ) { + destroyEventStorage(eventStorage); + if ( boundedContext() != null ) { + boundedContext().stop(); + } + } + + public EventStorage createEventStorage ( ) { + return InMemoryEventStorage.newBuilder().build(); + } + + public void destroyEventStorage ( EventStorage storage ) { + + } + + @Test + void testLiveModelWithoutSnapshots ( ) { + MockBoundedContext domain = domainWithLiveModel(0); + + for ( int i = 0; i < 100; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(100, model.getCounter()); + assertEquals(0, snapshotStorage.getSaveInvokes()); + } + + @Test + void testLiveModelSnapshots ( ) { + MockBoundedContext domain = domainWithLiveModel(5); + + // Produce 10 events + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + // First read: projects all 10 events, saves snapshot (10 >= threshold of 5) + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(1, snapshotStorage.getSaveInvokes()); + + // Second read: loads snapshot, replays 0 events on top (no new events) + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); + + // Add 3 more events — below threshold + for ( int i = 10; i < 13; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + // Third read: loads snapshot, replays 3 events on top (3 < 5, no new snapshot) + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(13, model.getCounter()); + assertEquals(3, model.getEventsOnTopOfSnapshot()); + assertEquals(1, snapshotStorage.getSaveInvokes()); // still 1 from the first save + + // Add 7 more events — now 10 new events since snapshot + for ( int i = 13; i < 20; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + // Fourth read: loads snapshot, replays 10 events on top (10 >= 5, saves new snapshot) + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(20, model.getCounter()); + assertEquals(10, model.getEventsOnTopOfSnapshot()); + assertEquals(2, snapshotStorage.getSaveInvokes()); + + // Fifth read: loads latest snapshot, replays 0 events + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(20, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); + } + + @Test + void testLiveModelSnapshotsChangingVersion ( ) { + MockBoundedContext domain = domainWithLiveModel(5); + + // Produce 10 events and read to create first snapshot + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(1, snapshotStorage.getSaveInvokes()); + + // Change version — snapshot should not be loaded + SnapshotLiveModel.VERSION = "v" + UUID.randomUUID().toString(); + + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(10, model.getEventsOnTopOfSnapshot()); // full replay since version mismatch + assertEquals(2, snapshotStorage.getSaveInvokes()); // new snapshot saved with new version + + // Revert version — latest snapshot with original version is still there + SnapshotLiveModel.VERSION = "v1"; + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); // snapshot with v1 was loaded + } + + @Test + void testLiveModelSnapshotReadOnly ( ) { + // First, create a domain with readAndWrite to establish a snapshot + MockBoundedContext domain = domainWithLiveModelMode(5, "readAndWrite"); + + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(1, snapshotStorage.getSaveInvokes()); + domain.stop(); + + // Now create a new domain with readOnly mode + MockBoundedContext domain2 = domainWithLiveModelMode(5, "readOnly"); + model = domain2.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); // snapshot was loaded + assertEquals(1, snapshotStorage.getSaveInvokes()); // no new save in readOnly + } + + @Test + void testLiveModelSnapshotWriteOnly ( ) { + MockBoundedContext domain = domainWithLiveModelMode(5, "writeOnly"); + + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(10, model.getEventsOnTopOfSnapshot()); // full replay in writeOnly + assertEquals(1, snapshotStorage.getSaveInvokes()); // but snapshot was saved + } + + + MockBoundedContext domainWithLiveModel ( int snapshotAfterEventCount ) { + return domainWithLiveModelMode(snapshotAfterEventCount, "readAndWrite"); + } + + MockBoundedContext domainWithLiveModelMode ( int snapshotAfterEventCount, String mode ) { + + var builder = BoundedContext.newBuilder(MockDomainEvent.class, MockInboundEvent.class, MockOutboundEvent.class) + .name("UnitTestBoundedContext") + .eventStorage(eventStorage) + .instance(InstanceFactory.determine("unittests")); + + if ( snapshotAfterEventCount == 0 ) { + builder.readmodel(SnapshotLiveModel.class).live(); + } else { + var snapshotSpec = builder.readmodel(SnapshotLiveModel.class).snapshots(snapshotStorage).eventCountThreshold(snapshotAfterEventCount); + switch ( mode ) { + case "readOnly" -> snapshotSpec.readOnly(); + case "writeOnly" -> snapshotSpec.writeOnly(); + default -> snapshotSpec.readAndWrite(); + } + } + + return buildBoundedContext(builder); + } + +} + +class SnapshotLiveModel implements ReadModel, SnapshotCapable { + + public static String VERSION = "v1"; + + private String name; + private int counter; + private int eventsOnTopOfSnapshot; + + public SnapshotLiveModel ( String name ) { + this.name = name; + } + + @Override + public EventQuery eventQuery ( ) { + return EventQuery.forEvents(EventTypesFilter.any(), Tags.none()); + } + + @Override + public void when ( MockDomainEvent event ) { + counter++; + eventsOnTopOfSnapshot++; + } + + @Override + public String readmodelName ( ) { + return name; + } + + public int getCounter ( ) { + return counter; + } + + public int getEventsOnTopOfSnapshot ( ) { + return eventsOnTopOfSnapshot; + } + + // --- SnapshotCapable --- + + @Override + public SnapshotData takeSnapshot ( ) { + return new SnapshotData(counter); + } + + @Override + public void fromSnapshot ( SnapshotData snapshot ) { + this.counter = snapshot.counter; + this.eventsOnTopOfSnapshot = 0; + } + + @Override + public String version ( ) { + return VERSION; + } + + public static class SnapshotData { + private int counter; + + public SnapshotData ( ) { + } + + public SnapshotData ( int counter ) { + this.counter = counter; + } + + public SnapshotData clone ( ) { + return new SnapshotData(counter); + } + } + +} + +class MockLiveModelSnapshotStorage implements SnapshotStorage { + + private int saveInvokes; + private Map> snapshots = new HashMap<>(); + + @Override + public Optional> load ( String key, String version ) { + return Optional.ofNullable(snapshots.get(key + version)); + } + + @Override + public void save ( String key, String version, SnapshotLiveModel.SnapshotData snapshot, EventReference lastEventReference ) { + saveInvokes++; + snapshots.put(key + version, new SnapshotRecord<>(snapshot.clone(), lastEventReference)); + } + + public int getSaveInvokes ( ) { + return saveInvokes; + } + +}