Skip to content

Commit 5991257

Browse files
authored
Merge pull request #935 from J-Tech-Japan/codex/serializable-maxcount-cache-batch
Add maxCount support for serializable reads and batch cache sync
2 parents 92676a5 + 74f8dfd commit 5991257

File tree

6 files changed

+372
-80
lines changed

6 files changed

+372
-80
lines changed

dcb/src/Sekiban.Dcb.Core/Storage/IEventStore.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ public interface IEventStore
7676
Task<ResultBox<IEnumerable<SerializableEvent>>> ReadAllSerializableEventsAsync(SortableUniqueId? since = null)
7777
=> throw new NotSupportedException("SerializableEvent read not implemented");
7878

79+
/// <summary>
80+
/// Reads all events as SerializableEvent (no payload deserialization).
81+
/// </summary>
82+
/// <param name="since">Optional: Only return events after this ID</param>
83+
/// <param name="maxCount">Optional: Maximum number of events to return</param>
84+
Task<ResultBox<IEnumerable<SerializableEvent>>> ReadAllSerializableEventsAsync(
85+
SortableUniqueId? since,
86+
int? maxCount)
87+
=> ReadAllSerializableEventsAsync(since);
88+
7989
/// <summary>
8090
/// Reads events for a specific tag as SerializableEvent (no payload deserialization).
8191
/// </summary>

dcb/src/Sekiban.Dcb.CosmosDb/CosmosDbEventStore.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,12 @@ public async Task<ResultBox<IEnumerable<TagInfo>>> GetAllTagsAsync(string? tagGr
10771077
/// Reads all events as SerializableEvent (no payload deserialization).
10781078
/// </summary>
10791079
public async Task<ResultBox<IEnumerable<SerializableEvent>>> ReadAllSerializableEventsAsync(SortableUniqueId? since = null)
1080+
=> await ReadAllSerializableEventsAsync(since, null).ConfigureAwait(false);
1081+
1082+
/// <summary>
1083+
/// Reads all events as SerializableEvent (no payload deserialization).
1084+
/// </summary>
1085+
public async Task<ResultBox<IEnumerable<SerializableEvent>>> ReadAllSerializableEventsAsync(SortableUniqueId? since, int? maxCount)
10801086
{
10811087
try
10821088
{
@@ -1103,14 +1109,27 @@ public async Task<ResultBox<IEnumerable<SerializableEvent>>> ReadAllSerializable
11031109
.WithParameter(ParamServiceId, serviceId);
11041110
}
11051111
var queryRequestOptions = options.CreateOptimizedQueryRequestOptions();
1112+
if (options.MaxItemCountPerReadAllPage > 0)
1113+
{
1114+
queryRequestOptions.MaxItemCount = maxCount.HasValue
1115+
? Math.Min(maxCount.Value, options.MaxItemCountPerReadAllPage)
1116+
: options.MaxItemCountPerReadAllPage;
1117+
}
1118+
else if (maxCount.HasValue)
1119+
{
1120+
queryRequestOptions.MaxItemCount = maxCount.Value;
1121+
}
11061122

11071123
var events = new List<SerializableEvent>();
1124+
var totalRuConsumed = 0.0;
1125+
var totalEventsRead = 0;
11081126

11091127
using var iterator = container.GetItemQueryIterator<CosmosEvent>(queryDefinition, requestOptions: queryRequestOptions);
11101128

11111129
while (iterator.HasMoreResults)
11121130
{
11131131
var response = await iterator.ReadNextAsync().ConfigureAwait(false);
1132+
totalRuConsumed += response.RequestCharge;
11141133

11151134
foreach (var cosmosEvent in response)
11161135
{
@@ -1122,9 +1141,22 @@ public async Task<ResultBox<IEnumerable<SerializableEvent>>> ReadAllSerializable
11221141
cosmosEvent.CausationId ?? "",
11231142
cosmosEvent.CorrelationId ?? "",
11241143
cosmosEvent.ExecutedUser ?? ""),
1125-
cosmosEvent.Tags?.ToList() ?? new List<string>(),
1144+
cosmosEvent.Tags?.ToList() ?? new List<string>(),
11261145
cosmosEvent.EventType));
11271146
}
1147+
totalEventsRead += response.Count;
1148+
1149+
if (maxCount.HasValue && events.Count >= maxCount.Value)
1150+
{
1151+
break;
1152+
}
1153+
1154+
options.ReadProgressCallback?.Invoke(totalEventsRead, totalRuConsumed);
1155+
}
1156+
1157+
if (maxCount.HasValue && events.Count > maxCount.Value)
1158+
{
1159+
events = events.Take(maxCount.Value).ToList();
11281160
}
11291161

