diff --git a/src/SIL.Harmony.Sample/Changes/NewWordChange.cs b/src/SIL.Harmony.Sample/Changes/NewWordChange.cs index fd01fea..6629f8b 100644 --- a/src/SIL.Harmony.Sample/Changes/NewWordChange.cs +++ b/src/SIL.Harmony.Sample/Changes/NewWordChange.cs @@ -4,13 +4,15 @@ namespace SIL.Harmony.Sample.Changes; -public class NewWordChange(Guid entityId, string text, string? note = null) : CreateChange(entityId), ISelfNamedType +public class NewWordChange(Guid entityId, string text, string? note = null, Guid? antonymId = null) : CreateChange(entityId), ISelfNamedType { public string Text { get; } = text; public string? Note { get; } = note; + public Guid? AntonymId { get; } = antonymId; - public override ValueTask NewEntity(Commit commit, ChangeContext context) + public override async ValueTask NewEntity(Commit commit, ChangeContext context) { - return new(new Word { Text = Text, Note = Note, Id = EntityId }); + var antonymShouldBeNull = AntonymId is null || (await context.IsObjectDeleted(AntonymId.Value)); + return (new Word { Text = Text, Note = Note, Id = EntityId, AntonymId = antonymShouldBeNull ? null : AntonymId }); } } diff --git a/src/SIL.Harmony.Sample/Changes/SetTagChange.cs b/src/SIL.Harmony.Sample/Changes/SetTagChange.cs new file mode 100644 index 0000000..5f5933c --- /dev/null +++ b/src/SIL.Harmony.Sample/Changes/SetTagChange.cs @@ -0,0 +1,26 @@ +using SIL.Harmony.Changes; +using SIL.Harmony.Entities; +using SIL.Harmony.Sample.Models; + +namespace SIL.Harmony.Sample.Changes; + +public class SetTagChange(Guid entityId, string text) : Change(entityId), ISelfNamedType +{ + public string Text { get; } = text; + + public override ValueTask NewEntity(Commit commit, ChangeContext context) + { + return new(new Tag() + { + Id = EntityId, + Text = Text + }); + } + + + public override ValueTask ApplyChange(Tag entity, ChangeContext context) + { + entity.Text = Text; + return ValueTask.CompletedTask; + } +} diff --git a/src/SIL.Harmony.Sample/CrdtSampleKernel.cs b/src/SIL.Harmony.Sample/CrdtSampleKernel.cs index 410eb93..348d220 100644 --- a/src/SIL.Harmony.Sample/CrdtSampleKernel.cs +++ b/src/SIL.Harmony.Sample/CrdtSampleKernel.cs @@ -48,9 +48,11 @@ public static IServiceCollection AddCrdtDataSample(this IServiceCollection servi .Add() .Add>() .Add() + .Add() .Add>() .Add>() .Add>() + .Add>() ; config.ObjectTypeListBuilder.DefaultAdapter() .Add() @@ -61,7 +63,11 @@ public static IServiceCollection AddCrdtDataSample(this IServiceCollection servi .HasForeignKey(d => d.WordId) .OnDelete(DeleteBehavior.Cascade); }) - .Add(); + .Add() + .Add(builder => + { + builder.HasIndex(tag => tag.Text).IsUnique(); + }); }); return services; } diff --git a/src/SIL.Harmony.Sample/Models/Tag.cs b/src/SIL.Harmony.Sample/Models/Tag.cs new file mode 100644 index 0000000..1b3c3d5 --- /dev/null +++ b/src/SIL.Harmony.Sample/Models/Tag.cs @@ -0,0 +1,30 @@ +using SIL.Harmony.Entities; + +namespace SIL.Harmony.Sample.Models; + +public class Tag : IObjectBase +{ + public required string Text { get; set; } + + public Guid Id { get; init; } + public DateTimeOffset? DeletedAt { get; set; } + + public Guid[] GetReferences() + { + return []; + } + + public void RemoveReference(Guid id, Commit commit) + { + } + + public IObjectBase Copy() + { + return new Tag + { + Id = Id, + Text = Text, + DeletedAt = DeletedAt + }; + } +} \ No newline at end of file diff --git a/src/SIL.Harmony.Tests/DataModelReferenceTests.cs b/src/SIL.Harmony.Tests/DataModelReferenceTests.cs index e871630..65dcb0f 100644 --- a/src/SIL.Harmony.Tests/DataModelReferenceTests.cs +++ b/src/SIL.Harmony.Tests/DataModelReferenceTests.cs @@ -1,4 +1,5 @@ -using SIL.Harmony.Changes; +using Microsoft.EntityFrameworkCore; +using SIL.Harmony.Changes; using SIL.Harmony.Sample.Changes; using SIL.Harmony.Sample.Models; using SIL.Harmony.Tests; @@ -65,4 +66,22 @@ public async Task DeleteRetroactivelyRemovesRefs() var entry = await DataModel.GetLatest(entityId3); entry!.AntonymId.Should().BeNull(); } -} + + [Fact] + public async Task DeleteDoesNotEffectARootSnapshotCreatedBeforeTheDelete() + { + var wordId = Guid.NewGuid(); + var initialWordCommit = await WriteNextChange(new NewWordChange(wordId, "entity1", antonymId: _word1Id), add: false); + var deleteWordCommit = await WriteNextChange(DeleteWord(_word1Id), add: false); + await AddCommitsViaSync([ + initialWordCommit, + deleteWordCommit + ]); + var snapshot = await DbContext.Snapshots.SingleAsync(s => s.CommitId == initialWordCommit.Id); + var initialWord = (Word) snapshot.Entity; + initialWord.AntonymId.Should().Be(_word1Id); + snapshot = await DbContext.Snapshots.SingleAsync(s => s.CommitId == deleteWordCommit.Id && s.EntityId == wordId); + var wordWithoutRef = (Word) snapshot.Entity; + wordWithoutRef.AntonymId.Should().BeNull(); + } +} \ No newline at end of file diff --git a/src/SIL.Harmony.Tests/DataModelSimpleChanges.Writing2ChangesAtOnceWithMergedHistory.verified.txt b/src/SIL.Harmony.Tests/DataModelSimpleChanges.Writing2ChangesAtOnceWithMergedHistory.verified.txt index 312272e..237d439 100644 --- a/src/SIL.Harmony.Tests/DataModelSimpleChanges.Writing2ChangesAtOnceWithMergedHistory.verified.txt +++ b/src/SIL.Harmony.Tests/DataModelSimpleChanges.Writing2ChangesAtOnceWithMergedHistory.verified.txt @@ -50,29 +50,12 @@ }, { $type: Commit, - Snapshots: [ - { - $type: ObjectSnapshot, - Id: Guid_5, - TypeName: Word, - Entity: { - $type: Word, - Text: first, - Note: a word note, - Id: Guid_2 - }, - EntityId: Guid_2, - EntityIsDeleted: false, - CommitId: Guid_6, - IsRoot: false - } - ], Hash: Hash_2, ParentHash: Hash_1, ChangeEntities: [ { $type: ChangeEntity, - CommitId: Guid_6, + CommitId: Guid_5, EntityId: Guid_2, Change: { $type: SetWordNoteChange, @@ -85,9 +68,9 @@ CompareKey: { $type: ValueTuple, - CommitId: Guid_8, + CommitId: Guid_7, EntityId: Guid_2, Change: { $type: SetWordTextChange, @@ -134,9 +117,9 @@ CompareKey: { $type: ValueTuple, - CommitId: Guid_10, + CommitId: Guid_9, EntityId: Guid_2, Change: { $type: SetWordTextChange, @@ -184,9 +167,9 @@ CompareKey: { $type: ValueTuple(entityId); + } + + public IChange SetTag(Guid entityId, string value) + { + return new SetTagChange(entityId, value); + } + + public IChange DeleteTag(Guid entityId) + { + return new DeleteChange(entityId); + } + public IChange NewDefinition(Guid wordId, string text, string partOfSpeech, diff --git a/src/SIL.Harmony.Tests/DbContextTests.VerifyModel.verified.txt b/src/SIL.Harmony.Tests/DbContextTests.VerifyModel.verified.txt index 3bf3a9e..e3858b3 100644 --- a/src/SIL.Harmony.Tests/DbContextTests.VerifyModel.verified.txt +++ b/src/SIL.Harmony.Tests/DbContextTests.VerifyModel.verified.txt @@ -162,6 +162,27 @@ Relational:TableName: Example Relational:ViewName: Relational:ViewSchema: + EntityType: Tag + Properties: + Id (Guid) Required PK AfterSave:Throw ValueGenerated.OnAdd + DeletedAt (DateTimeOffset?) + SnapshotId (no field, Guid?) Shadow FK Index + Text (string) Required Index + Keys: + Id PK + Foreign keys: + Tag {'SnapshotId'} -> ObjectSnapshot {'Id'} Unique SetNull + Indexes: + SnapshotId Unique + Text Unique + Annotations: + DiscriminatorProperty: + Relational:FunctionName: + Relational:Schema: + Relational:SqlQuery: + Relational:TableName: Tag + Relational:ViewName: + Relational:ViewSchema: EntityType: Word Properties: Id (Guid) Required PK AfterSave:Throw ValueGenerated.OnAdd diff --git a/src/SIL.Harmony.Tests/SnapshotTests.cs b/src/SIL.Harmony.Tests/SnapshotTests.cs index 3967228..08662b3 100644 --- a/src/SIL.Harmony.Tests/SnapshotTests.cs +++ b/src/SIL.Harmony.Tests/SnapshotTests.cs @@ -74,4 +74,35 @@ await WriteChange(_localClientId, ]); DbContext.Snapshots.Should().ContainSingle(); } + + [Fact] + public async Task DontAddTheSameSnapshotTwice() + { + var entityId = Guid.NewGuid(); + await WriteNextChange(SetWord(entityId, "test root")); + await WriteNextChange(SetWord(entityId, "test non root")); + + await AddCommitsViaSync([ + //the order here is important, the second commit was causing the snapshot for 'test non root' to attempt to be inserted again + await WriteNextChange(SetWord(Guid.NewGuid(), "test 1"), add: false), + await WriteNextChange(SetWord(entityId, "test 2"), add: false), + ]); + } + + [Fact] + public async Task CanRecreateUniqueConstraintConflictingValueInOneCommit() + { + var entityId = Guid.NewGuid(); + await WriteChange(_localClientId, + DateTimeOffset.Now, + [ + SetTag(entityId, "tag-1"), + ]); + await WriteChange(_localClientId, + DateTimeOffset.Now, + [ + DeleteTag(entityId), + SetTag(Guid.NewGuid(), "tag-1"), + ]); + } } diff --git a/src/SIL.Harmony/CrdtConfig.cs b/src/SIL.Harmony/CrdtConfig.cs index 19729dd..a96ea03 100644 --- a/src/SIL.Harmony/CrdtConfig.cs +++ b/src/SIL.Harmony/CrdtConfig.cs @@ -74,6 +74,8 @@ private void JsonTypeModifier(JsonTypeInfo typeInfo) public bool RemoteResourcesEnabled { get; private set; } public string LocalResourceCachePath { get; set; } = Path.GetFullPath("./localResourceCache"); + public string FailedSyncOutputPath { get; set; } = Path.GetFullPath("./failedSyncs"); + public void AddRemoteResourceEntity(string? cachePath = null) { RemoteResourcesEnabled = true; diff --git a/src/SIL.Harmony/CrdtKernel.cs b/src/SIL.Harmony/CrdtKernel.cs index 724ae99..7cc8f12 100644 --- a/src/SIL.Harmony/CrdtKernel.cs +++ b/src/SIL.Harmony/CrdtKernel.cs @@ -26,7 +26,8 @@ public static IServiceCollection AddCrdtData(this IServiceCollection s provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService(), - provider.GetRequiredService>() + provider.GetRequiredService>(), + provider.GetRequiredService>() )); //must use factory method because ResourceService constructor is internal services.AddScoped(provider => new ResourceService( diff --git a/src/SIL.Harmony/DataModel.cs b/src/SIL.Harmony/DataModel.cs index a8beded..4e6aed3 100644 --- a/src/SIL.Harmony/DataModel.cs +++ b/src/SIL.Harmony/DataModel.cs @@ -1,6 +1,7 @@ using System.Text.Json; using SIL.Harmony.Core; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using SIL.Harmony.Changes; using SIL.Harmony.Db; @@ -22,17 +23,20 @@ public class DataModel : ISyncable, IAsyncDisposable private readonly JsonSerializerOptions _serializerOptions; private readonly IHybridDateTimeProvider _timeProvider; private readonly IOptions _crdtConfig; + private readonly ILogger _logger; //constructor must be internal because CrdtRepository is internal internal DataModel(CrdtRepository crdtRepository, JsonSerializerOptions serializerOptions, IHybridDateTimeProvider timeProvider, - IOptions crdtConfig) + IOptions crdtConfig, + ILogger logger) { _crdtRepository = crdtRepository; _serializerOptions = serializerOptions; _timeProvider = timeProvider; _crdtConfig = crdtConfig; + _logger = logger; } @@ -131,19 +135,48 @@ private static ChangeEntity ToChangeEntity(IChange change, int index) async Task ISyncable.AddRangeFromSync(IEnumerable commits) { commits = commits.ToArray(); - _timeProvider.TakeLatestTime(commits.Select(c => c.HybridDateTime)); - var (oldestChange, newCommits) = await _crdtRepository.FilterExistingCommits(commits.ToArray()); - //no changes added - if (oldestChange is null || newCommits is []) return; - - await using var transaction = await _crdtRepository.BeginTransactionAsync(); - //if there are deferred commits, update snapshots with them first - if (_deferredCommits is not []) await UpdateSnapshotsByDeferredCommits(); - //don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done - await _crdtRepository.AddCommits(newCommits, false); - await UpdateSnapshots(oldestChange, newCommits); - await ValidateCommits(); - await transaction.CommitAsync(); + try + { + _timeProvider.TakeLatestTime(commits.Select(c => c.HybridDateTime)); + var (oldestChange, newCommits) = await _crdtRepository.FilterExistingCommits(commits.ToArray()); + //no changes added + if (oldestChange is null || newCommits is []) return; + + await using var transaction = await _crdtRepository.BeginTransactionAsync(); + //if there are deferred commits, update snapshots with them first + if (_deferredCommits is not []) await UpdateSnapshotsByDeferredCommits(); + //don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done + await _crdtRepository.AddCommits(newCommits, false); + await UpdateSnapshots(oldestChange, newCommits); + await ValidateCommits(); + await transaction.CommitAsync(); + } + catch (DbUpdateException e) + { + _logger.LogError(e, "Failed to sync commits, check {FailedImportPath} for more details", _crdtConfig.Value.FailedSyncOutputPath); + await DumpFailedSync(new + { + ExceptionMessage = e.ToString(), + Commits = commits.DefaultOrder(), + Objects = e.Entries.Select(entry => entry.Entity) + }); + throw; + } + } + + private async Task DumpFailedSync(object data) + { + try + { + Directory.CreateDirectory(_crdtConfig.Value.FailedSyncOutputPath); + await using var failedImport = + File.Create(Path.Combine(_crdtConfig.Value.FailedSyncOutputPath, "last-failed-import.json")); + await JsonSerializer.SerializeAsync(failedImport, data, _serializerOptions); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to dump failed import"); + } } ValueTask ISyncable.ShouldSync() diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index 39de888..9027ff4 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -5,8 +5,6 @@ using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Options; using SIL.Harmony.Changes; -using SIL.Harmony.Entities; -using SIL.Harmony.Helpers; using SIL.Harmony.Resource; namespace SIL.Harmony.Db; @@ -25,7 +23,6 @@ public CrdtRepository(ICrdtDbContext dbContext, IOptions crdtConfig, //but since we're using a custom query, we can use it directly and apply the scoped filters manually _currentSnapshotsQueryable = MakeCurrentSnapshotsQuery(dbContext, ignoreChangesAfter); } - private IQueryable Snapshots => _dbContext.Snapshots.AsNoTracking(); @@ -215,55 +212,47 @@ public async Task> GetChanges(SyncState remoteState) public async Task AddSnapshots(IEnumerable snapshots) { - foreach (var objectSnapshot in snapshots) + var projectedEntityIds = new HashSet(); + foreach (var snapshot in snapshots.DefaultOrderDescending()) { - _dbContext.Add(objectSnapshot); - await SnapshotAdded(objectSnapshot); - } - - await _dbContext.SaveChangesAsync(); - } - - public async ValueTask AddIfNew(IEnumerable snapshots) - { - foreach (var snapshot in snapshots) - { - - if (_dbContext.Set().Local.FindEntry(snapshot.Id) is not null) continue; _dbContext.Add(snapshot); - await SnapshotAdded(snapshot); + if (projectedEntityIds.Add(snapshot.EntityId)) + { + await ProjectSnapshot(snapshot); + } } await _dbContext.SaveChangesAsync(); } - private async ValueTask SnapshotAdded(ObjectSnapshot objectSnapshot) + private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot) { if (!_crdtConfig.Value.EnableProjectedTables) return; if (objectSnapshot.IsRoot && objectSnapshot.EntityIsDeleted) return; //need to check if an entry exists already, even if this is the root commit it may have already been added to the db var existingEntry = await GetEntityEntry(objectSnapshot.Entity.DbObject.GetType(), objectSnapshot.EntityId); - object? entity; - if (existingEntry is null && objectSnapshot.IsRoot) + if (existingEntry is null && objectSnapshot.EntityIsDeleted) return; + + if (existingEntry is null) // add { + // this is a new entity even though it might not be a root snapshot, because we only project the latest snapshot of each entity per sync + //if we don't make a copy first then the entity will be tracked by the context and be modified //by future changes in the same session - entity = objectSnapshot.Entity.Copy().DbObject; + var entity = objectSnapshot.Entity.Copy().DbObject; _dbContext.Add(entity) .Property(ObjectSnapshot.ShadowRefName).CurrentValue = objectSnapshot.Id; - return; } - - if (existingEntry is null) return; - if (objectSnapshot.EntityIsDeleted) + else if (objectSnapshot.EntityIsDeleted) // delete { _dbContext.Remove(existingEntry.Entity); - return; } - - entity = objectSnapshot.Entity.DbObject; - existingEntry.CurrentValues.SetValues(entity); - existingEntry.Property(ObjectSnapshot.ShadowRefName).CurrentValue = objectSnapshot.Id; + else // update + { + var entity = objectSnapshot.Entity.DbObject; + existingEntry.CurrentValues.SetValues(entity); + existingEntry.Property(ObjectSnapshot.ShadowRefName).CurrentValue = objectSnapshot.Id; + } } private async ValueTask GetEntityEntry(Type entityType, Guid entityId) @@ -327,7 +316,6 @@ public IQueryable LocalResourceIds() { return await _dbContext.Set().FindAsync(resourceId); } - } internal class ScopedDbContext(ICrdtDbContext inner, Commit ignoreChangesAfter) : ICrdtDbContext @@ -377,4 +365,4 @@ public EntityEntry Remove(object entity) { return inner.Remove(entity); } -} \ No newline at end of file +} diff --git a/src/SIL.Harmony/Db/DbSetExtensions.cs b/src/SIL.Harmony/Db/DbSetExtensions.cs index dce0553..9bb0066 100644 --- a/src/SIL.Harmony/Db/DbSetExtensions.cs +++ b/src/SIL.Harmony/Db/DbSetExtensions.cs @@ -21,6 +21,14 @@ public static IQueryable DefaultOrderDescending(this IQueryable< .ThenByDescending(c => c.Commit.Id); } + public static IEnumerable DefaultOrderDescending(this IEnumerable queryable) + { + return queryable + .OrderByDescending(c => c.Commit.HybridDateTime.DateTime) + .ThenByDescending(c => c.Commit.HybridDateTime.Counter) + .ThenByDescending(c => c.Commit.Id); + } + public static IQueryable WhereAfter(this IQueryable queryable, Commit after) { return queryable.Where( diff --git a/src/SIL.Harmony/SnapshotWorker.cs b/src/SIL.Harmony/SnapshotWorker.cs index 5d8d492..ebc6be2 100644 --- a/src/SIL.Harmony/SnapshotWorker.cs +++ b/src/SIL.Harmony/SnapshotWorker.cs @@ -57,12 +57,11 @@ public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits) var commits = await _crdtRepository.GetCommitsAfter(previousCommit); await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash); - // First add any new entities/snapshots as they might be referenced by intermediate snapshots - await _crdtRepository.AddSnapshots(_rootSnapshots.Values); - // Then add any intermediate snapshots we're hanging onto for performance benefits - await _crdtRepository.AddIfNew(_newIntermediateSnapshots); - // And finally the up-to-date snapshots, which will be used in the projected tables - await _crdtRepository.AddSnapshots(_pendingSnapshots.Values); + await _crdtRepository.AddSnapshots([ + .._rootSnapshots.Values, + .._newIntermediateSnapshots, + .._pendingSnapshots.Values + ]); } private async ValueTask ApplyCommitChanges(IEnumerable commits, bool updateCommitHash, string? previousCommitHash) @@ -121,7 +120,7 @@ private async ValueTask ApplyCommitChanges(IEnumerable commits, bool upd { //do nothing, will cause prevSnapshot to be overriden in _pendingSnapshots if it exists } - else if (commitIndex % 2 == 0 && !prevSnapshot.IsRoot) + else if (commitIndex % 2 == 0 && !prevSnapshot.IsRoot && IsNew(prevSnapshot)) { intermediateSnapshots[prevSnapshot.Entity.Id] = prevSnapshot; } @@ -151,6 +150,7 @@ private async ValueTask MarkDeleted(Guid deletedEntityId, Commit commit) .Select(s => s.EntityId) .ToArrayAsync()); //snapshots from the db might be out of date, we want to use the most up to date data in the worker as well + toRemoveRefFromIds.UnionWith(_rootSnapshots.Values.Where(predicate).Select(s => s.EntityId)); toRemoveRefFromIds.UnionWith(_pendingSnapshots.Values.Where(predicate).Select(s => s.EntityId)); foreach (var entityId in toRemoveRefFromIds) { @@ -221,4 +221,21 @@ private void AddSnapshot(ObjectSnapshot snapshot) _pendingSnapshots[snapshot.Entity.Id] = snapshot; } } + + /// + /// snapshot is not from the database + /// + private bool IsNew(ObjectSnapshot snapshot) + { + var entityId = snapshot.EntityId; + if (_rootSnapshots.TryGetValue(entityId, out var rootSnapshot)) + { + return rootSnapshot.Id != snapshot.Id; + } + if (_pendingSnapshots.TryGetValue(entityId, out var pendingSnapshot)) + { + return pendingSnapshot.Id != snapshot.Id; + } + return false; + } }