Skip to content

Commit 4b04883

Browse files
authored
Merge pull request #2280 from Cratis:fix/projection-definitions-reload
Fix/projection-definitions-reload
2 parents 78668b3 + 98ca0cb commit 4b04883

File tree

10 files changed

+239
-11
lines changed

10 files changed

+239
-11
lines changed

.github/workflows/publish.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,3 +346,10 @@ jobs:
346346
repository: cratis/documentation
347347
event-type: build-docs
348348

349+
- name: Trigger Dependency Updates on Sample Repository
350+
uses: peter-evans/repository-dispatch@v3
351+
with:
352+
token: ${{ secrets.PAT_DOCUMENTATION }}
353+
repository: cratis/samples
354+
event-type: update-dependencies
355+

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.14.0" />
5656
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.10.0-beta.1" />
5757
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.14.0" />
58-
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.4.0" />
58+
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.5.0" />
5959
<!-- Roslyn-->
6060
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.14.0" />
6161
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.14.0" />

Source/Kernel/Grains/Projections/ImmediateProjection.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ public async Task<ProjectionResult> GetModelInstance()
8282
projectionChanged = State.LastUpdated > _lastUpdated;
8383
_lastUpdated = State.LastUpdated ?? DateTimeOffset.UtcNow;
8484

85+
if (projectionChanged)
86+
{
87+
var readModel = await GrainFactory.GetGrain<IReadModel>(new ReadModelGrainKey(State.ReadModel, _projectionKey!.EventStore)).GetDefinition();
88+
_projection = await projectionFactory.Create(_projectionKey!.EventStore, _projectionKey!.Namespace, State, readModel);
89+
_lastHandledEventSequenceNumber = EventSequenceNumber.Unavailable;
90+
_initialState = null;
91+
fromSequenceNumber = EventSequenceNumber.First;
92+
}
93+
8594
var eventSequenceKey = new EventSequenceKey(_projectionKey!.EventSequenceId, _projectionKey!.EventStore, _projectionKey!.Namespace);
8695
var eventSequence = GrainFactory.GetGrain<IEventSequence>(eventSequenceKey);
8796
var tail = await eventSequence.GetTailSequenceNumberForEventTypes(_projection!.EventTypes);

