Skip to content

Commit ea0303b

Browse files
committed
Made Exercise with Multi-Stream Marten projection to be async
Multi-Stream projections should be registered as async, especially if they raise side effects. I was lazy before, now this PR fixes it. It adds registration as async and properly starting the Async Daemon in tests and waiting for projection data to be processed.
1 parent bca3d6c commit ea0303b

File tree

9 files changed

+124
-14
lines changed

9 files changed

+124
-14
lines changed

Workshops/IntroductionToEventSourcing/16-Projections.MultiStream.OutOfOrder/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ Fix the projection from Exercise 15 to handle out-of-order events.
66

77
Learn how to build resilient projections that work even when events arrive in any order.
88

9+
## Scenario
10+
11+
Events arrive from three different streams (payment, merchant, and fraud check), but they all reference the same `PaymentId`. Your projection must:
12+
13+
1. Collect data from all three event types
14+
2. Store them in a single `PaymentVerification` read model
15+
3. Derive the payment verification status when all data is present
16+
17+
Decision logic:
18+
- Reject if merchant failed
19+
- Reject if fraud score > 0.75
20+
- Reject if amount > 10000 AND fraud score > 0.5
21+
- Otherwise approve
22+
-
923
## Context
1024

1125
Events can arrive out of order (e.g., from different RabbitMQ queues or Kafka topics). The projection from Exercise 15 was built assuming ordered events — run the test to see it fail.
@@ -17,3 +31,4 @@ For example, `FraudScoreCalculated` might fire before `PaymentRecorded`, meaning
1731
## Decision Logic
1832

1933
Only derive a final status when you have all three pieces of data (payment, merchant check, fraud check). Then apply the same rules as Exercise 15.
34+

Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/ProjectionsTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()
8282
// 2. Register the projection here using: options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
8383
});
8484

85+
// Let's start Async Daemon to process async projections in the background
86+
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
87+
using var daemon = await documentStore.BuildProjectionDaemonAsync();
88+
await daemon.StartAllAsync();
89+
8590
await using var session = documentStore.LightweightSession();
8691

8792
// Payment 1: Approved — all checks pass
@@ -110,6 +115,9 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()
110115

111116
await session.SaveChangesAsync();
112117

118+
// Wait until Async Daemon processes all events
119+
await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));
120+
113121
// Assert Payment 1: Approved
114122
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
115123
payment1.Should().NotBeNull();

Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/README.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@ Events arrive from three different streams (payment, merchant, and fraud check),
1414
2. Store them in a single `PaymentVerification` read model
1515
3. Derive the payment verification status when all data is present
1616

17+
Decision logic:
18+
- Reject if merchant failed
19+
- Reject if fraud score > 0.75
20+
- Reject if amount > 10000 AND fraud score > 0.5
21+
- Otherwise approve
22+
1723
## Steps
1824

1925
1. Create a `PaymentVerificationProjection` class with `Handle` methods for each event type
2026
2. Register your handlers using document store options `options.Projections.Add` as inline.
21-
3. Implement decision logic in the `FraudScoreCalculated` handler (always last for completed payments):
22-
- Reject if merchant failed
23-
- Reject if fraud score > 0.75
24-
- Reject if amount > 10000 AND fraud score > 0.5
25-
- Otherwise approve
27+
3. Implement decision logic in the `FraudScoreCalculated` handler (always last for completed payments)
2628

2729
## Reference
2830

2931
- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
32+
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)

Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public class PaymentVerification
5959
public PaymentStatus Status { get; set; }
6060
}
6161

62+
63+
// TODO: This projection was built assuming ordered events. Run the test — it fails.
64+
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
65+
// Fix it to handle out-of-order events and derive the verification decision.
6266
public class PaymentVerificationProjection: MultiStreamProjection<PaymentVerification, string>
6367
{
6468
public PaymentVerificationProjection()
@@ -138,11 +142,15 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc
138142
// TODO: This projection was built assuming ordered events. Run the test — it fails.
139143
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
140144
// Fix it to handle out-of-order events and derive the verification decision.
141-
142-
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
145+
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);
143146
options.Events.StreamIdentity = StreamIdentity.AsString;
144147
});
145148

