Skip to content

Commit b6b3f13

Browse files
Add a cap of 50000 saga state change transitions to the SagaDetailsIndex (#3882)
* Add a cap of 50000 saga state change transitions to the SagaDetailsIndex * Add test structure for checking that incorrect index is deleted * Add a test for saga details index replacement at start * Add test structure for checking that incorrect index is deleted (#3883) * Add test structure for checking that incorrect index is deleted * Add a test for saga details index replacement at start --------- Co-authored-by: Szymon Pobiega <[email protected]> * Refactor the test * Add a unit test for the Raven 3.5 version of the audit storage --------- Co-authored-by: Szymon Pobiega <[email protected]>
1 parent 9f180fa commit b6b3f13

File tree

7 files changed

+273
-0
lines changed

7 files changed

+273
-0
lines changed

src/ServiceControl.Audit.Persistence.RavenDb/Indexes/SagaDetailsIndex.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ into g
5050
SagaType = first.SagaType,
5151
Changes = g.SelectMany(x => x.Changes)
5252
.OrderByDescending(x => x.FinishTime)
53+
.Take(50000)
5354
.ToList()
5455
};
5556
}

src/ServiceControl.Audit.Persistence.RavenDb/RavenDbInstaller.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading.Tasks;
55
using Audit.Infrastructure.Migration;
66
using NServiceBus.Logging;
7+
using Raven.Client;
78
using Raven.Client.Embedded;
89
using Raven.Client.Indexes;
910
using RavenDB;
@@ -22,6 +23,8 @@ public async Task Install(CancellationToken cancellationToken)
2223
documentStore.Initialize();
2324
Logger.Info("Database initialization complete");
2425

26+
DeleteLegacySagaDetailsIndex(documentStore);
27+
2528
Logger.Info("Index creation started");
2629
var indexProvider = ravenStartup.CreateIndexProvider();
2730
await IndexCreation.CreateIndexesAsync(indexProvider, documentStore)
@@ -37,6 +40,20 @@ await endpointMigrations.Migrate(cancellationToken: cancellationToken)
3740
Logger.Info("Data migrations complete");
3841
}
3942

43+
public static void DeleteLegacySagaDetailsIndex(IDocumentStore documentStore)
44+
{
45+
// If the SagaDetailsIndex exists but does not have a .Take(50000), then we remove the current SagaDetailsIndex and
46+
// create a new one. If we do not remove the current one, then RavenDB will attempt to do a side-by-side migration.
47+
// Doing a side-by-side migration results in the index never swapping if there is constant ingestion as RavenDB will wait.
48+
// for the index to not be stale before swapping to the new index. Constant ingestion means the index will never be not-stale.
49+
// This needs to stay in place on version v4.x.x indefinitely.
50+
var sagaDetailsIndexDefinition = documentStore.DatabaseCommands.GetIndex("SagaDetailsIndex");
51+
if (sagaDetailsIndexDefinition != null && !sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"))
52+
{
53+
documentStore.DatabaseCommands.DeleteIndex("SagaDetailsIndex");
54+
}
55+
}
56+
4057
readonly EmbeddableDocumentStore documentStore;
4158
readonly RavenStartup ravenStartup;
4259

src/ServiceControl.Audit.Persistence.RavenDb/RavenDbPersistenceLifecycle.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public async Task Start(CancellationToken cancellationToken)
2222
documentStore.Initialize();
2323
Logger.Info("Database initialization complete");
2424

25+
RavenDbInstaller.DeleteLegacySagaDetailsIndex(documentStore);
26+
2527
Logger.Info("Index creation started");
2628
var indexProvider = ravenStartup.CreateIndexProvider();
2729
await IndexCreation.CreateIndexesAsync(indexProvider, documentStore)

src/ServiceControl.Audit.Persistence.RavenDb5/DatabaseSetup.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ await documentStore.Maintenance.Server
4848
new SagaDetailsIndex()
4949
};
5050

