From 2acb03863c8644b67cab4cff15fbf8b824491923 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 12 Feb 2026 07:51:11 +0000 Subject: [PATCH 1/3] Add implementation plan for optional live model snapshotting Design proposal that mirrors aggregate snapshotting techniques for live models (read models). Reuses SnapshotCapable and SnapshotStorage SPI, adds LiveModelSnapshotSpecification builder API, and modifies ProjectLiveModelCommand to load/save snapshots around projection. https://claude.ai/code/session_01CxJGnsJcX4mJtMxGHrWqH3 --- plan.md | 336 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 plan.md diff --git a/plan.md b/plan.md new file mode 100644 index 0000000..41f694e --- /dev/null +++ b/plan.md @@ -0,0 +1,336 @@ +# Implementation Plan: Optional Live Model Snapshotting + +## Problem + +Live models are rebuilt from scratch on every `read()` call by replaying all matching events through a `Projector`. For read models with large event histories, this becomes a performance bottleneck. Aggregates already solve this with snapshotting — we apply the same techniques to live models. + +## Design Principles (mirroring aggregate snapshotting) + +| Aggregate Snapshotting | Live Model Snapshotting (proposed) | +|---|---| +| `SnapshotCapable` on aggregate | `SnapshotCapable` on read model (reused) | +| `SnapshotStorage` SPI | Same `SnapshotStorage` SPI (reused, no changes) | +| `SnapshotSpecification` builder API | `LiveModelSnapshotSpecification` builder API (new, mirrors it) | +| `AggregateSpecification.snapshots(storage)` | `LiveModelSpecification.snapshots(storage)` | +| Key from `name + Tags` | Key from `readModelName + constructorParams` | +| `AggregateContextImpl` does load/save | `ProjectLiveModelCommand` does load/save | +| `AggregateModule` orchestrates | `ReadModelModule` orchestrates | +| Event count threshold triggers save | Event count threshold triggers save | +| `readAndWrite() / readOnly() / writeOnly()` | Same modes | +| Micrometer metrics | Same pattern of metrics | + +## Changes by Module + +--- + +### 1. `sliceworkz-eventmodeling-api` (public API) + +#### 1a. Modify `SnapshotCapable` — add overloaded `key()` for constructor-param-based identity + +**File:** `snapshots/SnapshotCapable.java` + +Add a second `default` key method alongside the existing Tags-based one: + +```java +default String key(String name, Object... constructorParams) { + StringBuilder keyBuilder = new StringBuilder(name != null ? name : ""); + if (constructorParams != null) { + for (Object param : constructorParams) { + keyBuilder.append("/"); + keyBuilder.append(param != null ? param.toString() : ""); + } + } + return keyBuilder.toString(); +} +``` + +This keeps `SnapshotCapable` as the single marker interface for both aggregates and live models. The Tags-based `key(String, Tags)` remains untouched. Read models use the `key(String, Object...)` overload. Both can be overridden for custom key logic. + +**Rationale:** Reusing `SnapshotCapable` rather than creating a new interface means: +- Read models and aggregates share the same contract (`takeSnapshot`, `fromSnapshot`, `version`) +- The `SnapshotStorage` SPI needs zero changes +- Existing `SnapshotSpecificationImpl` logic can be reused + +#### 1b. New `LiveModelSnapshotSpecification` interface + +**File:** `snapshots/LiveModelSnapshotSpecification.java` (new) + +```java +public interface LiveModelSnapshotSpecification { + + LiveModelSnapshotSpecification eventCountThreshold(int threshold); + + BoundedContextBuilder readAndWrite(); + BoundedContextBuilder readOnly(); + BoundedContextBuilder writeOnly(); +} +``` + +Mirrors `SnapshotSpecification` exactly, just returns to the builder via a different path. This is the fluent API returned after calling `.snapshots(storage)` on a live model registration. + +#### 1c. Modify `LiveModelSpecification` — add `snapshots()` entry point + +**File:** `readmodels/LiveModelSpecification.java` + +Add: +```java + LiveModelSnapshotSpecification snapshots(SnapshotStorage snapshotStorage); +``` + +This creates the same entry point pattern as `AggregateSpecification.snapshots(storage)`. + +--- + +### 2. `sliceworkz-eventmodeling-impl` (implementation) + +#### 2a. New `LiveModelSnapshotSpecificationImpl` + +**File:** `module/snapshots/LiveModelSnapshotSpecificationImpl.java` (new) + +Direct mirror of `SnapshotSpecificationImpl`. Holds: +- `SnapshotStorage snapshotStorage` +- `READ_AND_OR_WRITE readAndOrWrite` (reuse the existing enum from `SnapshotSpecificationImpl`) +- `int eventCountThreshold` (default 100) + +Returns to `BoundedContextBuilder` on terminal methods. + +#### 2b. Modify `BoundedContextBuilderImpl.LiveModelSpecificationImpl` + +**File:** `module/boundedcontext/BoundedContextBuilderImpl.java` + +Add snapshot state to the inner class: + +```java +class LiveModelSpecificationImpl implements LiveModelSpecification { + // existing fields... + private LiveModelSnapshotSpecificationImpl snapshotSpecification; + + @Override + public LiveModelSnapshotSpecification snapshots( + SnapshotStorage snapshotStorage) { + this.snapshotSpecification = new LiveModelSnapshotSpecificationImpl<>(builder, snapshotStorage); + return snapshotSpecification; + } + + // Accessor methods for ReadModelModule to use: + public SnapshotStorage snapshotStorage() { ... } + public boolean readSnapshots() { ... } + public boolean writeSnapshots() { ... } + public int snapshotEventCountThreshold() { ... } +} +``` + +#### 2c. Modify `ReadModelModule` — pass snapshot config, store per-readmodel info + +**File:** `module/readmodels/ReadModelModule.java` + +The constructor and `liveModel()` method need to know about snapshot configuration per live model class. Instead of just holding `Collection>` for live models, hold a map of `LiveModelInfo` records (same pattern as `AggregateModule.AggregateInfo`): + +```java +record LiveModelInfo( + Class> readModelClass, + SnapshotStorage snapshotStorage, + boolean readSnapshots, + boolean writeSnapshots, + int snapshotEventCountThreshold, + Counter counterSnapshotRead, + Counter counterSnapshotWrite +) { } +``` + +The `liveModel()` method passes snapshot config to `ProjectLiveModelCommand`. + +#### 2d. Modify `ProjectLiveModelCommand` — the core change + +**File:** `module/readmodels/ProjectLiveModelCommand.java` + +This is the equivalent of `AggregateContextImpl` + `AggregateModule.aggregate()` for live models. The `execute()` method changes from: + +```java +// BEFORE: +readModel = instantiate(readModelClass, constructorParams); +Projector.from(eventSource).towards(readModel).build().run(); +``` + +To: + +```java +// AFTER: +readModel = instantiate(readModelClass, constructorParams); + +EventReference lastEventReference = null; + +// 1. LOAD snapshot (if configured and read model implements SnapshotCapable) +if (readSnapshots && readModel instanceof SnapshotCapable snapshotCapable) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + String version = snapshotCapable.version(); + var loaded = snapshotStorage.load(key, version); + if (loaded.isPresent()) { + snapshotCapable.fromSnapshot(loaded.get().snapshot()); + lastEventReference = loaded.get().lastEventReference(); + counterSnapshotRead.increment(); + } +} + +// 2. Replay remaining events from after snapshot +Projector projector = Projector.from(eventSource) + .towards(readModel) + .startingAfter(lastEventReference) // <-- key: resume from snapshot point + .build(); +ProjectorMetrics metrics = projector.run(); + +// 3. SAVE snapshot (if configured and threshold met) +if (writeSnapshots && readModel instanceof SnapshotCapable snapshotCapable) { + long totalEvents = (lastEventReference != null ? 1 : 0) + metrics.eventsStreamed(); + if (metrics.eventsStreamed() >= snapshotEventCountThreshold) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + snapshotStorage.save(key, snapshotCapable.version(), + snapshotCapable.takeSnapshot(), metrics.lastEventReference()); + counterSnapshotWrite.increment(); + } +} +``` + +The same logic applies to `ProjectLiveModelUnboundedCommand`. + +#### 2e. Wire snapshot config through `BoundedContextBuilderImpl.build()` + +**File:** `module/boundedcontext/BoundedContextBuilderImpl.java` + +Change the `build()` method to pass `LiveModelSpecificationImpl` objects (not just classes) to `ReadModelModule`, so it has access to snapshot configuration: + +```java +// BEFORE: +Collection> liveModelClasses = liveModelSpecs.stream() + .map(LiveModelSpecificationImpl::readModelClass) + .collect(...); +new ReadModelModule<>(..., liveModelClasses, ...); + +// AFTER: +new ReadModelModule<>(..., liveModelSpecs, ...); +``` + +--- + +### 3. `sliceworkz-eventmodeling-testing` — no changes needed + +The existing test infrastructure supports the pattern. Mock snapshot storages are test-local. + +--- + +### 4. `sliceworkz-eventmodeling-tests-inmem` — new test class + +#### New: `LiveModelSnapshotTest` + +**File:** `module/readmodels/LiveModelSnapshotTest.java` (new) + +Mirrors `AggregateCapabilityTest` for live models: + +1. **`testLiveModelSnapshots()`** — Create a live read model that implements `SnapshotCapable`. Append N events, read the model. Verify snapshot is saved. Read again, verify snapshot is loaded and only delta events are replayed. + +2. **`testLiveModelSnapshotsChangingVersion()`** — Same as aggregate test: changing version causes snapshot miss, full replay. + +3. **`testLiveModelWithoutSnapshots()`** — Verify that live models without `SnapshotCapable` work exactly as before (no regression). + +4. **`testLiveModelSnapshotReadOnly()`** / **`testLiveModelSnapshotWriteOnly()`** — Test the modes. + +A mock read model + mock snapshot storage following the exact same pattern as `MockAggregate` + `MockSnapshotStorage` in the aggregate tests. + +--- + +## Builder API Usage (end-user perspective) + +### Without snapshots (unchanged): +```java +BoundedContext.newBuilder(...) + .readmodel(AccountDetailsReadModel.class).live() + .build(); +``` + +### With snapshots (new): +```java +SnapshotStorage snapshotStorage = new InMemorySnapshotStorage<>(); +// or: new PostgresSnapshotStorage<>(dataSource); +// or: any custom SnapshotStorage implementation + +BoundedContext.newBuilder(...) + .readmodel(AccountDetailsReadModel.class) + .snapshots(snapshotStorage) + .eventCountThreshold(200) + .readAndWrite() // returns to builder (terminal) + .build(); +``` + +### Read model implementation: +```java +public class AccountDetailsReadModel + implements ReadModel, SnapshotCapable { + + private DomainConceptId accountId; + private AccountDetails account; + + public AccountDetailsReadModel(DomainConceptId accountId) { + this.accountId = accountId; + } + + // --- ReadModel methods (unchanged) --- + @Override public EventQuery eventQuery() { ... } + @Override public void when(BankingDomainEvent event) { ... } + public Optional getAccountDetails() { ... } + + // --- SnapshotCapable methods (new) --- + @Override + public AccountDetailsSnapshot takeSnapshot() { + return new AccountDetailsSnapshot(account); + } + + @Override + public void fromSnapshot(AccountDetailsSnapshot snapshot) { + this.account = snapshot.accountDetails(); + } + + @Override + public String version() { return "v1"; } + + // key() uses default implementation: "AccountDetailsReadModel/" +} +``` + +## Snapshot Key Strategy + +- Aggregates: `key(name, Tags)` → `"AggregateName/tagKey-tagValue/..."` +- Live models: `key(name, Object...)` → `"ReadModelName/param1.toString()/param2.toString()/..."` +- Both overridable by the user for custom key schemes +- Version matching identical: exact match required, mismatch = full replay + +## Metrics (mirroring aggregates) + +| Metric | Tags | Purpose | +|--------|------|---------| +| `sliceworkz.eventmodeling.readmodel.live.snapshot.read.count` | context, readmodel | Snapshot loads | +| `sliceworkz.eventmodeling.readmodel.live.snapshot.write.count` | context, readmodel | Snapshot saves | + +Existing `sliceworkz.eventmodeling.readmodel.live.render` and `sliceworkz.eventmodeling.readmodel.live.duration` metrics remain unchanged. + +## Files Changed Summary + +| Module | File | Change | +|--------|------|--------| +| api | `snapshots/SnapshotCapable.java` | Add `key(String, Object...)` default method | +| api | `snapshots/LiveModelSnapshotSpecification.java` | **New** — builder spec interface | +| api | `readmodels/LiveModelSpecification.java` | Add `snapshots()` method | +| impl | `module/snapshots/LiveModelSnapshotSpecificationImpl.java` | **New** — mirrors `SnapshotSpecificationImpl` | +| impl | `module/boundedcontext/BoundedContextBuilderImpl.java` | Add snapshot fields to `LiveModelSpecificationImpl`, wire through `build()` | +| impl | `module/readmodels/ReadModelModule.java` | Hold `LiveModelInfo` instead of just classes, pass to commands | +| impl | `module/readmodels/ProjectLiveModelCommand.java` | Load/save snapshot around projection | +| impl | `module/readmodels/ProjectLiveModelUnboundedCommand.java` | Same as above | +| tests-inmem | `module/readmodels/LiveModelSnapshotTest.java` | **New** — test class mirroring `AggregateCapabilityTest` | + +## What Stays Unchanged + +- `SnapshotStorage` interface — fully reused, zero changes +- `SnapshotSpecification` — aggregate-specific, untouched +- `SnapshotSpecificationImpl` — aggregate-specific, untouched (but `READ_AND_OR_WRITE` enum reused) +- `AggregateModule`, `AggregateContextImpl` — untouched +- Existing read model behavior without snapshots — fully backward compatible +- `BoundedContext.read()` / `readUnbounded()` — signature unchanged From bf9cfd2b3fc2a949e38bc43d6499508ee0ab5dfd Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 12 Feb 2026 09:34:02 +0000 Subject: [PATCH 2/3] Implement optional live model snapshotting with pluggable SnapshotStorage SPI Mirrors the aggregate snapshotting approach for live models (read models projected on-demand). Live models that implement SnapshotCapable can now be configured with a SnapshotStorage at build time to cache their projected state, avoiding full event replay on subsequent reads. API changes: - SnapshotCapable: add key(String, Object...) overload for live model identity based on constructor params (existing Tags-based key unchanged) - LiveModelSpecification: add snapshots(SnapshotStorage) entry point - New LiveModelSnapshotSpecification: fluent builder with eventCountThreshold, readAndWrite/readOnly/writeOnly modes Implementation: - ProjectLiveModelCommand/Unbounded: load snapshot before projection (Projector.startingAfter), save after if threshold met - ReadModelModule: LiveModelInfo record holds per-model snapshot config and Micrometer counters (snapshot.read.count, snapshot.write.count) - LiveModelSnapshotSpecificationImpl: reuses SnapshotSpecificationImpl's READ_AND_OR_WRITE enum Tests: - LiveModelSnapshotTest: snapshot save/load lifecycle, version compatibility, readOnly/writeOnly modes, no-snapshot backward compat https://claude.ai/code/session_01CxJGnsJcX4mJtMxGHrWqH3 --- .../readmodels/LiveModelSpecification.java | 15 + .../LiveModelSnapshotSpecification.java | 82 +++++ .../snapshots/SnapshotCapable.java | 50 ++- .../BoundedContextBuilderImpl.java | 50 ++- .../LiveModelSpecificationAccessor.java | 38 +++ .../readmodels/ProjectLiveModelCommand.java | 62 +++- .../ProjectLiveModelUnboundedCommand.java | 56 +++- .../module/readmodels/ReadModelModule.java | 90 +++-- .../LiveModelSnapshotSpecificationImpl.java | 93 +++++ .../readmodels/LiveModelSnapshotTest.java | 317 ++++++++++++++++++ 10 files changed, 783 insertions(+), 70 deletions(-) create mode 100644 sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java create mode 100644 sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java create mode 100644 sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java create mode 100644 sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java diff --git a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java index ef6f124..32688b6 100644 --- a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java +++ b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/readmodels/LiveModelSpecification.java @@ -18,6 +18,8 @@ package org.sliceworkz.eventmodeling.readmodels; import org.sliceworkz.eventmodeling.boundedcontext.BoundedContextBuilder; +import org.sliceworkz.eventmodeling.snapshots.LiveModelSnapshotSpecification; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; public interface LiveModelSpecification { @@ -27,4 +29,17 @@ public interface LiveModelSpecification eventuallyConsistent(); + /** + * Configures snapshotting for this live model to optimize projection performance. + *

