Skip to content

Commit 249e754

Browse files
committed
Fix stale reads in MongoDB Sink during bulk mode catch-up
1 parent d08480d commit 249e754

File tree

4 files changed

+67
-12
lines changed

4 files changed

+67
-12
lines changed

Source/Kernel/Core/Namespaces/NamespacesReactor.cs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,50 @@
33

44
using Cratis.Chronicle.Concepts.Events;
55
using Cratis.Chronicle.Concepts.EventSequences;
6+
using Cratis.Chronicle.Concepts.Seeding;
67
using Cratis.Chronicle.Observation.Reactors.Kernel;
7-
8-
namespace Cratis.Chronicle.Namespaces;
8+
using Cratis.Chronicle.Seeding;
99

1010
#pragma warning disable IDE0060 // Remove unused parameter
1111

12+
namespace Cratis.Chronicle.Namespaces;
13+
1214
/// <summary>
1315
/// Represents a reactor that handles namespace events.
1416
/// </summary>
17+
/// <param name="grainFactory">The <see cref="IGrainFactory"/> for creating grains.</param>
1518
[Reactor(eventSequence: WellKnownEventSequences.System, systemEventStoreOnly: true)]
16-
public class NamespacesReactor : Reactor
19+
public class NamespacesReactor(IGrainFactory grainFactory) : Reactor
1720
{
1821
/// <summary>
19-
/// Handles the addition of a namespace.
22+
/// Handles the addition of a namespace by applying any existing global seed data to it.
2023
/// </summary>
2124
/// <param name="event">The event containing the namespace information.</param>
2225
/// <param name="eventContext">The context of the event.</param>
2326
/// <returns>Await Task.</returns>
24-
public Task Added(NamespaceAdded @event, EventContext eventContext)
27+
public async Task Added(NamespaceAdded @event, EventContext eventContext)
2528
{
26-
// TODO: In the future, we need to retrieve global seed data and apply it to the new namespace
27-
// When implemented, we will need to use IGrainFactory to:
28-
// 1. Get the global seed grain using EventSeedingKey.ForGlobal(@event.EventStore)
29-
// 2. Get the namespace-specific seed grain using EventSeedingKey.ForNamespace(@event.EventStore, @event.Namespace)
30-
// 3. Retrieve global seed data and apply it to the new namespace
31-
return Task.CompletedTask;
29+
var globalKey = EventSeedingKey.ForGlobal(@event.EventStore);
30+
var globalGrain = grainFactory.GetGrain<IEventSeeding>(globalKey.ToString());
31+
var seeds = await globalGrain.GetSeededEvents();
32+
33+
if (seeds.ByEventSource.Count == 0)
34+
{
35+
return;
36+
}
37+
38+
var entries = seeds.ByEventSource
39+
.SelectMany(kvp => kvp.Value)
40+
.GroupBy(e => new { e.EventSourceId, e.EventTypeId, e.Content })
41+
.Select(g => g.First())
42+
.Select(e => new SeedingEntry(e.EventSourceId, e.EventTypeId, e.Content, e.Tags?.Select(t => new Tag(t))))
43+
.ToArray();
44+
45+
if (entries.Length > 0)
46+
{
47+
var namespaceKey = EventSeedingKey.ForNamespace(@event.EventStore, @event.Namespace);
48+
var nsGrain = grainFactory.GetGrain<IEventSeeding>(namespaceKey.ToString());
49+
await nsGrain.Seed(entries);
50+
}
3251
}
3352
}

Source/Kernel/Core/Seeding/EventSeeding.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Cratis.Chronicle.Concepts.Identities;
1111
using Cratis.Chronicle.Concepts.Seeding;
1212
using Cratis.Chronicle.EventSequences;
13+
using Cratis.Chronicle.Namespaces;
1314
using Cratis.Chronicle.Storage.Seeding;
1415
using Microsoft.Extensions.Logging;
1516
using Orleans.Providers;
@@ -60,19 +61,34 @@ public async Task Seed(IEnumerable<SeedingEntry> entries)
6061
new Dictionary<EventTypeId, IEnumerable<SeededEventEntry>>(),
6162
new Dictionary<EventSourceId, IEnumerable<SeededEventEntry>>());
6263

