Skip to content

Commit 8b4b117

Browse files
committed
Fix #115 Concurrency issue when adding documents.
include date on lock Tweaked logic
1 parent a4354cd commit 8b4b117

File tree

7 files changed

+131
-11
lines changed

7 files changed

+131
-11
lines changed

src/Foundatio.Repositories.Elasticsearch/Configuration/DailyIndex.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Threading.Tasks;
88
using Exceptionless.DateTimeExtensions;
99
using Foundatio.Caching;
10+
using Foundatio.Lock;
1011
using Foundatio.Parsers.ElasticQueries;
1112
using Foundatio.Parsers.ElasticQueries.Extensions;
1213
using Foundatio.Repositories.Elasticsearch.Extensions;
@@ -30,19 +31,22 @@ public class DailyIndex : VersionedIndex
3031
private TimeSpan? _maxIndexAge;
3132
protected readonly Func<object, DateTime> _getDocumentDateUtc;
3233
protected readonly string[] _defaultIndexes;
34+
private readonly CacheLockProvider _ensureIndexLock;
35+
private readonly Dictionary<DateTime, object> _ensuredDates = new();
3336

3437
public DailyIndex(IElasticConfiguration configuration, string name, int version = 1, Func<object, DateTime> getDocumentDateUtc = null)
3538
: base(configuration, name, version)
3639
{
3740
AddAlias(Name);
3841
_frozenAliases = new Lazy<IReadOnlyCollection<IndexAliasAge>>(() => _aliases.AsReadOnly());
3942
_aliasCache = new ScopedCacheClient(configuration.Cache, "alias");
43+
_ensureIndexLock = new CacheLockProvider(configuration.Cache, configuration.MessageBus, configuration.LoggerFactory);
4044
_getDocumentDateUtc = getDocumentDateUtc;
4145
_defaultIndexes = new[] { Name };
4246
HasMultipleIndexes = true;
4347

4448
if (_getDocumentDateUtc != null)
45-
_getDocumentDateUtc = (document) =>
49+
_getDocumentDateUtc = document =>
4650
{
4751
var date = getDocumentDateUtc(document);
4852
return date != DateTime.MinValue ? date : DefaultDocumentDateFunc(document);
@@ -131,13 +135,19 @@ protected override DateTime GetIndexDate(string index)
131135
return DateTime.MaxValue;
132136
}
133137

134-
private readonly Dictionary<DateTime, object> _ensuredDates = new();
135138
protected async Task EnsureDateIndexAsync(DateTime utcDate)
136139
{
137140
utcDate = utcDate.Date;
138141
if (_ensuredDates.ContainsKey(utcDate))
139142
return;
140143

144+
await using var indexLock = await _ensureIndexLock.AcquireAsync($"Index:{GetVersionedIndex(utcDate)}", TimeSpan.FromMinutes(1)).AnyContext();
145+
if (indexLock is null)
146+
throw new Exception("Unable to acquire index lock");
147+
148+
if (_ensuredDates.ContainsKey(utcDate))
149+
return;
150+
141151
var indexExpirationUtcDate = GetIndexExpirationDate(utcDate);
142152
if (Configuration.TimeProvider.GetUtcNow().UtcDateTime > indexExpirationUtcDate)
143153
throw new ArgumentException($"Index max age exceeded: {indexExpirationUtcDate}", nameof(utcDate));
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Foundatio.Repositories.Elasticsearch.Tests.Repositories;
5+
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
6+
using Foundatio.Repositories.Models;
7+
using Foundatio.Utility;
8+
using Microsoft.Extensions.Time.Testing;
9+
using Xunit;
10+
using Xunit.Abstractions;
11+
12+
namespace Foundatio.Repositories.Elasticsearch.Tests;
13+
14+
public sealed class DailyRepositoryTests : ElasticRepositoryTestBase
15+
{
16+
private readonly IFileAccessHistoryRepository _fileAccessHistoryRepository;
17+
18+
public DailyRepositoryTests(ITestOutputHelper output) : base(output)
19+
{
20+
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration.DailyFileAccessHistory);
21+
}
22+
23+
public override async Task InitializeAsync()
24+
{
25+
await base.InitializeAsync();
26+
await RemoveDataAsync();
27+
}
28+
29+
[Fact]
30+
public async Task AddAsyncWithCustomDateIndex()
31+
{
32+
var utcNow = new DateTime(2023, 1, 1, 0, 0, 0, DateTimeKind.Utc);
33+
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { Path = "path1", AccessedDateUtc = utcNow }, o => o.ImmediateConsistency());
34+
Assert.NotNull(history?.Id);
35+
36+
var result = await _fileAccessHistoryRepository.FindOneAsync(f => f.Id(history.Id));
37+
Assert.Equal("file-access-history-daily-v1-2023.01.01", result.Data.GetString("index"));
38+
}
39+
40+
[Fact]
41+
public async Task AddAsyncWithCurrentDateViaDocumentsAdding()
42+
{
43+
_configuration.TimeProvider = new FakeTimeProvider(new DateTimeOffset(2023, 02, 1, 0, 0, 0, TimeSpan.Zero));
44+
45+
try
46+
{
47+
// NOTE: This has to be async handler as there is no way to remove a sync handler.
48+
_fileAccessHistoryRepository.DocumentsAdding.AddHandler(OnDocumentsAdding);
49+
50+
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { Path = "path2" }, o => o.ImmediateConsistency());
51+
Assert.NotNull(history?.Id);
52+
53+
var result = await _fileAccessHistoryRepository.FindOneAsync(f => f.Id(history.Id));
54+
Assert.Equal("file-access-history-daily-v1-2023.02.01", result.Data.GetString("index"));
55+
}
56+
finally
57+
{
58+
_fileAccessHistoryRepository.DocumentsAdding.RemoveHandler(OnDocumentsAdding);
59+
}
60+
}
61+
62+
private Task OnDocumentsAdding(object sender, DocumentsEventArgs<FileAccessHistory> arg)
63+
{
64+
foreach (var document in arg.Documents)
65+
{
66+
if (document.AccessedDateUtc == DateTime.MinValue || document.AccessedDateUtc > _configuration.TimeProvider.GetUtcNow().UtcDateTime)
67+
document.AccessedDateUtc = _configuration.TimeProvider.GetUtcNow().UtcDateTime;
68+
}
69+
70+
return Task.CompletedTask;
71+
}
72+
73+
[Fact]
74+
public async Task CanAddAsync()
75+
{
76+
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { AccessedDateUtc = DateTime.UtcNow });
77+
Assert.NotNull(history?.Id);
78+
}
79+
80+
[Fact]
81+
public Task AddAsyncConcurrentUpdates()
82+
{
83+
return Parallel.ForEachAsync(Enumerable.Range(0, 50), async (i, _) =>
84+
{
85+
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { AccessedDateUtc = DateTime.UtcNow });
86+
Assert.NotNull(history?.Id);
87+
});
88+
}
89+
}

tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Threading.Tasks;
@@ -120,10 +120,8 @@ public async Task GetByDateBasedIndexAsync()
120120

121121
await _configuration.DailyLogEvents.ConfigureAsync();
122122

123-
124-
// TODO: Fix this once https://github.com/elastic/elasticsearch-net/issues/3829 is fixed in beta2
125-
//var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
126-
//Assert.Empty(indexes);
123+
var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
124+
Assert.Empty(indexes);
127125

128126
var alias = await _client.Indices.GetAliasAsync(_configuration.DailyLogEvents.Name);
129127
_logger.LogRequest(alias);
@@ -142,7 +140,7 @@ public async Task GetByDateBasedIndexAsync()
142140
Assert.True(alias.IsValid);
143141
Assert.Equal(2, alias.Indices.Count);
144142

145-
var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
143+
indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
146144
Assert.Equal(2, indexes.Count);
147145

148146
await repository.RemoveAllAsync(o => o.ImmediateConsistency());

tests/Foundatio.Repositories.Elasticsearch.Tests/MonthlyRepositoryTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public sealed class MonthlyRepositoryTests : ElasticRepositoryTestBase
1616

1717
public MonthlyRepositoryTests(ITestOutputHelper output) : base(output)
1818
{
19-
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration);
19+
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration.MonthlyFileAccessHistory);
2020
}
2121

2222
public override async Task InitializeAsync()
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using Foundatio.Repositories.Elasticsearch.Configuration;
2+
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
3+
using Nest;
4+
5+
namespace Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes;
6+
7+
public sealed class DailyFileAccessHistoryIndex : DailyIndex<FileAccessHistory>
8+
{
9+
public DailyFileAccessHistoryIndex(IElasticConfiguration configuration) : base(configuration, "file-access-history-daily", 1, d => ((FileAccessHistory)d).AccessedDateUtc)
10+
{
11+
}
12+
13+
public override CreateIndexDescriptor ConfigureIndex(CreateIndexDescriptor idx)
14+
{
15+
return base.ConfigureIndex(idx.Settings(s => s.NumberOfReplicas(0).NumberOfShards(1)));
16+
}
17+
}

tests/Foundatio.Repositories.Elasticsearch.Tests/Repositories/Configuration/MyAppElasticConfiguration.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public MyAppElasticConfiguration(IQueue<WorkItemData> workItemQueue, ICacheClien
2727
AddIndex(DailyLogEvents = new DailyLogEventIndex(this));
2828
AddIndex(MonthlyLogEvents = new MonthlyLogEventIndex(this));
2929
AddIndex(ParentChild = new ParentChildIndex(this));
30+
AddIndex(DailyFileAccessHistory = new DailyFileAccessHistoryIndex(this));
3031
AddIndex(MonthlyFileAccessHistory = new MonthlyFileAccessHistoryIndex(this));
3132
AddCustomFieldIndex(replicas: 0);
3233
}
@@ -95,5 +96,6 @@ protected override void ConfigureSettings(ConnectionSettings settings)
9596
public DailyLogEventIndex DailyLogEvents { get; }
9697
public MonthlyLogEventIndex MonthlyLogEvents { get; }
9798
public ParentChildIndex ParentChild { get; }
99+
public DailyFileAccessHistoryIndex DailyFileAccessHistory { get; }
98100
public MonthlyFileAccessHistoryIndex MonthlyFileAccessHistory { get; }
99101
}
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration;
1+
using Foundatio.Repositories.Elasticsearch.Configuration;
22
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
33

44
namespace Foundatio.Repositories.Elasticsearch.Tests.Repositories;
@@ -7,7 +7,11 @@ public interface IFileAccessHistoryRepository : ISearchableRepository<FileAccess
77

88
public class FileAccessHistoryRepository : ElasticRepositoryBase<FileAccessHistory>, IFileAccessHistoryRepository
99
{
10-
public FileAccessHistoryRepository(MyAppElasticConfiguration elasticConfiguration) : base(elasticConfiguration.MonthlyFileAccessHistory)
10+
public FileAccessHistoryRepository(DailyIndex<FileAccessHistory> dailyIndex) : base(dailyIndex)
11+
{
12+
}
13+
14+
public FileAccessHistoryRepository(MonthlyIndex<FileAccessHistory> monthlyIndex) : base(monthlyIndex)
1115
{
1216
}
1317
}

0 commit comments

Comments
 (0)