Skip to content

Commit 7ae5d1f

Browse files
authored
Merge pull request #2717 from isbdnt1/dev
Fix ReadFull must read PAGE_SIZE bytes [{0}]
2 parents 2ec2daf + 1ab2331 commit 7ae5d1f

File tree

2 files changed

+159
-0
lines changed

2 files changed

+159
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
using FluentAssertions;
6+
using LiteDB.Engine;
7+
using Xunit;
8+
9+
namespace LiteDB.Tests.Issues;
10+
11+
public class Issue2523_ReadFull_Tests
12+
{
13+
[Fact]
14+
public void ReadFull_Must_See_Log_Page_Written_In_Same_Tick()
15+
{
16+
using var logStream = new DelayedPublishLogStream(); // <-- changed
17+
using var dataStream = new MemoryStream();
18+
19+
var settings = new EngineSettings
20+
{
21+
DataStream = dataStream,
22+
LogStream = logStream
23+
};
24+
25+
var state = new EngineState(null, settings);
26+
var disk = new DiskService(settings, state, new[] { 10 });
27+
28+
try
29+
{
30+
// Arrange: create a single, full page
31+
var page = disk.NewPage();
32+
page.Fill(0xAC);
33+
34+
// Act: write the page to the WAL/log
35+
disk.WriteLogDisk(new[] { page });
36+
37+
// Assert: immediately read the log back fully.
38+
// Pre-fix: throws (ReadFull must read PAGE_SIZE bytes)
39+
// Post-fix: returns 1 page, filled with 0xAC
40+
var logPages = disk.ReadFull(FileOrigin.Log).ToList();
41+
42+
logPages.Should().HaveCount(1);
43+
logPages[0].All(0xAC).Should().BeTrue();
44+
}
45+
finally
46+
{
47+
disk.Dispose();
48+
}
49+
}
50+
51+
/// <summary>
52+
/// Stream that "accepts" writes (increases Length as the writer would see it),
53+
/// but hides the bytes from readers until Flush/FlushAsync publishes them.
54+
/// This mirrors the visibility gap the fix (stream.Flush()) closes.
55+
/// </summary>
56+
private sealed class DelayedPublishLogStream : Stream
57+
{
58+
private readonly MemoryStream _committed = new(); // bytes visible to readers
59+
private readonly List<(long Position, byte[] Data)> _pending = new();
60+
61+
private long _writerLength; // total bytes "written" by Write(...)
62+
private long _visibleLength; // committed length visible to Read(...)
63+
private long _position; // logical cursor for both read/write
64+
65+
public override bool CanRead => true;
66+
public override bool CanSeek => true;
67+
public override bool CanWrite => true;
68+
69+
// IMPORTANT: advertise writer's view of length (includes pending).
70+
public override long Length => _writerLength;
71+
72+
public override long Position
73+
{
74+
get => _position;
75+
set => _position = value;
76+
}
77+
78+
public override void Flush()
79+
{
80+
// Publish pending bytes
81+
foreach (var (pos, data) in _pending)
82+
{
83+
_committed.Position = pos;
84+
_committed.Write(data, 0, data.Length);
85+
}
86+
_pending.Clear();
87+
_committed.Flush();
88+
89+
// Make everything visible
90+
_visibleLength = _writerLength;
91+
}
92+
93+
public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken)
94+
{
95+
Flush();
96+
return System.Threading.Tasks.Task.CompletedTask;
97+
}
98+
99+
public override int Read(byte[] buffer, int offset, int count)
100+
{
101+
// Serve only what has been published (visibleLength)
102+
if (_position >= _visibleLength) return 0;
103+
104+
var available = (int)Math.Min(count, _visibleLength - _position);
105+
_committed.Position = _position;
106+
var read = _committed.Read(buffer, offset, available);
107+
_position += read;
108+
return read;
109+
}
110+
111+
public override long Seek(long offset, SeekOrigin origin)
112+
{
113+
_position = origin switch
114+
{
115+
SeekOrigin.Begin => offset,
116+
SeekOrigin.Current => _position + offset,
117+
// IMPORTANT: base End on the *advertised* writer length, not committed length
118+
SeekOrigin.End => _writerLength + offset,
119+
_ => throw new ArgumentOutOfRangeException(nameof(origin))
120+
};
121+
122+
if (_position < 0) throw new IOException("Negative position.");
123+
return _position;
124+
}
125+
126+
public override void SetLength(long value)
127+
{
128+
if (value < 0) throw new IOException("Negative length.");
129+
// Adjust both writer length and (if shrinking) visible length.
130+
_writerLength = value;
131+
if (_visibleLength > value) _visibleLength = value;
132+
if (_committed.Length < value) _committed.SetLength(value);
133+
if (_position > value) _position = value;
134+
}
135+
136+
public override void Write(byte[] buffer, int offset, int count)
137+
{
138+
if (buffer is null) throw new ArgumentNullException(nameof(buffer));
139+
if ((uint)offset > buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
140+
if ((uint)count > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(count));
141+
142+
// Capture write into pending (not visible yet)
143+
var copy = new byte[count];
144+
Buffer.BlockCopy(buffer, offset, copy, 0, count);
145+
_pending.Add((_position, copy));
146+
147+
_position += count;
148+
if (_position > _writerLength) _writerLength = _position;
149+
// NOTE: _visibleLength is NOT updated here; only Flush() publishes writes.
150+
}
151+
152+
protected override void Dispose(bool disposing)
153+
{
154+
if (disposing) _committed.Dispose();
155+
base.Dispose(disposing);
156+
}
157+
}
158+
}

LiteDB/Engine/Disk/DiskService.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ public int WriteLogDisk(IEnumerable<PageBuffer> pages)
202202

203203
count++;
204204
}
205+
stream.Flush();
205206
}
206207

207208
return count;

0 commit comments

Comments
 (0)