+ * Snapshots store the complete state of a live model at a point in time, allowing + * faster projection by avoiding replay of the entire event history. The live model + * must implement {@link org.sliceworkz.eventmodeling.snapshots.SnapshotCapable} to use this feature. + * + * @param the type representing the live model's snapshot state + * @param snapshotStorage the storage mechanism for persisting and loading snapshots + * @return a snapshot specification for further configuration + */ + LiveModelSnapshotSpecification snapshots ( SnapshotStorage snapshotStorage ); + } \ No newline at end of file diff --git a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java new file mode 100644 index 0000000..8e94149 --- /dev/null +++ b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/LiveModelSnapshotSpecification.java @@ -0,0 +1,82 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.snapshots; + +import org.sliceworkz.eventmodeling.boundedcontext.BoundedContextBuilder; + +/** + * Provides a fluent API for configuring snapshot behavior for a live model. + *

+ * This specification controls when snapshots are taken and whether they should + * be read during live model projection, written after projection, or both. + * Snapshot configuration allows fine-tuning of the performance/storage tradeoff. + * + * @param the base type of domain events in the bounded context + * @param the base type of inbound events from external systems + * @param the base type of outbound events to external systems + */ +public interface LiveModelSnapshotSpecification { + + /** + * Sets the event count threshold for triggering snapshot creation. + *

+ * A snapshot is automatically created when the number of events replayed since the last + * snapshot (or from the beginning if no snapshot exists) reaches this threshold. + * Lower values create snapshots more frequently, trading storage space for faster + * live model projection. Higher values reduce storage but may slow projection of live models + * with long event histories. + * + * @param eventCountThreshold number of events before creating a new snapshot, must be greater than 0 + * @return this specification for method chaining + * @throws IllegalArgumentException if threshold is less than or equal to 0 + */ + LiveModelSnapshotSpecification eventCountThreshold ( int eventCountThreshold ); + + /** + * Enables both reading and writing of snapshots (default behavior). + *

+ * The live model will attempt to load from a snapshot when projecting state, + * and will create new snapshots when the event count threshold is reached. + * + * @return the bounded context builder for further configuration + */ + BoundedContextBuilder readAndWrite ( ); + + /** + * Enables reading snapshots but disables writing new snapshots. + *

+ * Useful when transitioning away from snapshotting or when you want to use + * existing snapshots but not create new ones. The live model will load from + * snapshots but will not create new ones regardless of event count. + * + * @return the bounded context builder for further configuration + */ + BoundedContextBuilder readOnly ( ); + + /** + * Enables writing new snapshots but disables reading existing snapshots. + *

+ * Useful when transitioning to snapshotting or testing snapshot creation. + * The live model will always be projected from the full event history, + * but new snapshots will be created for future use. + * + * @return the bounded context builder for further configuration + */ + BoundedContextBuilder writeOnly ( ); + +} diff --git a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java index 909c705..9e62bc3 100644 --- a/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java +++ b/sliceworkz-eventmodeling-api/src/main/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapable.java @@ -20,47 +20,46 @@ import org.sliceworkz.eventstore.events.Tags; /** - * Marks an aggregate as capable of snapshotting for performance optimization. + * Marks an aggregate or live model as capable of snapshotting for performance optimization. *

- * Aggregates with long event histories can implement this interface to enable periodic - * snapshots of their state. When loading, the aggregate can be restored from the most + * Aggregates and live models with long event histories can implement this interface to enable + * periodic snapshots of their state. When loading, the component can be restored from the most * recent snapshot and then replay only subsequent events, rather than replaying the * entire event history from the beginning. *

* Snapshot versioning ensures that snapshots are only loaded if they match the current - * aggregate implementation version, preventing incompatibility issues when the aggregate - * structure changes. + * implementation version, preventing incompatibility issues when the structure changes. * * @param the type representing the serialized snapshot state */ public interface SnapshotCapable { /** - * Captures the current state of the aggregate as a snapshot. + * Captures the current state as a snapshot. *

- * The snapshot should contain all state necessary to restore the aggregate + * The snapshot should contain all state necessary to restore the component * to its current condition without replaying events. * - * @return an immutable representation of the aggregate's current state + * @return an immutable representation of the current state */ SNAPSHOT_TYPE takeSnapshot ( ); /** - * Restores the aggregate's state from a previously captured snapshot. + * Restores state from a previously captured snapshot. *

- * This method is called before event replay begins, allowing the aggregate + * This method is called before event replay begins, allowing the component * to skip replaying events that occurred before the snapshot was taken. * - * @param snapshot the snapshot containing the aggregate's previous state + * @param snapshot the snapshot containing the previous state */ void fromSnapshot ( SNAPSHOT_TYPE snapshot ); /** - * Returns the version identifier for this aggregate's snapshot format. + * Returns the version identifier for the snapshot format. *

* The version should be changed whenever the snapshot structure is modified * in a way that makes old snapshots incompatible. Only snapshots with matching - * versions will be loaded; mismatched snapshots are ignored and the aggregate + * versions will be loaded; mismatched snapshots are ignored and the component * is rebuilt from events. * * @return a version identifier for snapshot compatibility checking @@ -107,4 +106,29 @@ default String key ( String name, Tags identity ) { return keyBuilder.toString(); } + /** + * Generates a unique storage key for a live model's snapshot. + *

+ * The key is generated in the format: "name/param1/param2/..." where each constructor + * parameter's string representation is appended as a path segment. Null parameters are + * represented as empty strings. + * Implementations may override this to provide custom key generation logic. + * + * @param name the name of the read model class + * @param constructorParams the constructor parameters uniquely identifying this live model instance + * @return a unique key for storing and retrieving this live model's snapshot + */ + default String key ( String name, Object... constructorParams ) { + StringBuilder keyBuilder = new StringBuilder(name != null ? name : ""); + + if ( constructorParams != null ) { + for ( Object param : constructorParams ) { + keyBuilder.append("/"); + keyBuilder.append(param != null ? param.toString() : ""); + } + } + + return keyBuilder.toString(); + } + } diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java index 66d241d..8986907 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/boundedcontext/BoundedContextBuilderImpl.java @@ -45,11 +45,15 @@ import org.sliceworkz.eventmodeling.module.dcb.DCBModule; import org.sliceworkz.eventmodeling.module.inbound.InboundModule; import org.sliceworkz.eventmodeling.module.outbound.OutboundModule; +import org.sliceworkz.eventmodeling.module.readmodels.LiveModelSpecificationAccessor; import org.sliceworkz.eventmodeling.module.readmodels.ReadModelModule; +import org.sliceworkz.eventmodeling.module.snapshots.LiveModelSnapshotSpecificationImpl; import org.sliceworkz.eventmodeling.outbound.Dispatcher; import org.sliceworkz.eventmodeling.readmodels.LiveModelSpecification; import org.sliceworkz.eventmodeling.readmodels.LongLivedReadModelSpecification; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.LiveModelSnapshotSpecification; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; import org.sliceworkz.eventmodeling.slices.AnnotationBasedDiscoveryAndConfiguration; import org.sliceworkz.eventmodeling.slices.FeatureSlice; import org.sliceworkz.eventmodeling.slices.FeatureSliceConfiguration; @@ -278,22 +282,20 @@ public >> liveModelClasses = liveModelSpecs.stream().map(LiveModelSpecificationImpl::readModelClass).collect(Collectors.toCollection(ArrayList::new)); - Collection> consistentReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.CONSISTENT).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); Collection> eventuallyConsistentSharedReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.EVENTUALLY_CONSISTENT&&s.isShared()).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); Collection> eventuallyConsistentLocalReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.EVENTUALLY_CONSISTENT&&s.isLocal()&&!s.isEphemeral()).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); Collection> eventuallyConsistentEphemeralReadModels = longLivedReadModelSpecs.stream().filter(s->s.consistency()==Consistency.EVENTUALLY_CONSISTENT&&s.isLocal()&&s.isEphemeral()).map(LongLivedReadModelSpecificationImpl::readModel).collect(Collectors.toCollection(ArrayList::new)); - + Collection> translators = translatorSpecs.stream().map(i->(Translator)i).collect(Collectors.toCollection(ArrayList::new)); InboundModule im = new InboundModule<>(name, inboundEventStream, translators, instance, meterRegistry); Collection> dispatchers = dispatcherSpecs.stream().map(i->(Dispatcher)i).collect(Collectors.toCollection(ArrayList::new)); OutboundModule om = new OutboundModule(name, outboundEventStream, dispatchers, instance, meterRegistry); - + AutomationModule am = new AutomationModule<>(name, domainEventStream, automations, instance, meterRegistry); - - ReadModelModule rmm = new ReadModelModule(name, domainEventStream, readAllInStoreEventStream, liveModelClasses, consistentReadModels, eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels, instance, meterRegistry); + + ReadModelModule rmm = new ReadModelModule(name, domainEventStream, readAllInStoreEventStream, liveModelSpecs, consistentReadModels, eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels, instance, meterRegistry); DCBModule dcb = new DCBModule(name, instance, rmm, domainEventStream, outboundEventStream, false, meterRegistry); AggregateModule aggregateModule = new AggregateModule(name, instance, aggregateSpecifications, domainEventStream, meterRegistry); @@ -322,12 +324,13 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl }); } - public class LiveModelSpecificationImpl implements LiveModelSpecification { - + public class LiveModelSpecificationImpl implements LiveModelSpecification, LiveModelSpecificationAccessor { + private BoundedContextBuilder builder; private Class> readModelClass; private Consistency consistency = Consistency.LIVE; - + private LiveModelSnapshotSpecificationImpl snapshotSpecification; + public LiveModelSpecificationImpl ( BoundedContextBuilder builder, Class> readModelClass ) { this.builder = builder; this.readModelClass = readModelClass; @@ -349,14 +352,41 @@ public BoundedContextBuilder LiveModelSnapshotSpecification snapshots ( + SnapshotStorage snapshotStorage ) { + if ( snapshotStorage == null ) { + throw new IllegalArgumentException("snapshotStorage can not be null"); + } + this.snapshotSpecification = new LiveModelSnapshotSpecificationImpl(builder, snapshotStorage); + return snapshotSpecification; + } + public Consistency consistency ( ) { return consistency; } - + public Class> readModelClass ( ) { return readModelClass; } + @SuppressWarnings("unchecked") + public SnapshotStorage snapshotStorage ( ) { + return snapshotSpecification == null ? null : (SnapshotStorage) snapshotSpecification.snapshotStorage(); + } + + public boolean readSnapshots ( ) { + return snapshotSpecification != null && snapshotSpecification.readAndOrWrite().mustRead(); + } + + public boolean writeSnapshots ( ) { + return snapshotSpecification != null && snapshotSpecification.readAndOrWrite().mustWrite(); + } + + public int snapshotEventCountThreshold ( ) { + return snapshotSpecification == null ? 0 : snapshotSpecification.eventCountThreshold(); + } + } public class LongLivedReadModelSpecificationImpl implements LongLivedReadModelSpecification { diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java new file mode 100644 index 0000000..33ce332 --- /dev/null +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSpecificationAccessor.java @@ -0,0 +1,38 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.module.readmodels; + +import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; + +/** + * Internal accessor interface for live model specification data needed by ReadModelModule. + */ +public interface LiveModelSpecificationAccessor { + + Class> readModelClass ( ); + + SnapshotStorage snapshotStorage ( ); + + boolean readSnapshots ( ); + + boolean writeSnapshots ( ); + + int snapshotEventCountThreshold ( ); + +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java index b516468..ae0d611 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelCommand.java @@ -29,8 +29,12 @@ import org.sliceworkz.eventmodeling.module.boundedcontext.KernelEvent; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger.Metrics; +import org.sliceworkz.eventmodeling.module.readmodels.ReadModelModule.LiveModelInfo; import org.sliceworkz.eventmodeling.readmodels.ReadModel; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotCapable; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; +import org.sliceworkz.eventstore.events.EventReference; import org.sliceworkz.eventstore.projection.Projector; import org.sliceworkz.eventstore.projection.Projector.ProjectorMetrics; import org.sliceworkz.eventstore.stream.EventSource; @@ -38,27 +42,29 @@ class ProjectLiveModelCommand implements Command { private Logger LOGGER = LoggerFactory.getLogger(ProjectLiveModelCommand.class); - + private Class> readModelClass; private Object[] constructorParams; private EventSource eventSource; private String boundedContext; private Instance instance; - + private LiveModelInfo liveModelInfo; + private ReadModelWithMetaData readModel; - - public ProjectLiveModelCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, Object... constructorParams ) { + + public ProjectLiveModelCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, LiveModelInfo liveModelInfo, Object... constructorParams ) { this.boundedContext = boundedContext; this.instance = instance; this.eventSource = eventSource; this.readModelClass = readModelClass; + this.liveModelInfo = liveModelInfo; this.constructorParams = constructorParams; } - + public ReadModelWithMetaData readModel ( ) { return readModel; } - + @SuppressWarnings("unchecked") @Override public CommandResult execute(CommandContext context) { @@ -66,19 +72,51 @@ public CommandResult execute(CommandContext result = context.noDecisionModels(); try { readModel = (ReadModelWithMetaData) selectConstructor(readModelClass, constructorParams).newInstance(constructorParams); - Projector projector = Projector.from(eventSource).towards(readModel).build(); + + EventReference lastEventReference = null; + + // Load snapshot if configured and read model implements SnapshotCapable + if ( liveModelInfo.readSnapshots() && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + String version = snapshotCapable.version(); + var loadedSnapshot = liveModelInfo.snapshotStorage().load(key, version); + if ( loadedSnapshot.isPresent() ) { + liveModelInfo.counterSnapshotRead().increment(); + ((SnapshotCapable) snapshotCapable).fromSnapshot(loadedSnapshot.get().snapshot()); + lastEventReference = loadedSnapshot.get().lastEventReference(); + } + } + + // Replay events — starting after snapshot's last event reference if available + Projector projector = Projector.from(eventSource).towards(readModel).startingAfter(lastEventReference).build(); ProjectorMetrics projectorMetrics = projector.run(); + + // Save snapshot if configured and threshold met + saveSnapshotIfNeeded(projectorMetrics); + long finish = System.currentTimeMillis(); long duration = finish - start; Metrics metrics = map(duration, projectorMetrics); PerformanceLogger.entry().context(boundedContext).instance(instance).metrics(metrics).type("readmodel.live").readmodel(readModel.readmodelName()).log(); - return result; + return result; } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } } - + + @SuppressWarnings("unchecked") + private void saveSnapshotIfNeeded ( ProjectorMetrics projectorMetrics ) { + SnapshotStorage snapshotStorageForWrite = liveModelInfo.snapshotStorageForWrite(); + if ( snapshotStorageForWrite != null + && projectorMetrics.eventsStreamed() >= liveModelInfo.snapshotEventCountThreshold() + && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + snapshotStorageForWrite.save(key, snapshotCapable.version(), ((SnapshotCapable) snapshotCapable).takeSnapshot(), projectorMetrics.lastEventReference()); + liveModelInfo.counterSnapshotWrite().increment(); + } + } + Constructor selectConstructor ( Class readModelClass, Object[] constructorParams ) { for ( Constructor ctr : readModelClass.getDeclaredConstructors() ) { // TODO improve constructor selection, not only on parameter count but also on type! @@ -88,9 +126,9 @@ Constructor selectConstructor ( Class readModelClass, Object[] constructor } throw new IllegalArgumentException("no public constructur found on " + readModelClass + " for parameters " + constructorParams); } - + private Metrics map ( long duration, ProjectorMetrics projectorMetrics ) { return new Metrics(duration, projectorMetrics.queriesDone(), projectorMetrics.eventsStreamed(), projectorMetrics.eventsHandled(), projectorMetrics.lastEventReference()); } - -} \ No newline at end of file + +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java index 65731b6..e3af846 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ProjectLiveModelUnboundedCommand.java @@ -26,8 +26,12 @@ import org.sliceworkz.eventmodeling.module.boundedcontext.KernelEvent; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger; import org.sliceworkz.eventmodeling.module.boundedcontext.PerformanceLogger.Metrics; +import org.sliceworkz.eventmodeling.module.readmodels.ReadModelModule.LiveModelInfo; import org.sliceworkz.eventmodeling.readmodels.ReadModel; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotCapable; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; +import org.sliceworkz.eventstore.events.EventReference; import org.sliceworkz.eventstore.projection.Projector; import org.sliceworkz.eventstore.projection.Projector.ProjectorMetrics; import org.sliceworkz.eventstore.stream.EventSource; @@ -39,21 +43,23 @@ public class ProjectLiveModelUnboundedCommand implements Com private EventSource eventSource; private String boundedContext; private Instance instance; - + private LiveModelInfo liveModelInfo; + private ReadModelWithMetaData readModel; - - public ProjectLiveModelUnboundedCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, Object... constructorParams ) { + + public ProjectLiveModelUnboundedCommand ( String boundedContext, Instance instance, EventSource eventSource, Class> readModelClass, LiveModelInfo liveModelInfo, Object... constructorParams ) { this.boundedContext = boundedContext; this.instance = instance; this.eventSource = eventSource; this.readModelClass = readModelClass; + this.liveModelInfo = liveModelInfo; this.constructorParams = constructorParams; } - + public ReadModelWithMetaData readModel ( ) { return readModel; } - + @SuppressWarnings("unchecked") @Override public CommandResult execute(CommandContext context) { @@ -61,16 +67,48 @@ public CommandResult execute(CommandContext result = context.noDecisionModels(); try { readModel = (ReadModelWithMetaData) readModelClass.getDeclaredConstructors()[0].newInstance(constructorParams); - Projector projector = Projector.from(eventSource).towards(readModel).build(); + + EventReference lastEventReference = null; + + // Load snapshot if configured and read model implements SnapshotCapable + if ( liveModelInfo.readSnapshots() && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + String version = snapshotCapable.version(); + var loadedSnapshot = liveModelInfo.snapshotStorage().load(key, version); + if ( loadedSnapshot.isPresent() ) { + liveModelInfo.counterSnapshotRead().increment(); + ((SnapshotCapable) snapshotCapable).fromSnapshot(loadedSnapshot.get().snapshot()); + lastEventReference = loadedSnapshot.get().lastEventReference(); + } + } + + // Replay events — starting after snapshot's last event reference if available + Projector projector = Projector.from(eventSource).towards(readModel).startingAfter(lastEventReference).build(); ProjectorMetrics projectorMetrics = projector.run(); + + // Save snapshot if configured and threshold met + saveSnapshotIfNeeded(projectorMetrics); + long finish = System.currentTimeMillis(); long duration = finish - start; Metrics metrics = new Metrics(duration, projectorMetrics.queriesDone(), projectorMetrics.eventsStreamed(), projectorMetrics.eventsHandled(), projectorMetrics.lastEventReference()); PerformanceLogger.entry().context(boundedContext).instance(instance).metrics(metrics).type("readmodel.live").readmodel(readModel.readmodelName()).log(); - return result; + return result; } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } - -} \ No newline at end of file + + @SuppressWarnings("unchecked") + private void saveSnapshotIfNeeded ( ProjectorMetrics projectorMetrics ) { + SnapshotStorage snapshotStorageForWrite = liveModelInfo.snapshotStorageForWrite(); + if ( snapshotStorageForWrite != null + && projectorMetrics.eventsStreamed() >= liveModelInfo.snapshotEventCountThreshold() + && readModel instanceof SnapshotCapable snapshotCapable ) { + String key = snapshotCapable.key(readModel.readmodelName(), constructorParams); + snapshotStorageForWrite.save(key, snapshotCapable.version(), ((SnapshotCapable) snapshotCapable).takeSnapshot(), projectorMetrics.lastEventReference()); + liveModelInfo.counterSnapshotWrite().increment(); + } + } + +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java index b17db30..7633858 100644 --- a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/readmodels/ReadModelModule.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Stream; import org.slf4j.Logger; @@ -33,56 +36,89 @@ import org.sliceworkz.eventmodeling.module.threading.EventuallyConsistentProcessorIdentification.Storage; import org.sliceworkz.eventmodeling.module.threading.ProcessorThreadManager; import org.sliceworkz.eventmodeling.readmodels.ReadModelWithMetaData; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; import org.sliceworkz.eventstore.events.Event; import org.sliceworkz.eventstore.stream.EventSource; import org.sliceworkz.eventstore.stream.EventStream; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; public class ReadModelModule implements LifecycleCapability { - + private static Logger LOGGER = LoggerFactory.getLogger(ReadModelModule.class); private Collection> eventuallyConsistentReadModelThreadManagers; private ProcessorThreadManager processorThreadManager; - + private BoundedContextFunctions kernelFunctions; private EventSource domainEventStream; private EventSource allInStorageEventStream; - private Collection>> liveModels = new ArrayList<>(); + private Map>, LiveModelInfo> liveModels = new HashMap<>(); private Collection> consistentReadModels = new ArrayList<>(); private Collection> eventuallyConsistentSharedReadModels = new ArrayList<>(); private Collection> eventuallyConsistentLocalReadModels = new ArrayList<>(); private String boundedContext; private Instance instance; - + private MeterRegistry meterRegistry; - - - public ReadModelModule ( + + public record LiveModelInfo ( + Class> readModelClass, + SnapshotStorage snapshotStorage, + boolean readSnapshots, + boolean writeSnapshots, + int snapshotEventCountThreshold, + Counter counterSnapshotRead, + Counter counterSnapshotWrite + ) { + + public SnapshotStorage snapshotStorageForWrite ( ) { + return writeSnapshots ? snapshotStorage : null; + } + + } + + public > ReadModelModule ( String boundedContext, EventStream domainEventStream, EventStream allInStorageEventStream, - Collection>> liveModelClasses, - Collection> consistentReadModels, - Collection> eventuallyConsistentSharedReadModels, + List liveModelSpecs, + Collection> consistentReadModels, + Collection> eventuallyConsistentSharedReadModels, Collection> eventuallyConsistentLocalReadModels, Collection> eventuallyConsistentEphemeralReadModels, Instance instance, MeterRegistry meterRegistry ) { - + this.domainEventStream = domainEventStream; this.allInStorageEventStream = allInStorageEventStream; this.boundedContext = boundedContext; this.instance = instance; - for ( Class> liveModelClass : liveModelClasses ) { - if ( this.liveModels.contains(liveModelClass)) { - LOGGER.error("multiple live readmodels of type '%s' registered".formatted(liveModelClass)); - throw new IllegalArgumentException("duplicate live readmodel %s".formatted(liveModelClass)); + for ( LMSI spec : liveModelSpecs ) { + Class> readModelClass = spec.readModelClass(); + if ( this.liveModels.containsKey(readModelClass) ) { + LOGGER.error("multiple live readmodels of type '%s' registered".formatted(readModelClass)); + throw new IllegalArgumentException("duplicate live readmodel %s".formatted(readModelClass)); } - this.liveModels.add(liveModelClass); + + io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags + .of("context", boundedContext) + .and("readmodel", readModelClass.getSimpleName()); + + Counter counterSnapshotRead = meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.snapshot.read.count", tags); + Counter counterSnapshotWrite = meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.snapshot.write.count", tags); + + this.liveModels.put(readModelClass, new LiveModelInfo<>( + readModelClass, + spec.snapshotStorage(), + spec.readSnapshots(), + spec.writeSnapshots(), + spec.snapshotEventCountThreshold(), + counterSnapshotRead, + counterSnapshotWrite)); } for ( ReadModelWithMetaData consistentReadModel : consistentReadModels ) { @@ -104,7 +140,7 @@ public ReadModelModule ( this.eventuallyConsistentReadModelThreadManagers = createEventuallyConsistentEventProcessors(eventuallyConsistentSharedReadModels, eventuallyConsistentLocalReadModels, eventuallyConsistentEphemeralReadModels); this.processorThreadManager = new ProcessorThreadManager("readmodel", this.eventuallyConsistentReadModelThreadManagers); - LOGGER.info("live readmodels: %s".formatted(liveModelClasses)); + LOGGER.info("live readmodels: %s".formatted(liveModels.keySet())); } Collection> createEventuallyConsistentEventProcessors ( Collection> shared, Collection> local, Collection> ephemeral ) { @@ -116,21 +152,22 @@ Collection> createEventual return result; } - + public void kernelFunctions ( BoundedContextFunctions kernelFunctions ) { this.kernelFunctions = kernelFunctions; } @SuppressWarnings({ "unchecked", "rawtypes" }) public T liveModel ( Class> readModelClass, Tracing tracing, Object... constructorParams) { - if ( liveModels.contains(readModelClass)) { + LiveModelInfo info = liveModels.get(readModelClass); + if ( info != null ) { io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags .of("context", boundedContext) .and("readmodel", readModelClass.getSimpleName()); meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.render", tags).increment(); return meterRegistry.timer("sliceworkz.eventmodeling.readmodel.live.duration", tags).record(()->{ - ProjectLiveModelCommand cmd = new ProjectLiveModelCommand(boundedContext, instance, domainEventStream, readModelClass, constructorParams); + ProjectLiveModelCommand cmd = new ProjectLiveModelCommand(boundedContext, instance, domainEventStream, readModelClass, info, constructorParams); kernelFunctions.executeKernelCommand(cmd, tracing); return (T) cmd.readModel(); }); @@ -139,17 +176,18 @@ public T liveModel ( Class T liveModelUnbounded ( Class> readModelClass, Tracing tracing, Object... constructorParams) { - if ( liveModels.contains(readModelClass)) { + LiveModelInfo info = liveModels.get(readModelClass); + if ( info != null ) { io.micrometer.core.instrument.Tags tags = io.micrometer.core.instrument.Tags .of("context", boundedContext) .and("readmodel", readModelClass.getSimpleName()); meterRegistry.counter("sliceworkz.eventmodeling.readmodel.live.render", tags).increment(); return meterRegistry.timer("sliceworkz.eventmodeling.readmodel.live.duration", tags).record(()->{ - ProjectLiveModelUnboundedCommand cmd = new ProjectLiveModelUnboundedCommand(boundedContext, instance, allInStorageEventStream, readModelClass, constructorParams); + ProjectLiveModelUnboundedCommand cmd = new ProjectLiveModelUnboundedCommand(boundedContext, instance, allInStorageEventStream, readModelClass, info, constructorParams); kernelFunctions.executeKernelCommand(cmd, tracing); return (T) cmd.readModel(); }); @@ -176,7 +214,7 @@ public void updateSharedConsistentModels ( Stream> events ) { // TODO implement preloading of (inmemory) models upon kernel start } - + @Override public void start ( ) { this.processorThreadManager.start(); @@ -186,10 +224,10 @@ public void start ( ) { public void stop ( ) { this.processorThreadManager.stop(); } - + @Override public void terminate ( ) { this.processorThreadManager.terminate(); } -} \ No newline at end of file +} diff --git a/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java new file mode 100644 index 0000000..96ed6e7 --- /dev/null +++ b/sliceworkz-eventmodeling-impl/src/main/java/org/sliceworkz/eventmodeling/module/snapshots/LiveModelSnapshotSpecificationImpl.java @@ -0,0 +1,93 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.module.snapshots; + +import org.sliceworkz.eventmodeling.boundedcontext.BoundedContextBuilder; +import org.sliceworkz.eventmodeling.module.snapshots.SnapshotSpecificationImpl.READ_AND_OR_WRITE; +import org.sliceworkz.eventmodeling.snapshots.LiveModelSnapshotSpecification; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; + +public class LiveModelSnapshotSpecificationImpl implements LiveModelSnapshotSpecification { + + public static final int DEFAULT_EVENT_COUNT_THRESHOLD = 100; + + private SnapshotStorage snapshotStorage; + private READ_AND_OR_WRITE readAndOrWrite = READ_AND_OR_WRITE.READ_WRITE; + private int eventCountThreshold = DEFAULT_EVENT_COUNT_THRESHOLD; + private BoundedContextBuilder parent; + + public LiveModelSnapshotSpecificationImpl ( BoundedContextBuilder parent, SnapshotStorage snapshotStorage ) { + this.parent = parent; + this.snapshotStorage = snapshotStorage; + if ( snapshotStorage != null ) { + this.readAndOrWrite = READ_AND_OR_WRITE.READ_WRITE; + } else { + this.readAndOrWrite = READ_AND_OR_WRITE.NO_READ_NO_WRITE; + } + } + + @Override + public LiveModelSnapshotSpecification eventCountThreshold ( int eventCountThreshold ) { + validateSnapshotStorage(); + if ( eventCountThreshold <= 0 ) { + throw new IllegalArgumentException("eventCountThreshold must be above 0"); + } + this.eventCountThreshold = eventCountThreshold; + return this; + } + + @Override + public BoundedContextBuilder readAndWrite ( ) { + validateSnapshotStorage(); + this.readAndOrWrite = READ_AND_OR_WRITE.READ_WRITE; + return parent; + } + + @Override + public BoundedContextBuilder readOnly ( ) { + validateSnapshotStorage(); + this.readAndOrWrite = READ_AND_OR_WRITE.READ_ONLY; + return parent; + } + + @Override + public BoundedContextBuilder writeOnly ( ) { + validateSnapshotStorage(); + this.readAndOrWrite = READ_AND_OR_WRITE.WRITE_ONLY; + return parent; + } + + private void validateSnapshotStorage ( ) { + if ( snapshotStorage == null ) { + throw new IllegalArgumentException("snapshot storage cannot be null"); + } + } + + public READ_AND_OR_WRITE readAndOrWrite ( ) { + return readAndOrWrite; + } + + public SnapshotStorage snapshotStorage ( ) { + return snapshotStorage; + } + + public int eventCountThreshold ( ) { + return eventCountThreshold; + } + +} diff --git a/sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java b/sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java new file mode 100644 index 0000000..a5e4308 --- /dev/null +++ b/sliceworkz-eventmodeling-tests-inmem/src/test/java/org/sliceworkz/eventmodeling/module/readmodels/LiveModelSnapshotTest.java @@ -0,0 +1,317 @@ +/* + * Sliceworkz Event Modeling - an opinionated Event Modeling framework in Java + * Copyright © 2025 Sliceworkz / XTi (info@sliceworkz.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package org.sliceworkz.eventmodeling.module.readmodels; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.sliceworkz.eventmodeling.boundedcontext.BoundedContext; +import org.sliceworkz.eventmodeling.events.InstanceFactory; +import org.sliceworkz.eventmodeling.mock.boundedcontext.AbstractMockDomainTest; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockBoundedContext; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockDomainEvent; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockDomainEvent.FirstDomainEvent; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockInboundEvent; +import org.sliceworkz.eventmodeling.mock.boundedcontext.MockOutboundEvent; +import org.sliceworkz.eventmodeling.readmodels.ReadModel; +import org.sliceworkz.eventmodeling.snapshots.SnapshotCapable; +import org.sliceworkz.eventmodeling.snapshots.SnapshotStorage; +import org.sliceworkz.eventstore.events.EventReference; +import org.sliceworkz.eventstore.infra.inmem.InMemoryEventStorage; +import org.sliceworkz.eventstore.query.EventQuery; +import org.sliceworkz.eventstore.query.EventTypesFilter; +import org.sliceworkz.eventstore.events.Tags; +import org.sliceworkz.eventstore.spi.EventStorage; + +public class LiveModelSnapshotTest extends AbstractMockDomainTest { + + private EventStorage eventStorage; + + private MockLiveModelSnapshotStorage snapshotStorage = new MockLiveModelSnapshotStorage(); + + @BeforeEach + protected void setUp ( ) { + super.setUp(); + SnapshotLiveModel.VERSION = "v1"; + this.eventStorage = createEventStorage(); + } + + @AfterEach + protected void tearDown ( ) { + destroyEventStorage(eventStorage); + if ( boundedContext() != null ) { + boundedContext().stop(); + } + } + + public EventStorage createEventStorage ( ) { + return InMemoryEventStorage.newBuilder().build(); + } + + public void destroyEventStorage ( EventStorage storage ) { + + } + + @Test + void testLiveModelWithoutSnapshots ( ) { + MockBoundedContext domain = domainWithLiveModel(0); + + for ( int i = 0; i < 100; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(100, model.getCounter()); + assertEquals(0, snapshotStorage.getSaveInvokes()); + } + + @Test + void testLiveModelSnapshots ( ) { + MockBoundedContext domain = domainWithLiveModel(5); + + // Produce 10 events + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + // First read: projects all 10 events, saves snapshot (10 >= threshold of 5) + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(1, snapshotStorage.getSaveInvokes()); + + // Second read: loads snapshot, replays 0 events on top (no new events) + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); + + // Add 3 more events — below threshold + for ( int i = 10; i < 13; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + // Third read: loads snapshot, replays 3 events on top (3 < 5, no new snapshot) + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(13, model.getCounter()); + assertEquals(3, model.getEventsOnTopOfSnapshot()); + assertEquals(1, snapshotStorage.getSaveInvokes()); // still 1 from the first save + + // Add 7 more events — now 10 new events since snapshot + for ( int i = 13; i < 20; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + + // Fourth read: loads snapshot, replays 10 events on top (10 >= 5, saves new snapshot) + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(20, model.getCounter()); + assertEquals(10, model.getEventsOnTopOfSnapshot()); + assertEquals(2, snapshotStorage.getSaveInvokes()); + + // Fifth read: loads latest snapshot, replays 0 events + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(20, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); + } + + @Test + void testLiveModelSnapshotsChangingVersion ( ) { + MockBoundedContext domain = domainWithLiveModel(5); + + // Produce 10 events and read to create first snapshot + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(1, snapshotStorage.getSaveInvokes()); + + // Change version — snapshot should not be loaded + SnapshotLiveModel.VERSION = "v" + UUID.randomUUID().toString(); + + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(10, model.getEventsOnTopOfSnapshot()); // full replay since version mismatch + assertEquals(2, snapshotStorage.getSaveInvokes()); // new snapshot saved with new version + + // Revert version — latest snapshot with original version is still there + SnapshotLiveModel.VERSION = "v1"; + model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); // snapshot with v1 was loaded + } + + @Test + void testLiveModelSnapshotReadOnly ( ) { + // First, create a domain with readAndWrite to establish a snapshot + MockBoundedContext domain = domainWithLiveModelMode(5, "readAndWrite"); + + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(1, snapshotStorage.getSaveInvokes()); + domain.stop(); + + // Now create a new domain with readOnly mode + MockBoundedContext domain2 = domainWithLiveModelMode(5, "readOnly"); + model = domain2.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(0, model.getEventsOnTopOfSnapshot()); // snapshot was loaded + assertEquals(1, snapshotStorage.getSaveInvokes()); // no new save in readOnly + } + + @Test + void testLiveModelSnapshotWriteOnly ( ) { + MockBoundedContext domain = domainWithLiveModelMode(5, "writeOnly"); + + for ( int i = 0; i < 10; i++ ) { + domain.event(new FirstDomainEvent("test " + i)); + } + SnapshotLiveModel model = domain.read(SnapshotLiveModel.class, "myModel"); + assertEquals(10, model.getCounter()); + assertEquals(10, model.getEventsOnTopOfSnapshot()); // full replay in writeOnly + assertEquals(1, snapshotStorage.getSaveInvokes()); // but snapshot was saved + } + + + MockBoundedContext domainWithLiveModel ( int snapshotAfterEventCount ) { + return domainWithLiveModelMode(snapshotAfterEventCount, "readAndWrite"); + } + + MockBoundedContext domainWithLiveModelMode ( int snapshotAfterEventCount, String mode ) { + + var builder = BoundedContext.newBuilder(MockDomainEvent.class, MockInboundEvent.class, MockOutboundEvent.class) + .name("UnitTestBoundedContext") + .eventStorage(eventStorage) + .instance(InstanceFactory.determine("unittests")); + + if ( snapshotAfterEventCount == 0 ) { + builder.readmodel(SnapshotLiveModel.class).live(); + } else { + var snapshotSpec = builder.readmodel(SnapshotLiveModel.class).snapshots(snapshotStorage).eventCountThreshold(snapshotAfterEventCount); + switch ( mode ) { + case "readOnly" -> snapshotSpec.readOnly(); + case "writeOnly" -> snapshotSpec.writeOnly(); + default -> snapshotSpec.readAndWrite(); + } + } + + return buildBoundedContext(builder); + } + +} + +class SnapshotLiveModel implements ReadModel, SnapshotCapable { + + public static String VERSION = "v1"; + + private String name; + private int counter; + private int eventsOnTopOfSnapshot; + + public SnapshotLiveModel ( String name ) { + this.name = name; + } + + @Override + public EventQuery eventQuery ( ) { + return EventQuery.forEvents(EventTypesFilter.any(), Tags.none()); + } + + @Override + public void when ( MockDomainEvent event ) { + counter++; + eventsOnTopOfSnapshot++; + } + + @Override + public String readmodelName ( ) { + return name; + } + + public int getCounter ( ) { + return counter; + } + + public int getEventsOnTopOfSnapshot ( ) { + return eventsOnTopOfSnapshot; + } + + // --- SnapshotCapable --- + + @Override + public SnapshotData takeSnapshot ( ) { + return new SnapshotData(counter); + } + + @Override + public void fromSnapshot ( SnapshotData snapshot ) { + this.counter = snapshot.counter; + this.eventsOnTopOfSnapshot = 0; + } + + @Override + public String version ( ) { + return VERSION; + } + + public static class SnapshotData { + private int counter; + + public SnapshotData ( ) { + } + + public SnapshotData ( int counter ) { + this.counter = counter; + } + + public SnapshotData clone ( ) { + return new SnapshotData(counter); + } + } + +} + +class MockLiveModelSnapshotStorage implements SnapshotStorage { + + private int saveInvokes; + private Map> snapshots = new HashMap<>(); + + @Override + public Optional> load ( String key, String version ) { + return Optional.ofNullable(snapshots.get(key + version)); + } + + @Override + public void save ( String key, String version, SnapshotLiveModel.SnapshotData snapshot, EventReference lastEventReference ) { + saveInvokes++; + snapshots.put(key + version, new SnapshotRecord<>(snapshot.clone(), lastEventReference)); + } + + public int getSaveInvokes ( ) { + return saveInvokes; + } + +} From 8ba402dcca14dc4aec7f1c09520a05b881d599a0 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 12 Feb 2026 18:24:05 +0000 Subject: [PATCH 3/3] Disambiguate null Tags arguments in SnapshotCapableTest Explicitly cast null to (Tags) in key() calls to avoid ambiguity with the new key(String, Object...) overload for live model snapshots. https://claude.ai/code/session_01CxJGnsJcX4mJtMxGHrWqH3 --- .../eventmodeling/snapshots/SnapshotCapableTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java b/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java index f38ac3d..70ad1fc 100644 --- a/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java +++ b/sliceworkz-eventmodeling-api/src/test/java/org/sliceworkz/eventmodeling/snapshots/SnapshotCapableTest.java @@ -93,7 +93,7 @@ void testKeyWithNullName() { @Test void testKeyWithNullIdentity() { - String key = snapshotCapable.key("Account", null); + String key = snapshotCapable.key("Account", (Tags) null); assertEquals("Account", key); } @@ -162,7 +162,7 @@ void testKeyWithComplexScenario() { @Test void testKeyWithNullNameAndNullIdentity() { - String key = snapshotCapable.key(null, null); + String key = snapshotCapable.key(null, (Tags) null); assertEquals("", key); }