Skip to content

Commit 9bd0f95

Browse files
Fix "Data sizes violation" error (#5492)
Under certain conditions a follower/readonlyreplica can fail to join a cluster with a "Data sizes violation" error. This PR fixes the error condition, after which the node is able to join the cluster again. There is no risk to the integrity of the data. Co-authored-by: Timothy Coleman <[email protected]>
1 parent a26128f commit 9bd0f95

File tree

3 files changed

+76
-1
lines changed

3 files changed

+76
-1
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
2+
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
3+
4+
using System;
5+
using System.IO;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using KurrentDB.Core.TransactionLog.Chunks;
9+
using KurrentDB.Core.TransactionLog.Chunks.TFChunk;
10+
using KurrentDB.Core.TransactionLog.LogRecords;
11+
using KurrentDB.Core.Transforms;
12+
using NUnit.Framework;
13+
14+
namespace KurrentDB.Core.Tests.TransactionLog;
15+
16+
[TestFixture]
17+
public class when_opening_existing_ongoing_tfchunk : SpecificationWithFilePerTestFixture {
18+
private TFChunk _chunk;
19+
private TFChunk _testChunk;
20+
21+
[OneTimeSetUp]
22+
public override async Task TestFixtureSetUp() {
23+
await base.TestFixtureSetUp();
24+
_chunk = await TFChunkHelper.CreateNewChunk(Filename, chunkSize: 10 * 1024);
25+
_chunk.TryClose();
26+
_testChunk = await TFChunk.FromOngoingFile(
27+
fileSystem: new ChunkLocalFileSystem(Path.GetDirectoryName(Filename)),
28+
filename: Filename,
29+
writePosition: 0,
30+
unbuffered: false,
31+
writethrough: false,
32+
reduceFileCachePressure: false, tracker: new TFChunkTracker.NoOp(),
33+
getTransformFactory: DbTransformManager.Default,
34+
token: CancellationToken.None);
35+
}
36+
37+
[OneTimeTearDown]
38+
public override void TestFixtureTearDown() {
39+
_chunk.Dispose();
40+
_testChunk.Dispose();
41+
base.TestFixtureTearDown();
42+
}
43+
44+
[Test]
45+
public void the_chunk_is_cached() {
46+
Assert.IsTrue(_testChunk.IsCached);
47+
}
48+
49+
[Test]
50+
public void the_chunk_is_not_readonly() {
51+
Assert.IsFalse(_testChunk.IsReadOnly);
52+
}
53+
54+
[Test]
55+
// a flush before write can be triggered in practice when joining a cluster and the first
56+
// replication message (8k) does not contain a full transaction.
57+
public async Task can_flush_and_then_write() {
58+
await _testChunk.Flush(CancellationToken.None);
59+
var result = await _testChunk.TryAppend(new CommitLogRecord(0, Guid.NewGuid(), 0, DateTime.UtcNow, 0), CancellationToken.None);
60+
Assert.IsTrue(result.Success);
61+
}
62+
}

src/KurrentDB.Core.Tests/TransactionLog/when_opening_existing_tfchunk.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public override async Task TestFixtureSetUp() {
2828
getTransformFactory: DbTransformManager.Default);
2929
}
3030

31-
[TearDown]
31+
[OneTimeTearDown]
3232
public override void TestFixtureTearDown() {
3333
_chunk.Dispose();
3434
_testChunk.Dispose();

src/KurrentDB.Plugins/Transforms/ChunkDataWriteStream.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public override void Write(ReadOnlySpan<byte> buffer) {
2424
ReadAndChecksum(count);
2525

2626
Debug.Assert(ChunkFileStream.Position == count);
27+
ResetPoolingBufferedStreamReadBuffer(count);
2728
_positionToHash = null;
2829
}
2930

@@ -48,6 +49,7 @@ private async ValueTask WriteAndChecksumAsync(long count, ReadOnlyMemory<byte> b
4849
await ReadAndChecksumAsync(count, token);
4950

5051
Debug.Assert(ChunkFileStream.Position == count);
52+
ResetPoolingBufferedStreamReadBuffer(count);
5153
await ChunkFileStream.WriteAsync(buffer, token);
5254
checksumAlgorithm.AppendData(buffer.Span);
5355
_positionToHash = null;
@@ -95,4 +97,15 @@ private void ReadAndChecksum(long count) {
9597
checksumAlgorithm.AppendData(buffer.Slice(0, bytesRead));
9698
}
9799
}
100+
101+
// Workaround PoolingBufferedStream behavior. If we leave the stream with bytes in its read buffer
102+
// then when we flush it will discard those bytes and advance the Position to end of the read buffer.
103+
// This would then result in attempts to write records to the wrong place in the chunk
104+
// (which would be detected and rejected as a "Data sizes violation").
105+
// Instead we flush here and restore the position so that the stream is set up for regular use.
106+
// This can be removed if/when the PoolingBufferedStream flush behavior is updated.
107+
private void ResetPoolingBufferedStreamReadBuffer(long position) {
108+
ChunkFileStream.Flush();
109+
ChunkFileStream.Position = position;
110+
}
98111
}

0 commit comments

Comments
 (0)