51+
await DeleteLegacySagaDetailsIndex(documentStore, cancellationToken).ConfigureAwait(false);
52+
5153
if (configuration.EnableFullTextSearch)
5254
{
5355
indexList.Add(new MessagesViewIndexWithFullTextSearch());
@@ -74,6 +76,21 @@ await documentStore.Maintenance.SendAsync(new ConfigureExpirationOperation(expir
7476
.ConfigureAwait(false);
7577
}
7678

79+
public static async Task DeleteLegacySagaDetailsIndex(IDocumentStore documentStore, CancellationToken cancellationToken)
80+
{
81+
// If the SagaDetailsIndex exists but does not have a .Take(50000), then we remove the current SagaDetailsIndex and
82+
// create a new one. If we do not remove the current one, then RavenDB will attempt to do a side-by-side migration.
83+
// Doing a side-by-side migration results in the index never swapping if there is constant ingestion as RavenDB will wait.
84+
// for the index to not be stale before swapping to the new index. Constant ingestion means the index will never be not-stale.
85+
// This needs to stay in place on version v4.x.x indefinitely.
86+
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
87+
var sagaDetailsIndexDefinition = await documentStore.Maintenance.SendAsync(sagaDetailsIndexOperation, cancellationToken).ConfigureAwait(false);
88+
if (sagaDetailsIndexDefinition != null && !sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"))
89+
{
90+
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"), cancellationToken).ConfigureAwait(false);
91+
}
92+
}
93+
7794
readonly DatabaseConfiguration configuration;
7895
}
7996
}

src/ServiceControl.Audit.Persistence.RavenDb5/Indexes/SagaDetailsIndex.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ into g
5050
SagaType = first.SagaType,
5151
Changes = g.SelectMany(x => x.Changes)
5252
.OrderByDescending(x => x.FinishTime)
53+
.Take(50000)
5354
.ToList()
5455
};
5556

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
namespace ServiceControl.Audit.Persistence.Tests
2+
{
3+
using System;
4+
using System.Configuration;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using global::Raven.Abstractions.Indexing;
8+
using NUnit.Framework;
9+
using ServiceControl.Audit.Persistence.RavenDb;
10+
using ServiceControl.SagaAudit;
11+
12+
[TestFixture]
13+
class SagaDetailsIndexTests : PersistenceTestFixture
14+
{
15+
[Test]
16+
public void Deletes_index_that_does_not_have_cap_of_50000()
17+
{
18+
configuration.DocumentStore.DatabaseCommands.DeleteIndex("SagaDetailsIndex");
19+
20+
var indexWithout50000capDefinition = new IndexDefinition
21+
{
22+
Name = "SagaDetailsIndex",
23+
Maps = new System.Collections.Generic.HashSet<string>
24+
{
25+
@"from doc in docs
26+
select new
27+
{
28+
doc.SagaId,
29+
Id = doc.SagaId,
30+
doc.SagaType,
31+
Changes = new[]
32+
{
33+
new
34+
{
35+
Endpoint = doc.Endpoint,
36+
FinishTime = doc.FinishTime,
37+
InitiatingMessage = doc.InitiatingMessage,
38+
OutgoingMessages = doc.OutgoingMessages,
39+
StartTime = doc.StartTime,
40+
StateAfterChange = doc.StateAfterChange,
41+
Status = doc.Status
42+
}
43+
}
44+
}"
45+
},
46+
Reduce = @"from result in results
47+
group result by result.SagaId
48+
into g
49+
let first = g.First()
50+
select new
51+
{
52+
Id = first.SagaId,
53+
SagaId = first.SagaId,
54+
SagaType = first.SagaType,
55+
Changes = g.SelectMany(x => x.Changes)
56+
.OrderByDescending(x => x.FinishTime)
57+
.ToList()
58+
}"
59+
};
60+
61+
configuration.DocumentStore.DatabaseCommands.PutIndex("SagaDetailsIndex", indexWithout50000capDefinition);
62+
63+
var sagaDetailsIndexDefinition = configuration.DocumentStore.DatabaseCommands.GetIndex("SagaDetailsIndex");
64+
65+
Assert.IsNotNull(sagaDetailsIndexDefinition);
66+
67+
RavenDbInstaller.DeleteLegacySagaDetailsIndex(configuration.DocumentStore);
68+
69+
sagaDetailsIndexDefinition = configuration.DocumentStore.DatabaseCommands.GetIndex("SagaDetailsIndex");
70+
71+
Assert.IsNull(sagaDetailsIndexDefinition);
72+
}
73+
74+
[Test]
75+
public void Does_not_delete_index_that_does_have_cap_of_50000()
76+
{
77+
RavenDbInstaller.DeleteLegacySagaDetailsIndex(configuration.DocumentStore);
78+
79+
var sagaDetailsIndexDefinition = configuration.DocumentStore.DatabaseCommands.GetIndex("SagaDetailsIndex");
80+
81+
Assert.IsNotNull(sagaDetailsIndexDefinition);
82+
}
83+
84+
[Test]
85+
public async Task Should_only_reduce_the_last_50000_saga_state_changes()
86+
{
87+
var sagaType = "MySagaType";
88+
var sagaState = "some-saga-state";
89+
90+
await IngestSagaAudits(new SagaSnapshot
91+
{
92+
SagaId = Guid.NewGuid(),
93+
SagaType = sagaType,
94+
Status = SagaStateChangeStatus.New,
95+
StateAfterChange = sagaState
96+
});
97+
98+
await configuration.CompleteDBOperation();
99+
100+
var sagaDetailsIndexDefinition = configuration.DocumentStore.DatabaseCommands.GetIndex("SagaDetailsIndex");
101+
Assert.IsTrue(sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"), "The SagaDetails index definition does not contain a .Take(50000) to limit the number of saga state changes that are reduced by the map/reduce");
102+
}
103+
104+
async Task IngestSagaAudits(params SagaSnapshot[] snapshots)
105+
{
106+
var unitOfWork = StartAuditUnitOfWork(snapshots.Length);
107+
foreach (var snapshot in snapshots)
108+
{
109+
await unitOfWork.RecordSagaSnapshot(snapshot);
110+
}
111+
await unitOfWork.DisposeAsync();
112+
await configuration.CompleteDBOperation();
113+
}
114+
}
115+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
namespace ServiceControl.Audit.Persistence.Tests
2+
{
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using NUnit.Framework;
7+
using Raven.Client.Documents.Indexes;
8+
using Raven.Client.Documents.Operations.Indexes;
9+
using ServiceControl.SagaAudit;
10+
11+
[TestFixture]
12+
class SagaDetailsIndexTests : PersistenceTestFixture
13+
{
14+
[Test]
15+
public async Task Deletes_index_that_does_not_have_cap_of_50000()
16+
{
17+
await configuration.DocumentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"));
18+
19+
var indexWithout50000capDefinition = new IndexDefinition
20+
{
21+
Name = "SagaDetailsIndex",
22+
Maps = new System.Collections.Generic.HashSet<string>
23+
{
24+
@"from doc in docs
25+
select new
26+
{
27+
doc.SagaId,
28+
Id = doc.SagaId,
29+
doc.SagaType,
30+
Changes = new[]
31+
{
32+
new
33+
{
34+
Endpoint = doc.Endpoint,
35+
FinishTime = doc.FinishTime,
36+
InitiatingMessage = doc.InitiatingMessage,
37+
OutgoingMessages = doc.OutgoingMessages,
38+
StartTime = doc.StartTime,
39+
StateAfterChange = doc.StateAfterChange,
40+
Status = doc.Status
41+
}
42+
}
43+
}"
44+
},
45+
Reduce = @"from result in results
46+
group result by result.SagaId
47+
into g
48+
let first = g.First()
49+
select new
50+
{
51+
Id = first.SagaId,
52+
SagaId = first.SagaId,
53+
SagaType = first.SagaType,
54+
Changes = g.SelectMany(x => x.Changes)
55+
.OrderByDescending(x => x.FinishTime)
56+
.ToList()
57+
}"
58+
};
59+
60+
var putIndexesOp = new PutIndexesOperation(indexWithout50000capDefinition);
61+
62+
await configuration.DocumentStore.Maintenance.SendAsync(putIndexesOp);
63+
64+
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
65+
var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);
66+
67+
Assert.IsNotNull(sagaDetailsIndexDefinition);
68+
69+
await RavenDb.DatabaseSetup.DeleteLegacySagaDetailsIndex(configuration.DocumentStore, CancellationToken.None);
70+
71+
sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);
72+
73+
Assert.IsNull(sagaDetailsIndexDefinition);
74+
}
75+
76+
[Test]
77+
public async Task Does_not_delete_index_that_does_have_cap_of_50000()
78+
{
79+
await RavenDb.DatabaseSetup.DeleteLegacySagaDetailsIndex(configuration.DocumentStore, CancellationToken.None);
80+
81+
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
82+
var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);
83+
84+
Assert.IsNotNull(sagaDetailsIndexDefinition);
85+
}
86+
87+
[Test]
88+
public async Task Should_only_reduce_the_last_50000_saga_state_changes()
89+
{
90+
var sagaType = "MySagaType";
91+
var sagaState = "some-saga-state";
92+
93+
await IngestSagaAudits(new SagaSnapshot
94+
{
95+
SagaId = Guid.NewGuid(),
96+
SagaType = sagaType,
97+
Status = SagaStateChangeStatus.New,
98+
StateAfterChange = sagaState
99+
});
100+
101+
await configuration.CompleteDBOperation();
102+
103+
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
104+
var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);
105+
106+
Assert.IsTrue(sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"), "The SagaDetails index definition does not contain a .Take(50000) to limit the number of saga state changes that are reduced by the map/reduce");
107+
}
108+
109+
async Task IngestSagaAudits(params SagaSnapshot[] snapshots)
110+
{
111+
var unitOfWork = StartAuditUnitOfWork(snapshots.Length);
112+
foreach (var snapshot in snapshots)
113+
{
114+
await unitOfWork.RecordSagaSnapshot(snapshot);
115+
}
116+
await unitOfWork.DisposeAsync();
117+
await configuration.CompleteDBOperation();
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)