Source/Kernel/Grains/Projections/ProjectionObserverSubscriber.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,12 @@ namespace Cratis.Chronicle.Grains.Projections;
2828
/// <remarks>
2929
/// Initializes a new instance of the <see cref="ProjectionObserverSubscriber"/> class.
3030
/// </remarks>
31-
/// <param name="projectionManager"><see cref="Chronicle.Projections.IProjectionsManager"/> for getting projections.</param>
3231
/// <param name="projectionFactory"><see cref="IProjectionFactory"/> for creating projections.</param>
3332
/// <param name="projectionPipelineManager"><see cref="IProjectionPipelineManager"/> for creating projection pipelines.</param>
3433
/// <param name="expandoObjectConverter"><see cref="IExpandoObjectConverter"/> for converting to and from <see cref="ExpandoObject"/>.</param>
3534
/// <param name="logger">The logger.</param>
3635
[StorageProvider(ProviderName = WellKnownGrainStorageProviders.Projections)]
3736
public class ProjectionObserverSubscriber(
38-
Chronicle.Projections.IProjectionsManager projectionManager,
3937
IProjectionFactory projectionFactory,
4038
IProjectionPipelineManager projectionPipelineManager,
4139
IExpandoObjectConverter expandoObjectConverter,
@@ -119,10 +117,7 @@ public async Task<ObserverSubscriberResult> OnNext(Key partition, IEnumerable<Ap
119117
async Task HandlePipeline()
120118
{
121119
var readModel = await GrainFactory.GetGrain<IReadModel>(new ReadModelGrainKey(State.ReadModel, _key.EventStore)).GetDefinition();
122-
if (!projectionManager.TryGet(_key.EventStore, _key.Namespace, _key.ObserverId, out var projection))
123-
{
124-
projection = await projectionFactory.Create(_key.EventStore, _key.Namespace, State, readModel);
125-
}
120+
var projection = await projectionFactory.Create(_key.EventStore, _key.Namespace, State, readModel);
126121
_pipeline = projectionPipelineManager.GetFor(_key.EventStore, _key.Namespace, projection);
127122
_schema = readModel.GetSchemaForLatestGeneration();
128123
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright (c) Cratis. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using Cratis.Chronicle.Concepts;
5+
using Cratis.Chronicle.Concepts.Events;
6+
using Cratis.Chronicle.Concepts.EventSequences;
7+
using Cratis.Chronicle.Concepts.Projections;
8+
using Cratis.Chronicle.Concepts.Projections.Definitions;
9+
using Cratis.Chronicle.Concepts.ReadModels;
10+
using Cratis.Chronicle.Concepts.Sinks;
11+
using Cratis.Chronicle.Properties;
12+
13+
namespace Cratis.Chronicle.Projections.for_ProjectionsManager.given;
14+
15+
public class a_projections_manager : Specification
16+
{
17+
protected ProjectionsManager _manager;
18+
protected IProjectionFactory _projectionFactory;
19+
protected EventStoreName _eventStore;
20+
protected EventStoreNamespaceName _namespace;
21+
protected ProjectionDefinition _firstDefinition;
22+
protected ProjectionDefinition _secondDefinition;
23+
protected ReadModelDefinition _firstReadModelDefinition;
24+
protected ReadModelDefinition _secondReadModelDefinition;
25+
26+
void Establish()
27+
{
28+
_projectionFactory = Substitute.For<IProjectionFactory>();
29+
_projectionFactory.Create(Arg.Any<EventStoreName>(), Arg.Any<EventStoreNamespaceName>(), Arg.Any<ProjectionDefinition>(), Arg.Any<ReadModelDefinition>())
30+
.Returns(callInfo => Substitute.For<IProjection>());
31+
32+
_manager = new ProjectionsManager(_projectionFactory);
33+
_eventStore = "event-store";
34+
_namespace = "namespace";
35+
36+
_firstDefinition = new ProjectionDefinition(
37+
ProjectionOwner.Client,
38+
EventSequenceId.Log,
39+
"first-projection",
40+
"first-read-model",
41+
true,
42+
true,
43+
new System.Text.Json.Nodes.JsonObject(),
44+
new Dictionary<EventType, FromDefinition>(),
45+
new Dictionary<EventType, JoinDefinition>(),
46+
new Dictionary<PropertyPath, ChildrenDefinition>(),
47+
[],
48+
new FromEveryDefinition(new Dictionary<PropertyPath, string>(), false),
49+
SinkDefinition.None,
50+
new Dictionary<EventType, RemovedWithDefinition>(),
51+
new Dictionary<EventType, RemovedWithJoinDefinition>(),
52+
null,
53+
DateTimeOffset.UtcNow);
54+
55+
_secondDefinition = new ProjectionDefinition(
56+
ProjectionOwner.Client,
57+
EventSequenceId.Log,
58+
"second-projection",
59+
"second-read-model",
60+
true,
61+
true,
62+
new System.Text.Json.Nodes.JsonObject(),
63+
new Dictionary<EventType, FromDefinition>(),
64+
new Dictionary<EventType, JoinDefinition>(),
65+
new Dictionary<PropertyPath, ChildrenDefinition>(),
66+
[],
67+
new FromEveryDefinition(new Dictionary<PropertyPath, string>(), false),
68+
SinkDefinition.None,
69+
new Dictionary<EventType, RemovedWithDefinition>(),
70+
new Dictionary<EventType, RemovedWithJoinDefinition>(),
71+
null,
72+
DateTimeOffset.UtcNow);
73+
74+
_firstReadModelDefinition = new ReadModelDefinition(
75+
"first-read-model",
76+
"FirstReadModel",
77+
ReadModelOwner.Client,
78+
new Dictionary<ReadModelGeneration, NJsonSchema.JsonSchema>());
79+
80+
_secondReadModelDefinition = new ReadModelDefinition(
81+
"second-read-model",
82+
"SecondReadModel",
83+
ReadModelOwner.Client,
84+
new Dictionary<ReadModelGeneration, NJsonSchema.JsonSchema>());
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) Cratis. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using Cratis.Chronicle.Concepts;
5+
6+
namespace Cratis.Chronicle.Projections.for_ProjectionsManager.when_adding_namespace;
7+
8+
public class with_definitions_from_different_event_stores : given.a_projections_manager
9+
{
10+
EventStoreName _secondEventStore;
11+
EventStoreNamespaceName _secondNamespace;
12+
13+
void Establish()
14+
{
15+
_secondEventStore = "second-event-store";
16+
_secondNamespace = "second-namespace";
17+
}
18+
19+
async Task Because()
20+
{
21+
await _manager.Register(
22+
_eventStore,
23+
[_firstDefinition, _secondDefinition],
24+
[_firstReadModelDefinition, _secondReadModelDefinition],
25+
[_namespace]);
26+
27+
await _manager.AddNamespace(
28+
_secondEventStore,
29+
_secondNamespace,
30+
[_firstReadModelDefinition, _secondReadModelDefinition]);
31+
}
32+
33+
[Fact] void should_not_create_projections_for_different_event_store() => _projectionFactory.DidNotReceive().Create(_secondEventStore, _secondNamespace, Arg.Any<Concepts.Projections.Definitions.ProjectionDefinition>(), Arg.Any<Concepts.ReadModels.ReadModelDefinition>());
34+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) Cratis. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using Cratis.Chronicle.Concepts;
5+
6+
namespace Cratis.Chronicle.Projections.for_ProjectionsManager.when_adding_namespace;
7+
8+
public class with_existing_definitions : given.a_projections_manager
9+
{
10+
EventStoreNamespaceName _secondNamespace;
11+
IProjection _firstProjectionInSecondNamespace;
12+
IProjection _secondProjectionInSecondNamespace;
13+
14+
void Establish()
15+
{
16+
_secondNamespace = "second-namespace";
17+
}
18+
19+
async Task Because()
20+
{
21+
await _manager.Register(
22+
_eventStore,
23+
[_firstDefinition, _secondDefinition],
24+
[_firstReadModelDefinition, _secondReadModelDefinition],
25+
[_namespace]);
26+
27+
await _manager.AddNamespace(
28+
_eventStore,
29+
_secondNamespace,
30+
[_firstReadModelDefinition, _secondReadModelDefinition]);
31+
}
32+
33+
[Fact] void should_create_first_projection_in_second_namespace() => _manager.TryGet(_eventStore, _secondNamespace, _firstDefinition.Identifier, out _firstProjectionInSecondNamespace).ShouldBeTrue();
34+
[Fact] void should_create_second_projection_in_second_namespace() => _manager.TryGet(_eventStore, _secondNamespace, _secondDefinition.Identifier, out _secondProjectionInSecondNamespace).ShouldBeTrue();
35+
[Fact] void should_create_projections_for_all_definitions_in_new_namespace() => _projectionFactory.Received(2).Create(_eventStore, _secondNamespace, Arg.Any<Concepts.Projections.Definitions.ProjectionDefinition>(), Arg.Any<Concepts.ReadModels.ReadModelDefinition>());
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) Cratis. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace Cratis.Chronicle.Projections.for_ProjectionsManager.when_registering;
5+
6+
public class and_then_registering_again_with_updated_definitions : given.a_projections_manager
7+
{
8+
IProjection _firstProjectionFirstTime;
9+
IProjection _firstProjectionSecondTime;
10+
IProjection _secondProjectionFirstTime;
11+
IProjection _secondProjectionSecondTime;
12+
13+
async Task Because()
14+
{
15+
await _manager.Register(
16+
_eventStore,
17+
[_firstDefinition, _secondDefinition],
18+
[_firstReadModelDefinition, _secondReadModelDefinition],
19+
[_namespace]);
20+
21+
_manager.TryGet(_eventStore, _namespace, _firstDefinition.Identifier, out _firstProjectionFirstTime);
22+
_manager.TryGet(_eventStore, _namespace, _secondDefinition.Identifier, out _secondProjectionFirstTime);
23+
24+
await _manager.Register(
25+
_eventStore,
26+
[_firstDefinition, _secondDefinition],
27+
[_firstReadModelDefinition, _secondReadModelDefinition],
28+
[_namespace]);
29+
30+
_manager.TryGet(_eventStore, _namespace, _firstDefinition.Identifier, out _firstProjectionSecondTime);
31+
_manager.TryGet(_eventStore, _namespace, _secondDefinition.Identifier, out _secondProjectionSecondTime);
32+
}
33+
34+
[Fact] void should_replace_first_projection_instance() => _firstProjectionSecondTime.ShouldNotEqual(_firstProjectionFirstTime);
35+
[Fact] void should_replace_second_projection_instance() => _secondProjectionSecondTime.ShouldNotEqual(_secondProjectionFirstTime);
36+
[Fact] void should_create_projections_twice_for_each_definition() => _projectionFactory.Received(4).Create(Arg.Any<Concepts.EventStoreName>(), Arg.Any<Concepts.EventStoreNamespaceName>(), Arg.Any<Concepts.Projections.Definitions.ProjectionDefinition>(), Arg.Any<Concepts.ReadModels.ReadModelDefinition>());
37+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) Cratis. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace Cratis.Chronicle.Projections.for_ProjectionsManager.when_registering;
5+
6+
public class with_multiple_projection_definitions : given.a_projections_manager
7+
{
8+
IProjection _firstProjection;
9+
IProjection _secondProjection;
10+
11+
async Task Because()
12+
{
13+
await _manager.Register(
14+
_eventStore,
15+
[_firstDefinition, _secondDefinition],
16+
[_firstReadModelDefinition, _secondReadModelDefinition],
17+
[_namespace]);
18+
}
19+
20+
[Fact] void should_be_able_to_retrieve_first_projection() => _manager.TryGet(_eventStore, _namespace, _firstDefinition.Identifier, out _firstProjection).ShouldBeTrue();
21+
[Fact] void should_be_able_to_retrieve_second_projection() => _manager.TryGet(_eventStore, _namespace, _secondDefinition.Identifier, out _secondProjection).ShouldBeTrue();
22+
[Fact] void should_create_projections_for_both_definitions() => _projectionFactory.Received(2).Create(Arg.Any<Concepts.EventStoreName>(), Arg.Any<Concepts.EventStoreNamespaceName>(), Arg.Any<Concepts.Projections.Definitions.ProjectionDefinition>(), Arg.Any<Concepts.ReadModels.ReadModelDefinition>());
23+
}

Source/Kernel/Projections/ProjectionsManager.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@ namespace Cratis.Chronicle.Projections;
1818
[Singleton]
1919
public class ProjectionsManager(IProjectionFactory projectionFactory) : IProjectionsManager
2020
{
21-
readonly ConcurrentDictionary<EventStoreName, ProjectionDefinition> _definitions = new();
21+
readonly ConcurrentDictionary<string, ProjectionDefinition> _definitions = new();
2222
readonly ConcurrentDictionary<string, IProjection> _projections = new();
2323

2424
/// <inheritdoc/>
2525
public async Task Register(EventStoreName eventStore, IEnumerable<ProjectionDefinition> definitions, IEnumerable<ReadModelDefinition> readModelDefinitions, IEnumerable<EventStoreNamespaceName> namespaces)
2626
{
2727
foreach (var definition in definitions)
2828
{
29-
_definitions[eventStore] = definition;
29+
var definitionKey = $"{eventStore}{KeyHelper.Separator}{definition.Identifier}";
30+
_definitions[definitionKey] = definition;
3031
var readModel = readModelDefinitions.Single(rm => rm.Identifier == definition.ReadModel);
3132
foreach (var @namespace in namespaces)
3233
{
3334
var projection = await projectionFactory.Create(eventStore, @namespace, definition, readModel);
34-
var key = KeyHelper.Combine(eventStore, @namespace, definition.Identifier);
35+
var key = $"{eventStore}{KeyHelper.Separator}{@namespace}{KeyHelper.Separator}{definition.Identifier}";
3536
_projections[key] = projection;
3637
}
3738
}
@@ -40,7 +41,7 @@ public async Task Register(EventStoreName eventStore, IEnumerable<ProjectionDefi
4041
/// <inheritdoc/>
4142
public async Task AddNamespace(EventStoreName eventStore, EventStoreNamespaceName @namespace, IEnumerable<ReadModelDefinition> readModelDefinitions)
4243
{
43-
foreach (var definition in _definitions.Values)
44+
foreach (var definition in _definitions.Where(kvp => kvp.Key.StartsWith($"{eventStore}{KeyHelper.Separator}")).Select(kvp => kvp.Value))
4445
{
4546
var key = KeyHelper.Combine(eventStore, @namespace, definition.Identifier);
4647
var readModel = readModelDefinitions.Single(rm => rm.Identifier == definition.ReadModel);

0 commit comments

Comments
 (0)