149+
// Let's start Async Daemon to process async projections in the background
150+
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
151+
using var daemon = await documentStore.BuildProjectionDaemonAsync();
152+
await daemon.StartAllAsync();
153+
146154
await using var session = documentStore.LightweightSession();
147155

148156
// Payment 1: Approved — FraudScore arrives first
@@ -171,29 +179,61 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc
171179

172180
await session.SaveChangesAsync();
173181

182+
await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));
183+
174184
// Assert Payment 1: Approved
175185
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
176186
payment1.Should().NotBeNull();
187+
payment1.Id.Should().Be(payment1Id);
188+
payment1.OrderId.Should().Be(order1Id);
189+
payment1.Amount.Should().Be(100m);
190+
payment1.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
191+
payment1.FraudStatus.Should().Be(VerificationStatus.Passed);
192+
payment1.FraudScore.Should().Be(0.1m);
177193
payment1.Status.Should().Be(PaymentStatus.Approved);
178194

179195
// Assert Payment 2: Rejected
180196
var payment2 = await session.LoadAsync<PaymentVerification>(payment2Id);
181197
payment2.Should().NotBeNull();
198+
payment2.Id.Should().Be(payment2Id);
199+
payment2.OrderId.Should().Be(order2Id);
200+
payment2.Amount.Should().Be(5000m);
201+
payment2.MerchantLimitStatus.Should().Be(VerificationStatus.Failed);
202+
payment2.FraudStatus.Should().Be(VerificationStatus.Passed);
203+
payment2.FraudScore.Should().Be(0.2m);
182204
payment2.Status.Should().Be(PaymentStatus.Rejected);
183205

184206
// Assert Payment 3: Rejected
185207
var payment3 = await session.LoadAsync<PaymentVerification>(payment3Id);
186208
payment3.Should().NotBeNull();
209+
payment3.Id.Should().Be(payment3Id);
210+
payment3.OrderId.Should().Be(order3Id);
211+
payment3.Amount.Should().Be(200m);
212+
payment3.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
213+
payment3.FraudStatus.Should().Be(VerificationStatus.Failed);
214+
payment3.FraudScore.Should().Be(0.95m);
187215
payment3.Status.Should().Be(PaymentStatus.Rejected);
188216

189217
// Assert Payment 4: Rejected
190218
var payment4 = await session.LoadAsync<PaymentVerification>(payment4Id);
191219
payment4.Should().NotBeNull();
220+
payment4.Id.Should().Be(payment4Id);
221+
payment4.OrderId.Should().Be(order4Id);
222+
payment4.Amount.Should().Be(15000m);
223+
payment4.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
224+
payment4.FraudStatus.Should().Be(VerificationStatus.Passed);
225+
payment4.FraudScore.Should().Be(0.6m);
192226
payment4.Status.Should().Be(PaymentStatus.Rejected);
193227

194228
// Assert Payment 5: Pending
195229
var payment5 = await session.LoadAsync<PaymentVerification>(payment5Id);
196230
payment5.Should().NotBeNull();
231+
payment5.Id.Should().Be(payment5Id);
232+
payment5.OrderId.Should().Be(order5Id);
233+
payment5.Amount.Should().Be(50m);
234+
payment5.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
235+
payment5.FraudStatus.Should().Be(VerificationStatus.Pending);
236+
payment5.FraudScore.Should().Be(0m);
197237
payment5.Status.Should().Be(PaymentStatus.Pending);
198238

199239
// Assert Payment 1: Verification is emitted

Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ Fix the Marten projection from Exercise 17 to handle out-of-order events.
66

77
Learn how to build resilient Marten projections that work even when events arrive in any order.
88

9+
## Scenario
10+
11+
Events arrive from three different streams (payment, merchant, and fraud check), but they all reference the same `PaymentId`. Your projection must:
12+
13+
1. Collect data from all three event types
14+
2. Store them in a single `PaymentVerification` read model
15+
3. Derive the payment verification status when all data is present
16+
17+
Decision logic:
18+
- Reject if merchant failed
19+
- Reject if fraud score > 0.75
20+
- Reject if amount > 10000 AND fraud score > 0.5
21+
- Otherwise approve
22+
923
## Context
1024

