Skip to content

Commit d51d1bb

Browse files
committed
Merge branch 'release/0.5' into develop
2 parents cfe9ef0 + 10599a4 commit d51d1bb

File tree

12 files changed

+156
-29
lines changed

12 files changed

+156
-29
lines changed

sliceworkz-eventstore-api/src/main/java/org/sliceworkz/eventstore/events/EphemeralEvent.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
* @see Event
5555
* @see Tags
5656
*/
57-
public record EphemeralEvent<DOMAIN_EVENT_TYPE> ( EventType type, DOMAIN_EVENT_TYPE data, Tags tags ) {
57+
public record EphemeralEvent<DOMAIN_EVENT_TYPE> ( EventType type, DOMAIN_EVENT_TYPE data, Tags tags, String idempotencyKey ) {
5858

5959
/**
6060
* Constructs an EphemeralEvent with validation of all required fields.
@@ -67,7 +67,7 @@ public record EphemeralEvent<DOMAIN_EVENT_TYPE> ( EventType type, DOMAIN_EVENT_T
6767
* @param tags the tags for this event (required, use Tags.none() if no tags)
6868
* @throws IllegalArgumentException if any required parameter is null
6969
*/
70-
public EphemeralEvent ( EventType type, DOMAIN_EVENT_TYPE data, Tags tags ) {
70+
public EphemeralEvent ( EventType type, DOMAIN_EVENT_TYPE data, Tags tags, String idempotencyKey ) {
7171
if ( type == null ) {
7272
throw new IllegalArgumentException("type is required on event");
7373
}
@@ -80,6 +80,7 @@ public EphemeralEvent ( EventType type, DOMAIN_EVENT_TYPE data, Tags tags ) {
8080
this.type = type;
8181
this.data = data;
8282
this.tags = tags;
83+
this.idempotencyKey = idempotencyKey;
8384
}
8485

8586
/**
@@ -92,7 +93,26 @@ public EphemeralEvent ( EventType type, DOMAIN_EVENT_TYPE data, Tags tags ) {
9293
* @return a new EphemeralEvent instance with the specified tags
9394
*/
9495
public EphemeralEvent<DOMAIN_EVENT_TYPE> withTags ( Tags tags ) {
95-
return new EphemeralEvent<>(type, data, tags);
96+
return new EphemeralEvent<>(type, data, tags, idempotencyKey);
97+
}
98+
99+
/**
100+
* Creates a copy of this ephemeral event with a different idempotency key.
101+
* <p>
102+
* All other properties remain unchanged. An idempotency key ensures that the same event
103+
* is not appended multiple times if the append operation is retried. When an event with
104+
* an idempotency key is appended a second time, the storage will silently ignore it
105+
* rather than creating a duplicate event.
106+
* <p>
107+
* <b>Important:</b> When appending multiple events in a single batch, none of them may
108+
* have an idempotency key. Idempotency keys can only be used with single-event appends.
109+
*
110+
* @param idempotencyKey the idempotency key to attach to the event, or null for no idempotency check
111+
* @return a new EphemeralEvent instance with the specified idempotency key
112+
* @see org.sliceworkz.eventstore.stream.EventStream#append(org.sliceworkz.eventstore.stream.AppendCriteria, java.util.List)
113+
*/
114+
public EphemeralEvent<DOMAIN_EVENT_TYPE> withIdempotencyKey ( String idempotencyKey ) {
115+
return new EphemeralEvent<>(type, data, tags, idempotencyKey);
96116
}
97117

98118
/**
@@ -107,7 +127,7 @@ public EphemeralEvent<DOMAIN_EVENT_TYPE> withTags ( Tags tags ) {
107127
* @return a new EphemeralEvent instance ready to be appended
108128
*/
109129
public static final <DOMAIN_EVENT_TYPE> EphemeralEvent<DOMAIN_EVENT_TYPE> of ( DOMAIN_EVENT_TYPE data, Tags tags) {
110-
return new EphemeralEvent<DOMAIN_EVENT_TYPE>(EventType.of(data), data, tags);
130+
return new EphemeralEvent<DOMAIN_EVENT_TYPE>(EventType.of(data), data, tags, null);
111131
}
112132

113133
/**

sliceworkz-eventstore-api/src/main/java/org/sliceworkz/eventstore/events/Event.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public <OTHER_DOMAIN_EVENT_TYPE extends DOMAIN_EVENT_TYPE> Event<OTHER_DOMAIN_EV
170170
* @param timestamp the timestamp when this event was persisted
171171
* @return a new Event instance
172172
*/
173-
public static final <DOMAIN_EVENT_TYPE> Event<DOMAIN_EVENT_TYPE> of ( EventStreamId stream, EventReference reference, EventType type, EventType storedType, DOMAIN_EVENT_TYPE data, Tags tags, LocalDateTime timestamp ) {
173+
public static final <DOMAIN_EVENT_TYPE> Event<DOMAIN_EVENT_TYPE> of ( EventStreamId stream, EventReference reference, EventType type, EventType storedType, DOMAIN_EVENT_TYPE data, Tags tags, LocalDateTime timestamp) {
174174
return new Event<DOMAIN_EVENT_TYPE>(stream, type, storedType, reference, data, tags, timestamp);
175175
}
176176

sliceworkz-eventstore-api/src/main/java/org/sliceworkz/eventstore/projection/Projector.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ public static class Builder<EVENT_TYPE> {
516516
private EventSource<EVENT_TYPE> eventSource;
517517
private Projection<EVENT_TYPE> projection;
518518
private EventReference after;
519+
private boolean subscribe;
519520
private int maxEventsPerQuery = DEFAULT_MAX_EVENTS_PER_QUERY;
520521

521522
private BookmarkBuilder bookmarkBuilder = new BookmarkBuilder(this);
@@ -531,6 +532,31 @@ public Builder<EVENT_TYPE> from ( EventSource<EVENT_TYPE> eventSource ) {
531532
return this;
532533
}
533534

535+
/**
536+
* Configures the projector to automatically subscribe to the event source for eventually consistent updates.
537+
* <p>
538+
* When enabled, the projector will automatically re-run whenever new events are appended to the event source,
539+
* enabling near-real-time projection updates without manual polling. This is useful for:
540+
* <ul>
541+
* <li>Live dashboards and read models that need to reflect recent changes quickly</li>
542+
* <li>Reactive projections that respond to events as they arrive</li>
543+
* <li>Systems where projection staleness needs to be minimized</li>
544+
* </ul>
545+
* <p>
546+
* Note that the updates are <em>eventually consistent</em> - there may be a small delay between
547+
* event append and projection update. The subscription mechanism is optimized for efficiency
548+
* and may batch multiple events before triggering a projection run.
549+
* <p>
550+
* Without this setting, projections only update when explicitly triggered via {@link Projector#run()}.
551+
*
552+
* @return this builder for method chaining
553+
* @see EventStreamEventuallyConsistentAppendListener
554+
*/
555+
public Builder<EVENT_TYPE> subscribe ( ) {
556+
this.subscribe = true;
557+
return this;
558+
}
559+
534560
/**
535561
* Provides access to the bookmark configuration builder.
536562
* <p>
@@ -603,7 +629,12 @@ public Builder<EVENT_TYPE> inBatchesOf ( int maxEventsPerQuery ) {
603629
* @return a new Projector configured with the builder's settings
604630
*/
605631
public Projector<EVENT_TYPE> build ( ) {
606-
return new Projector<>(eventSource, projection, after, maxEventsPerQuery, bookmarkBuilder.readerName, bookmarkBuilder.tags, bookmarkBuilder.bookmarkReadFrequency);
632+
Projector<EVENT_TYPE> projector = new Projector<>(eventSource, projection, after, maxEventsPerQuery, bookmarkBuilder.readerName, bookmarkBuilder.tags, bookmarkBuilder.bookmarkReadFrequency);
633+
if ( subscribe ) {
634+
// subscribe for eventually consistent updates about event appends, so the projector will automatically trigger projection updates
635+
eventSource.subscribe(projector);
636+
}
637+
return projector;
607638
}
608639

609640
/**
@@ -973,9 +1004,25 @@ void stopBatchIfNeeded ( Optional<EventReference> lastEventReference ) {
9731004

9741005
}
9751006

1007+
/**
1008+
* Handles notifications of newly appended events when the projector is subscribed to an event source.
1009+
* <p>
1010+
* This method is called by the event source when new events are appended, triggering an automatic
1011+
* projection run to process the new events. It implements the {@link EventStreamEventuallyConsistentAppendListener}
1012+
* interface to enable reactive, near-real-time projection updates.
1013+
* <p>
1014+
* The method runs the projection and returns the reference of the last processed event. This allows
1015+
* the event source to track which events have been successfully processed by this projector.
1016+
* <p>
1017+
* This method is only invoked if the projector was configured with {@link Builder#subscribe()}.
1018+
*
1019+
* @param atLeastUntil the reference indicating events have been appended at least up to this point
1020+
* @return the reference of the last event processed by this projection run, or null if no events were processed
1021+
* @see Builder#subscribe()
1022+
*/
9761023
@Override
9771024
public EventReference eventsAppended(EventReference atLeastUntil) {
9781025
return run().lastEventReference();
9791026
}
980-
1027+
9811028
}

sliceworkz-eventstore-api/src/main/java/org/sliceworkz/eventstore/spi/EventStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ interface EventStoreListener {
502502
* @see StoredEvent
503503
* @see #append(AppendCriteria, Optional, List)
504504
*/
505-
public record EventToStore ( EventStreamId stream, EventType type, String immutableData, String erasableData, Tags tags ) {
505+
public record EventToStore ( EventStreamId stream, EventType type, String immutableData, String erasableData, Tags tags, String idempotencyKey ) {
506506

507507
/**
508508
* Converts this event to a stored event by assigning a reference and timestamp.

sliceworkz-eventstore-benchmark/src/main/java/org/sliceworkz/eventstore/benchmark/BenchmarkApplication.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,26 +120,24 @@ public EventReference eventsAppended(EventReference atLeastUntil) {
120120
Projector<CustomerEvent> customerProjector = Projector.<CustomerEvent>newBuilder()
121121
.from(customerStream)
122122
.towards(customerProjection)
123+
.subscribe()
123124
.bookmarkProgress()
124125
.withReader("customer-projector")
125126
.readBeforeFirstExecution()
126127
.done()
127128
.build();
128-
customerStream.subscribe(customerProjector);
129-
130129

131130

132131
SupplierEventProjection supplierProjection = new SupplierEventProjection(dataSource);
133132
Projector<SupplierEvent> supplierProjector = Projector.<SupplierEvent>newBuilder()
134133
.from(supplierStream)
135134
.towards(supplierProjection)
135+
.subscribe()
136136
.bookmarkProgress()
137137
.withReader("supplier-projector")
138138
.readBeforeFirstExecution()
139139
.done()
140140
.build();
141-
customerStream.subscribe(supplierProjector);
142-
143141

144142
CustomerEventProducer cep = new CustomerEventProducer(customerStream, EVENTS_PER_PRODUCER_INSTANCE, MS_WAIT_BETWEEN_EVENTS);
145143
SupplierEventProducer sep = new SupplierEventProducer(supplierStream, EVENTS_PER_PRODUCER_INSTANCE, MS_WAIT_BETWEEN_EVENTS);

sliceworkz-eventstore-impl/src/main/java/org/sliceworkz/eventstore/impl/EventStoreImpl.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,15 @@ public class EventStoreImpl implements EventStore {
115115
private final EventStorage eventStorage;
116116

117117
/**
118-
* Executor service using virtual threads for asynchronously notifying eventually consistent subscribers.
119-
* Named threads help with debugging and monitoring.
118+
* Executor service using virtual threads for asynchronously notifying eventually consistent subscribers
119+
* about new event appends. Named threads help with debugging and monitoring.
120120
*/
121121
private final ExecutorService executorServiceForEventAppends;
122122

123123
/**
124-
* Executor service using virtual threads for asynchronously notifying eventually consistent subscribers.
125-
* Named threads help with debugging and monitoring.
124+
* Executor service using virtual threads for asynchronously notifying eventually consistent subscribers
125+
* about bookmark updates. This uses a single-threaded executor to ensure bookmark notifications are
126+
* processed sequentially. Named threads help with debugging and monitoring.
126127
*/
127128
private final ExecutorService executorServiceForBookmarkUpdates;
128129

@@ -295,7 +296,7 @@ private EventToStore reduce ( EphemeralEvent<? extends EVENT_TYPE> event, EventS
295296
meterAppendEvent.increment(); // one event more sent to storage for appending
296297
Tags tags = event.tags();
297298
TypeAndSerializedPayload data = serde.serialize(event.data());
298-
return new EventToStore(streamToAppendTo, data.type(), data.immutablePayload(), data.erasablePayload(), tags);
299+
return new EventToStore(streamToAppendTo, data.type(), data.immutablePayload(), data.erasablePayload(), tags, event.idempotencyKey());
299300
}
300301

301302
private List<EventToStore> reduce ( List<? extends EphemeralEvent<? extends EVENT_TYPE>> events, EventStreamId streamToAppendTo ) {
@@ -322,6 +323,12 @@ public List<Event<EVENT_TYPE>> append(AppendCriteria appendCriteria, List<Epheme
322323
if ( !unAppendable.isEmpty() ) {
323324
throw new IllegalArgumentException("cannot append event type '%s' via this stream".formatted(unAppendable.getFirst()));
324325
}
326+
327+
if ( events.size() > 1 ) {
328+
if ( events.stream().filter(e->e.idempotencyKey()!=null).findAny().isPresent()) {
329+
throw new IllegalArgumentException("cannot append multiple events in combination with an idempotency key");
330+
}
331+
}
325332

326333
// append events to the eventstore (with optimistic locking)
327334
List<Event<EVENT_TYPE>> appendedEvents;

sliceworkz-eventstore-infra-inmem/src/main/java/org/sliceworkz/eventstore/infra/inmem/InMemoryEventStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public static Builder newBuilder ( ) {
213213
* @see InMemoryEventStorageImpl
214214
*/
215215
public EventStorage build ( ) {
216-
return new InMemoryEventStorageImpl(limit, meterRegistry);
216+
return new InMemoryEventStorageImpl(limit);
217217
}
218218

219219
/**

sliceworkz-eventstore-infra-inmem/src/main/java/org/sliceworkz/eventstore/infra/inmem/InMemoryEventStorageImpl.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import java.util.ArrayList;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.HashSet;
2526
import java.util.LinkedList;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Optional;
30+
import java.util.Set;
2931
import java.util.concurrent.CopyOnWriteArrayList;
3032
import java.util.stream.Collectors;
3133
import java.util.stream.Stream;
@@ -45,8 +47,6 @@
4547
import com.fasterxml.jackson.databind.JsonMappingException;
4648
import com.fasterxml.jackson.databind.json.JsonMapper;
4749

48-
import io.micrometer.core.instrument.MeterRegistry;
49-
5050
/**
5151
* Thread-safe in-memory implementation of the {@link EventStorage} interface.
5252
* <p>
@@ -98,11 +98,11 @@ public class InMemoryEventStorageImpl implements EventStorage {
9898

9999
private String name;
100100
private List<StoredEvent> eventlog = new CopyOnWriteArrayList<>();
101+
private Set<String> idempotencyKeys = new HashSet<>();
101102
private List<WeakReference<EventStoreListener>> listeners = new CopyOnWriteArrayList<>();
102103
private Map<String,EventReference> bookmarks = new HashMap<>();
103104
private JsonMapper jsonMapper;
104105
private Limit absoluteLimit;
105-
private MeterRegistry meterRegistry;
106106

107107
/**
108108
* Constructs a new in-memory event storage instance with the specified absolute query limit and observability support.
@@ -117,19 +117,16 @@ public class InMemoryEventStorageImpl implements EventStorage {
117117
* <li>An empty list of event listeners</li>
118118
* <li>An empty bookmark map</li>
119119
* <li>A Jackson {@link JsonMapper} with auto-discovered modules for event serialization validation</li>
120-
* <li>A Micrometer meter registry for collecting metrics about storage operations</li>
121120
* </ul>
122121
*
123122
* @param absoluteLimit the absolute limit on query results, or {@link Limit#none()} for no limit
124-
* @param meterRegistry the Micrometer meter registry for collecting observability metrics
125123
* @see InMemoryEventStorage.Builder#build()
126124
*/
127-
public InMemoryEventStorageImpl ( Limit absoluteLimit, MeterRegistry meterRegistry ) {
125+
public InMemoryEventStorageImpl ( Limit absoluteLimit ) {
128126
this.name = "inmem-%s".formatted(System.identityHashCode(this)); // unique name in case different objects are used
129127
this.jsonMapper = new JsonMapper();
130128
this.jsonMapper.findAndRegisterModules();
131129
this.absoluteLimit = absoluteLimit;
132-
this.meterRegistry = meterRegistry;
133130
}
134131

135132
/**
@@ -333,7 +330,7 @@ private void verifyPersistableJson ( List<EventToStore> newEvents ) {
333330
}
334331

335332
private List<StoredEvent> addAndNotifyListeners ( List<EventToStore> events ) {
336-
var addedEvents = events.stream().map(this::addEventToEventLog).toList();
333+
var addedEvents = events.stream().map(this::addEventToEventLog).filter(e->e!=null).toList();
337334

338335
// notify each Listener about the appends, but if multiple Events were appended, only notify about the last one
339336
addedEvents.stream()
@@ -353,6 +350,14 @@ private List<StoredEvent> addAndNotifyListeners ( List<EventToStore> events ) {
353350
}
354351

355352
private StoredEvent addEventToEventLog ( EventToStore event ) {
353+
354+
if ( event.idempotencyKey() != null ) {
355+
if ( idempotencyKeys.contains(event.idempotencyKey())) {
356+
return null;
357+
}
358+
idempotencyKeys.add(event.idempotencyKey());
359+
}
360+
356361
int posAndTxAsWell = eventlog.size()+1;
357362
EventReference reference = EventReference.create(posAndTxAsWell, posAndTxAsWell);
358363
StoredEvent storedEvent = event.positionAt(reference, LocalDateTime.now());

sliceworkz-eventstore-infra-postgres/src/main/java/org/sliceworkz/eventstore/infra/postgres/PostgresEventStorageImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.sql.Statement;
2929
import java.sql.Timestamp;
3030
import java.util.ArrayList;
31+
import java.util.Collections;
3132
import java.util.Iterator;
3233
import java.util.List;
3334
import java.util.Optional;
@@ -648,12 +649,12 @@ private void addEventQueryFiltering(StringBuilder sqlBuilder, List<Object> param
648649
public List<StoredEvent> append(AppendCriteria appendCriteria, Optional<EventStreamId> streamId, List<EventToStore> events) {
649650
// Build conditional insert with optimistic locking check
650651
StringBuilder sqlBuilder = new StringBuilder();
651-
sqlBuilder.append("INSERT INTO %sevents (event_id, stream_context, stream_purpose, event_type, event_data, event_erasable_data, event_tags) SELECT * FROM ( VALUES ".formatted(prefix));
652+
sqlBuilder.append("INSERT INTO %sevents (event_id, idempotency_key, stream_context, stream_purpose, event_type, event_data, event_erasable_data, event_tags) SELECT * FROM ( VALUES ".formatted(prefix));
652653
for ( int i = 0; i < events.size(); i++ ) {
653654
if ( i > 0 ) {
654655
sqlBuilder.append(", ");
655656
}
656-
sqlBuilder.append("(?::uuid, ?, ?, ?, ?::jsonb, ?::jsonb, ?)");
657+
sqlBuilder.append("(?::uuid, ?, ?, ?, ?, ?::jsonb, ?::jsonb, ?)");
657658
}
658659
sqlBuilder.append(")");
659660

@@ -668,6 +669,7 @@ public List<StoredEvent> append(AppendCriteria appendCriteria, Optional<EventStr
668669

669670
// Add to-be-appended-event parameters first
670671
parameters.add(id.value());
672+
parameters.add(event.idempotencyKey());
671673
parameters.add(event.stream().context());
672674
parameters.add(event.stream().purpose());
673675
parameters.add(event.type().name());
@@ -764,7 +766,13 @@ WHERE NOT EXISTS (
764766
return storedEvents;
765767
} catch (SQLException e) {
766768
writeConnection.rollback();
767-
throw new EventStorageException("SQLException during append", e);
769+
770+
// idempotency issue
771+
if ( e.getMessage().contains("idempotency_key")) {
772+
return Collections.emptyList();
773+
} else {
774+
throw new EventStorageException("SQLException during append", e);
775+
}
768776
}
769777

770778
} catch (SQLException e) {

sliceworkz-eventstore-infra-postgres/src/main/quickstart/quickstart.ddl.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ CREATE TABLE events (
4040
-- Event identification
4141
event_id UUID NOT NULL UNIQUE,
4242

43+
-- Idempotency key
44+
idempotency_key TEXT UNIQUE,
45+
4346
-- Stream identification
4447
stream_context TEXT NOT NULL,
4548
stream_purpose TEXT NOT NULL DEFAULT '',

0 commit comments

Comments
 (0)