Skip to content

Commit ccc714c

Browse files
authored
Merge pull request #1742 from c-teleport/feature/mongo-index-cleanup
Mongo index cleanup
2 parents 843e6d1 + d8b2411 commit ccc714c

File tree

1 file changed

+48
-2
lines changed

1 file changed

+48
-2
lines changed

src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Linq;
67
using System.Threading;
78
using System.Threading.Tasks;
@@ -67,6 +68,10 @@ await database.CreateCollectionAsync(options.PublishedCollection, cancellationTo
6768
await database.CreateCollectionAsync(options.LockCollection, cancellationToken: cancellationToken)
6869
.ConfigureAwait(false);
6970

71+
await Task.WhenAll(
72+
DropReceivedMessageDeprecatedIndexesAsync(),
73+
DropPublishedMessageDeprecatedIndexesAsync()).ConfigureAwait(false);
74+
7075
await Task.WhenAll(
7176
CreateReceivedMessageIndexesAsync(),
7277
CreatePublishedMessageIndexesAsync()).ConfigureAwait(false);
@@ -98,7 +103,6 @@ async Task CreateReceivedMessageIndexesAsync()
98103
new(builder.Ascending(x => x.Name)),
99104
new(builder.Ascending(x => x.Added)),
100105
new(builder.Ascending(x => x.ExpiresAt)),
101-
new(builder.Ascending(x => x.StatusName)),
102106
new(builder.Ascending(x => x.Retries)),
103107
new(builder.Ascending(x => x.Version)),
104108
new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt))
@@ -117,13 +121,55 @@ async Task CreatePublishedMessageIndexesAsync()
117121
new(builder.Ascending(x => x.Name)),
118122
new(builder.Ascending(x => x.Added)),
119123
new(builder.Ascending(x => x.ExpiresAt)),
120-
new(builder.Ascending(x => x.StatusName)),
121124
new(builder.Ascending(x => x.Retries)),
122125
new(builder.Ascending(x => x.Version)),
123126
new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt))
124127
};
125128

126129
await col.Indexes.CreateManyAsync(indexes, cancellationToken);
127130
}
131+
132+
async Task DropReceivedMessageDeprecatedIndexesAsync()
133+
{
134+
var obsoleteIndexes = new HashSet<string> { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" };
135+
136+
var col = database.GetCollection<ReceivedMessage>(options.ReceivedCollection);
137+
138+
await DropIndexesAsync(col, obsoleteIndexes);
139+
}
140+
141+
async Task DropPublishedMessageDeprecatedIndexesAsync()
142+
{
143+
var obsoleteIndexes = new HashSet<string> { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" };
144+
145+
var col = database.GetCollection<PublishedMessage>(options.PublishedCollection);
146+
147+
await DropIndexesAsync(col, obsoleteIndexes);
148+
149+
}
150+
151+
async Task DropIndexesAsync<T>(IMongoCollection<T> col, ISet<string> obsoleteIndexes)
152+
{
153+
using var cursor = await col.Indexes.ListAsync(cancellationToken);
154+
var indexList = await cursor.ToListAsync(cancellationToken);
155+
156+
foreach (var index in indexList)
157+
{
158+
var indexName = index["name"].AsString;
159+
if (!obsoleteIndexes.Contains(indexName)) continue;
160+
161+
try
162+
{
163+
await col.Indexes.DropOneAsync(indexName, cancellationToken);
164+
}
165+
catch (MongoCommandException ex) when (ex.CodeName == "IndexNotFound")
166+
{
167+
_logger.LogWarning(
168+
"Index '{IndexName}' on collection '{CollectionName}' was not found when attempting to drop it. " +
169+
"This may indicate concurrent initialization or an unexpected state. Verify deployment strategy if this happens repeatedly.",
170+
indexName, col.CollectionNamespace.CollectionName);
171+
}
172+
}
173+
}
128174
}
129175
}

0 commit comments

Comments
 (0)