1125
Same out-of-order context as Exercise 16: events can arrive in any order (e.g., from different RabbitMQ queues or Kafka topics). The projection from Exercise 17 assumes ordered events — run the test to see it fail.
@@ -15,3 +29,6 @@ Same out-of-order context as Exercise 16: events can arrive in any order (e.g.,
1529
## Reference
1630

1731
- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
32+
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)
33+
- [`RaiseSideEffects` method from Marten projections](https://martendb.io/events/projections/side-effects.html#side-effects)
34+

Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/ProjectionsTests.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,14 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()
129129
options.DatabaseSchemaName = options.Events.DatabaseSchemaName = "Exercise17MultiStreamMarten";
130130
options.AutoCreateSchemaObjects = AutoCreate.All;
131131

132-
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
132+
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);
133133
});
134134

135+
// Let's start Async Daemon to process async projections in the background
136+
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
137+
using var daemon = await documentStore.BuildProjectionDaemonAsync();
138+
await daemon.StartAllAsync();
139+
135140
await using var session = documentStore.LightweightSession();
136141

137142
// Payment 1: Approved — all checks pass
@@ -160,6 +165,9 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()
160165

161166
await session.SaveChangesAsync();
162167

168+
// Wait until Async Daemon processes all events
169+
await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));
170+
163171
// Assert Payment 1: Approved
164172
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
165173
payment1.Should().NotBeNull();

Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ Learn how to use Marten's built-in multi-stream projection support to simplify c
1414
4. Put decision logic in the `FraudScoreCalculated` Apply method (same rules as Exercise 15)
1515
5. Register the projection using `options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline)`
1616

17+
Decision logic:
18+
- Reject if merchant failed
19+
- Reject if fraud score > 0.75
20+
- Reject if amount > 10000 AND fraud score > 0.5
21+
- Otherwise approve
22+
23+
1724
## Key Differences from Exercise 15
1825

1926
Instead of manually handling event routing and database operations, Marten:
@@ -24,3 +31,4 @@ Instead of manually handling event routing and database operations, Marten:
2431
## Reference
2532

2633
- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
34+
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)

Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public class ProjectionsTests
156156
"PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'";
157157

158158
[Fact]
159-
[Trait("Category", "SkipCI")]
160159
public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucceed()
161160
{
162161
var payment1Id = $"payment:{Guid.CreateVersion7()}";
@@ -185,14 +184,15 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc
185184
options.DatabaseSchemaName = options.Events.DatabaseSchemaName = "Exercise18MultiStreamOutOfOrderMarten";
186185
options.AutoCreateSchemaObjects = AutoCreate.All;
187186

188-
// TODO: This projection was built assuming ordered events. Run the test — it fails.
189-
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
190-
// Fix it to handle out-of-order events and derive the verification decision.
191-
192-
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
187+
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);
193188
options.Events.StreamIdentity = StreamIdentity.AsString;
194189
});
195190

191+
// Let's start Async Daemon to process async projections in the background
192+
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
193+
using var daemon = await documentStore.BuildProjectionDaemonAsync();
194+
await daemon.StartAllAsync();
195+
196196
await using var session = documentStore.LightweightSession();
197197

198198
// Payment 1: Approved — FraudScore arrives first
@@ -221,6 +221,8 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc
221221

222222
await session.SaveChangesAsync();
223223

224+
await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));
225+
224226
// Assert Payment 1: Approved
225227
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
226228
payment1.Should().NotBeNull();

Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ Same out-of-order context as Exercise 16: events can arrive in any order (e.g.,
1818
4. **Track data quality**: Add a `DataQuality` enum and field to track completeness
1919
5. **Use RaiseSideEffects**: Override `RaiseSideEffects` to publish `PaymentVerificationCompleted` using `slice.AppendEvent()` when a decision is made
2020

21+
Decision logic:
22+
- Reject if merchant failed
23+
- Reject if fraud score > 0.75
24+
- Reject if amount > 10000 AND fraud score > 0.5
25+
- Otherwise approve
26+
27+
2128
## RaiseSideEffects Pattern
2229

2330
```csharp
@@ -39,3 +46,5 @@ public override ValueTask RaiseSideEffects(
3946

4047
- [Dealing with Race Conditions in Event-Driven Architecture](https://www.architecture-weekly.com/p/dealing-with-race-conditions-in-event)
4148
- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
49+
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)
50+

0 commit comments

Comments
 (0)