63-
// For global grains, just store the entries without appending
64+
// For global grains, store the entries and apply to all existing namespaces
6465
if (_key.IsGlobal)
6566
{
67+
var newEntries = new List<SeedingEntry>();
6668
foreach (var entry in entriesList)
6769
{
6870
var tags = entry.Tags?.Select(t => t.Value) ?? [];
6971
var seededEntry = new SeededEventEntry(entry.EventSourceId, entry.EventTypeId, entry.Content, tags);
7072
if (!IsAlreadySeeded(seededEntry))
7173
{
7274
TrackSeededEvent(seededEntry);
75+
newEntries.Add(entry);
7376
}
7477
}
7578
await state.WriteStateAsync();
79+
80+
if (newEntries.Count > 0)
81+
{
82+
var namespacesGrain = GrainFactory.GetGrain<INamespaces>(_key.EventStore.Value);
83+
var namespaces = await namespacesGrain.GetAll();
84+
foreach (var ns in namespaces)
85+
{
86+
logger.ApplyingSeedsToNamespace(ns.Value);
87+
var namespaceKey = EventSeedingKey.ForNamespace(_key.EventStore, ns);
88+
var nsGrain = GrainFactory.GetGrain<IEventSeeding>(namespaceKey.ToString());
89+
await nsGrain.Seed(newEntries);
90+
}
91+
}
7692
}
7793
else
7894
{

Source/Kernel/Core/Seeding/EventSeedingLogging.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,7 @@ internal static partial class EventSeedingLogMessages
1919

2020
[LoggerMessage(LogLevel.Debug, "All events have already been seeded, skipping")]
2121
internal static partial void AllEventsAlreadySeeded(this ILogger<EventSeeding> logger);
22+
23+
[LoggerMessage(LogLevel.Debug, "Applying global seeds to namespace '{Namespace}'")]
24+
internal static partial void ApplyingSeedsToNamespace(this ILogger<EventSeeding> logger, string @namespace);
2225
}

Source/Kernel/Storage.MongoDB/Sinks/Sink.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class Sink(
4747

4848
readonly List<WriteModel<BsonDocument>> _bulkOperations = [];
4949
readonly Dictionary<int, (Key EventSourceId, EventSequenceNumber SequenceNumber)> _bulkOperationMetadata = [];
50+
readonly Dictionary<string, ExpandoObject> _bulkStateCache = [];
5051
int _currentBulkSize;
5152
bool _isBulkMode;
5253

@@ -59,6 +60,15 @@ public class Sink(
5960
/// <inheritdoc/>
6061
public async Task<ExpandoObject?> FindOrDefault(Key key)
6162
{
63+
if (_isBulkMode)
64+
{
65+
var cacheKey = converter.ToBsonValue(key).ToString()!;
66+
if (_bulkStateCache.TryGetValue(cacheKey, out var cachedState))
67+
{
68+
return cachedState;
69+
}
70+
}
71+
6272
var collection = Collection;
6373

6474
using var result = await collection.FindAsync(Builders<BsonDocument>.Filter.Eq("_id", converter.ToBsonValue(key)));
@@ -86,6 +96,7 @@ public async Task<IEnumerable<FailedPartition>> ApplyChanges(
8696
if (_isBulkMode)
8797
{
8898
AddToBulk(new DeleteOneModel<BsonDocument>(filter), key, eventSequenceNumber);
99+
_bulkStateCache.Remove(converter.ToBsonValue(key).ToString()!);
89100
return await FlushBulkIfNeeded();
90101
}
91102

@@ -110,6 +121,10 @@ public async Task<IEnumerable<FailedPartition>> ApplyChanges(
110121
ArrayFilters = converted.ArrayFilters
111122
};
112123
AddToBulk(updateModel, key, eventSequenceNumber);
124+
if (!changeset.HasJoined())
125+
{
126+
_bulkStateCache[converter.ToBsonValue(key).ToString()!] = changeset.CurrentState;
127+
}
113128
return await FlushBulkIfNeeded();
114129
}
115130

@@ -130,6 +145,7 @@ public Task BeginBulk()
130145
_isBulkMode = true;
131146
_bulkOperations.Clear();
132147
_bulkOperationMetadata.Clear();
148+
_bulkStateCache.Clear();
133149
_currentBulkSize = 0;
134150
return Task.CompletedTask;
135151
}
@@ -144,6 +160,7 @@ public async Task EndBulk()
144160
_isBulkMode = false;
145161
_bulkOperations.Clear();
146162
_bulkOperationMetadata.Clear();
163+
_bulkStateCache.Clear();
147164
_currentBulkSize = 0;
148165
}
149166

0 commit comments

Comments
 (0)