11301162
return ResultBox.FromValue<IEnumerable<SerializableEvent>>(events);

dcb/src/Sekiban.Dcb.Sqlite/CacheSyncOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ namespace Sekiban.Dcb.Sqlite;
55
/// </summary>
66
public class CacheSyncOptions
77
{
8+
/// <summary>
9+
/// Number of events to fetch per remote read during cache sync.
10+
/// </summary>
11+
public int BatchSize { get; set; } = 3000;
12+
813
/// <summary>
914
/// Time window to exclude from caching.
1015
/// Events within this window are considered "unsafe" and will be fetched from remote.

dcb/src/Sekiban.Dcb.Sqlite/EventStoreCacheSync.cs

Lines changed: 132 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -92,57 +92,83 @@ public async Task<CacheSyncResult> SyncAsync(CancellationToken cancellationToken
9292
_logger?.LogInformation("Fetching events since {Since} until {Until}",
9393
since?.Value ?? "(beginning)", until?.Value ?? "(now)");
9494

95-
var remoteEventsResult = await _remoteStore.ReadAllSerializableEventsAsync(since);
96-
if (!remoteEventsResult.IsSuccess)
95+
var batchSize = GetEffectiveBatchSize();
96+
var totalEventsAdded = 0;
97+
var currentSince = since;
98+
99+
while (true)
97100
{
98-
return CacheSyncResult.Failed(
99-
$"Failed to read remote events: {remoteEventsResult.GetException().Message}",
100-
stopwatch.Elapsed);
101+
cancellationToken.ThrowIfCancellationRequested();
102+
103+
var remoteEventsResult = await _remoteStore.ReadAllSerializableEventsAsync(currentSince, batchSize);
104+
if (!remoteEventsResult.IsSuccess)
105+
{
106+
return CacheSyncResult.Failed(
107+
$"Failed to read remote events: {remoteEventsResult.GetException().Message}",
108+
stopwatch.Elapsed);
109+
}
110+
111+
var remoteEvents = remoteEventsResult.GetValue()
112+
.OrderBy(e => e.SortableUniqueIdValue, StringComparer.Ordinal)
113+
.ToList();
114+
115+
if (remoteEvents.Count == 0)
116+
{
117+
break;
118+
}
119+
120+
var eventsToCache = until != null
121+
? remoteEvents.Where(e => new SortableUniqueId(e.SortableUniqueIdValue).IsEarlierThanOrEqual(until)).ToList()
122+
: remoteEvents;
123+
124+
if (eventsToCache.Count > 0)
125+
{
126+
_logger?.LogInformation("Caching batch of {Count} events...", eventsToCache.Count);
127+
128+
var writeResult = await _localStore.WriteSerializableEventsAsync(eventsToCache);
129+
if (!writeResult.IsSuccess)
130+
{
131+
return CacheSyncResult.Failed(
132+
$"Failed to write events to cache: {writeResult.GetException().Message}",
133+
stopwatch.Elapsed);
134+
}
135+
136+
totalEventsAdded += eventsToCache.Count;
137+
138+
await _localStore.SetMetadataAsync(new CacheMetadata
139+
{
140+
RemoteEndpoint = _options.RemoteEndpoint,
141+
DatabaseName = _options.DatabaseName,
142+
SchemaVersion = _options.SchemaVersion,
143+
TotalCountAtFetch = remoteCount,
144+
LastCachedSortableUniqueId = eventsToCache.Last().SortableUniqueIdValue,
145+
LastSafeWindowUtc = until?.GetDateTime(),
146+
CreatedUtc = metadata?.CreatedUtc ?? DateTime.UtcNow,
147+
UpdatedUtc = DateTime.UtcNow
148+
});
149+
}
150+
151+
var reachedSafeWindow = until != null && eventsToCache.Count < remoteEvents.Count;
152+
if (reachedSafeWindow || remoteEvents.Count < batchSize)
153+
{
154+
break;
155+
}
156+
157+
currentSince = new SortableUniqueId(remoteEvents.Last().SortableUniqueIdValue);
101158
}
102159

103-
var remoteEvents = remoteEventsResult.GetValue().ToList();
104-
105-
// Filter by safe window
106-
var eventsToCache = until != null
107-
? remoteEvents.Where(e => new SortableUniqueId(e.SortableUniqueIdValue).IsEarlierThanOrEqual(until)).ToList()
108-
: remoteEvents;
109-
110-
if (eventsToCache.Count == 0)
160+
if (totalEventsAdded == 0)
111161
{
112162
_logger?.LogInformation("No new events to cache (all within safe window)");
113163
return CacheSyncResult.NoChanges(localCount, stopwatch.Elapsed);
114164
}
115165

116-
// 6. Write to local cache
117-
_logger?.LogInformation("Caching {Count} events...", eventsToCache.Count);
118-
119-
var writeResult = await _localStore.WriteSerializableEventsAsync(eventsToCache);
120-
if (!writeResult.IsSuccess)
121-
{
122-
return CacheSyncResult.Failed(
123-
$"Failed to write events to cache: {writeResult.GetException().Message}",
124-
stopwatch.Elapsed);
125-
}
126-
127-
// 7. Update metadata
128-
var newLocalCount = localCount + eventsToCache.Count;
129-
await _localStore.SetMetadataAsync(new CacheMetadata
130-
{
131-
RemoteEndpoint = _options.RemoteEndpoint,
132-
DatabaseName = _options.DatabaseName,
133-
SchemaVersion = _options.SchemaVersion,
134-
TotalCountAtFetch = remoteCount,
135-
LastCachedSortableUniqueId = eventsToCache.Last().SortableUniqueIdValue,
136-
LastSafeWindowUtc = until?.GetDateTime(),
137-
CreatedUtc = metadata?.CreatedUtc ?? DateTime.UtcNow,
138-
UpdatedUtc = DateTime.UtcNow
139-
});
140-
166+
var newLocalCount = localCount + totalEventsAdded;
141167
_logger?.LogInformation("Cache sync completed: {Count} events added, {Total} total",
142-
eventsToCache.Count, newLocalCount);
168+
totalEventsAdded, newLocalCount);
143169

144170
return CacheSyncResult.Success(
145-
eventsToCache.Count,
171+
totalEventsAdded,
146172
newLocalCount,
147173
CacheSyncAction.AppendedNewEvents,
148174
stopwatch.Elapsed);
@@ -193,58 +219,83 @@ private async Task<CacheSyncResult> RebuildFromScratchAsync(
193219
CancellationToken cancellationToken)
194220
{
195221
var until = GetSafeWindowThreshold();
222+
var batchSize = GetEffectiveBatchSize();
223+
SortableUniqueId? currentSince = null;
224+
var totalEventsAdded = 0;
196225

197226
_logger?.LogInformation("Rebuilding cache from scratch...");
198-
199-
var remoteEventsResult = await _remoteStore.ReadAllSerializableEventsAsync();
200-
if (!remoteEventsResult.IsSuccess)
227+
while (true)
201228
{
202-
return CacheSyncResult.Failed(
203-
$"Failed to read remote events: {remoteEventsResult.GetException().Message}",
204-
stopwatch.Elapsed);
205-
}
229+
cancellationToken.ThrowIfCancellationRequested();
206230

207-
var remoteEvents = remoteEventsResult.GetValue().ToList();
231+
var remoteEventsResult = await _remoteStore.ReadAllSerializableEventsAsync(currentSince, batchSize);
232+
if (!remoteEventsResult.IsSuccess)
233+
{
234+
return CacheSyncResult.Failed(
235+
$"Failed to read remote events: {remoteEventsResult.GetException().Message}",
236+
stopwatch.Elapsed);
237+
}
208238

209-
// Filter by safe window
210-
var eventsToCache = until != null
211-
? remoteEvents.Where(e => new SortableUniqueId(e.SortableUniqueIdValue).IsEarlierThanOrEqual(until)).ToList()
212-
: remoteEvents;
239+
var remoteEvents = remoteEventsResult.GetValue()
240+
.OrderBy(e => e.SortableUniqueIdValue, StringComparer.Ordinal)
241+
.ToList();
213242

214-
if (eventsToCache.Count == 0)
215-
{
216-
_logger?.LogInformation("No events to cache (all within safe window)");
217-
return CacheSyncResult.Success(0, 0, CacheSyncAction.RebuiltFromScratch, stopwatch.Elapsed);
218-
}
243+
if (remoteEvents.Count == 0)
244+
{
245+
break;
246+
}
219247

220-
_logger?.LogInformation("Caching {Count} events...", eventsToCache.Count);
248+
var eventsToCache = until != null
249+
? remoteEvents.Where(e => new SortableUniqueId(e.SortableUniqueIdValue).IsEarlierThanOrEqual(until)).ToList()
250+
: remoteEvents;
221251

222-
var writeResult = await _localStore.WriteSerializableEventsAsync(eventsToCache);
223-
if (!writeResult.IsSuccess)
224-
{
225-
return CacheSyncResult.Failed(
226-
$"Failed to write events to cache: {writeResult.GetException().Message}",
227-
stopwatch.Elapsed);
252+
if (eventsToCache.Count > 0)
253+
{
254+
_logger?.LogInformation("Caching batch of {Count} events...", eventsToCache.Count);
255+
256+
var writeResult = await _localStore.WriteSerializableEventsAsync(eventsToCache);
257+
if (!writeResult.IsSuccess)
258+
{
259+
return CacheSyncResult.Failed(
260+
$"Failed to write events to cache: {writeResult.GetException().Message}",
261+
stopwatch.Elapsed);
262+
}
263+
264+
totalEventsAdded += eventsToCache.Count;
265+
266+
await _localStore.SetMetadataAsync(new CacheMetadata
267+
{
268+
RemoteEndpoint = _options.RemoteEndpoint,
269+
DatabaseName = _options.DatabaseName,
270+
SchemaVersion = _options.SchemaVersion,
271+
TotalCountAtFetch = remoteCount,
272+
LastCachedSortableUniqueId = eventsToCache.Last().SortableUniqueIdValue,
273+
LastSafeWindowUtc = until?.GetDateTime(),
274+
CreatedUtc = DateTime.UtcNow,
275+
UpdatedUtc = DateTime.UtcNow
276+
});
277+
}
278+
279+
var reachedSafeWindow = until != null && eventsToCache.Count < remoteEvents.Count;
280+
if (reachedSafeWindow || remoteEvents.Count < batchSize)
281+
{
282+
break;
283+
}
284+
285+
currentSince = new SortableUniqueId(remoteEvents.Last().SortableUniqueIdValue);
228286
}
229287

230-
// Update metadata
231-
await _localStore.SetMetadataAsync(new CacheMetadata
288+
if (totalEventsAdded == 0)
232289
{
233-
RemoteEndpoint = _options.RemoteEndpoint,
234-
DatabaseName = _options.DatabaseName,
235-
SchemaVersion = _options.SchemaVersion,
236-
TotalCountAtFetch = remoteCount,
237-
LastCachedSortableUniqueId = eventsToCache.Last().SortableUniqueIdValue,
238-
LastSafeWindowUtc = until?.GetDateTime(),
239-
CreatedUtc = DateTime.UtcNow,
240-
UpdatedUtc = DateTime.UtcNow
241-
});
242-
243-
_logger?.LogInformation("Cache rebuilt: {Count} events cached", eventsToCache.Count);
290+
_logger?.LogInformation("No events to cache (all within safe window)");
291+
return CacheSyncResult.Success(0, 0, CacheSyncAction.RebuiltFromScratch, stopwatch.Elapsed);
292+
}
293+
294+
_logger?.LogInformation("Cache rebuilt: {Count} events cached", totalEventsAdded);
244295

245296
return CacheSyncResult.Success(
246-
eventsToCache.Count,
247-
eventsToCache.Count,
297+
totalEventsAdded,
298+
totalEventsAdded,
248299
CacheSyncAction.RebuiltFromScratch,
249300
stopwatch.Elapsed);
250301
}
@@ -296,6 +347,8 @@ private bool ValidateMetadata(CacheMetadata? metadata)
296347
var threshold = DateTime.UtcNow - _options.SafeWindow;
297348
return new SortableUniqueId(SortableUniqueId.Generate(threshold, Guid.Empty));
298349
}
350+
351+
private int GetEffectiveBatchSize() => _options.BatchSize > 0 ? _options.BatchSize : 3000;
299352
}
300353

301354
/// <summary>

0 commit comments

Comments
 (0)