Skip to content

Commit 04614d6

Browse files
committed
std.Io.Reader: add rebase to the vtable
This eliminates a footgun and special case handling with fixed buffers, as well as allowing decompression streams to keep a window in the output buffer.
1 parent de39c5f commit 04614d6

File tree

3 files changed

+63
-42
lines changed

3 files changed

+63
-42
lines changed

lib/std/Io.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,7 @@ pub fn Poller(comptime StreamEnum: type) type {
757757
const unused = r.buffer[r.end..];
758758
if (unused.len >= min_len) return unused;
759759
}
760-
if (r.seek > 0) r.rebase();
760+
if (r.seek > 0) r.rebase(r.buffer.len) catch unreachable;
761761
{
762762
var list: std.ArrayListUnmanaged(u8) = .{
763763
.items = r.buffer[0..r.end],

lib/std/Io/Reader.zig

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ pub const VTable = struct {
6767
///
6868
/// This function is only called when `buffer` is empty.
6969
discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard,
70+
71+
/// Ensures `capacity` more data can be buffered without rebasing.
72+
///
73+
/// Asserts `capacity` is within buffer capacity, or that the stream ends
74+
/// within `capacity` bytes.
75+
///
76+
/// Only called when `capacity` cannot fit into the unused capacity of
77+
/// `buffer`.
78+
///
79+
/// The default implementation moves buffered data to the start of
80+
/// `buffer`, setting `seek` to zero, and cannot fail.
81+
rebase: *const fn (r: *Reader, capacity: usize) RebaseError!void = defaultRebase,
7082
};
7183

7284
pub const StreamError = error{
@@ -97,6 +109,10 @@ pub const ShortError = error{
97109
ReadFailed,
98110
};
99111

112+
pub const RebaseError = error{
113+
EndOfStream,
114+
};
115+
100116
pub const failing: Reader = .{
101117
.vtable = &.{
102118
.stream = failingStream,
@@ -122,6 +138,7 @@ pub fn fixed(buffer: []const u8) Reader {
122138
.vtable = &.{
123139
.stream = endingStream,
124140
.discard = endingDiscard,
141+
.rebase = endingRebase,
125142
},
126143
// This cast is safe because all potential writes to it will instead
127144
// return `error.EndOfStream`.
@@ -780,11 +797,8 @@ pub fn peekDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
780797
@branchHint(.likely);
781798
return buffer[seek .. end + 1];
782799
}
783-
if (r.vtable.stream == &endingStream) {
784-
// Protect the `@constCast` of `fixed`.
785-
return error.EndOfStream;
786-
}
787-
r.rebase();
800+
// TODO take a parameter for max search length rather than relying on buffer capacity
801+
try rebase(r, r.buffer.len);
788802
while (r.buffer.len - r.end != 0) {
789803
const end_cap = r.buffer[r.end..];
790804
var writer: Writer = .fixed(end_cap);
@@ -1050,11 +1064,7 @@ fn fillUnbuffered(r: *Reader, n: usize) Error!void {
10501064
};
10511065
if (r.seek + n <= r.end) return;
10521066
};
1053-
if (r.vtable.stream == &endingStream) {
1054-
// Protect the `@constCast` of `fixed`.
1055-
return error.EndOfStream;
1056-
}
1057-
rebaseCapacity(r, n);
1067+
try rebase(r, n);
10581068
var writer: Writer = .{
10591069
.buffer = r.buffer,
10601070
.vtable = &.{ .drain = Writer.fixedDrain },
@@ -1074,7 +1084,7 @@ fn fillUnbuffered(r: *Reader, n: usize) Error!void {
10741084
///
10751085
/// Asserts buffer capacity is at least 1.
10761086
pub fn fillMore(r: *Reader) Error!void {
1077-
rebaseCapacity(r, 1);
1087+
try rebase(r, 1);
10781088
var writer: Writer = .{
10791089
.buffer = r.buffer,
10801090
.end = r.end,
@@ -1251,7 +1261,7 @@ pub fn takeLeb128(r: *Reader, comptime Result: type) TakeLeb128Error!Result {
12511261

12521262
pub fn expandTotalCapacity(r: *Reader, allocator: Allocator, n: usize) Allocator.Error!void {
12531263
if (n <= r.buffer.len) return;
1254-
if (r.seek > 0) rebase(r);
1264+
if (r.seek > 0) rebase(r, r.buffer.len);
12551265
var list: ArrayList(u8) = .{
12561266
.items = r.buffer[0..r.end],
12571267
.capacity = r.buffer.len,
@@ -1297,37 +1307,20 @@ fn takeMultipleOf7Leb128(r: *Reader, comptime Result: type) TakeLeb128Error!Resu
12971307
}
12981308
}
12991309

1300-
/// Left-aligns data such that `r.seek` becomes zero.
1301-
///
1302-
/// If `r.seek` is not already zero then `buffer` is mutated, making it illegal
1303-
/// to call this function with a const-casted `buffer`, such as in the case of
1304-
/// `fixed`. This issue can be avoided:
1305-
/// * in implementations, by attempting a read before a rebase, in which
1306-
/// case the read will return `error.EndOfStream`, preventing the rebase.
1307-
/// * in usage, by copying into a mutable buffer before initializing `fixed`.
1308-
pub fn rebase(r: *Reader) void {
1309-
if (r.seek == 0) return;
1310+
/// Ensures `capacity` more data can be buffered without rebasing.
1311+
pub fn rebase(r: *Reader, capacity: usize) RebaseError!void {
1312+
if (r.end + capacity <= r.buffer.len) return;
1313+
return r.vtable.rebase(r, capacity);
1314+
}
1315+
1316+
pub fn defaultRebase(r: *Reader, capacity: usize) RebaseError!void {
1317+
if (r.end <= r.buffer.len - capacity) return;
13101318
const data = r.buffer[r.seek..r.end];
13111319
@memmove(r.buffer[0..data.len], data);
13121320
r.seek = 0;
13131321
r.end = data.len;
13141322
}
13151323

1316-
/// Ensures `capacity` more data can be buffered without rebasing, by rebasing
1317-
/// if necessary.
1318-
///
1319-
/// Asserts `capacity` is within the buffer capacity.
1320-
///
1321-
/// If the rebase occurs then `buffer` is mutated, making it illegal to call
1322-
/// this function with a const-casted `buffer`, such as in the case of `fixed`.
1323-
/// This issue can be avoided:
1324-
/// * in implementations, by attempting a read before a rebase, in which
1325-
/// case the read will return `error.EndOfStream`, preventing the rebase.
1326-
/// * in usage, by copying into a mutable buffer before initializing `fixed`.
1327-
pub fn rebaseCapacity(r: *Reader, capacity: usize) void {
1328-
if (r.end > r.buffer.len - capacity) rebase(r);
1329-
}
1330-
13311324
/// Advances the stream and decreases the size of the storage buffer by `n`,
13321325
/// returning the range of bytes no longer accessible by `r`.
13331326
///
@@ -1683,6 +1676,12 @@ fn endingDiscard(r: *Reader, limit: Limit) Error!usize {
16831676
return error.EndOfStream;
16841677
}
16851678

1679+
fn endingRebase(r: *Reader, capacity: usize) RebaseError!void {
1680+
_ = r;
1681+
_ = capacity;
1682+
return error.EndOfStream;
1683+
}
1684+
16861685
fn failingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize {
16871686
_ = r;
16881687
_ = w;

lib/std/compress/zstd/Decompress.zig

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@ pub const Options = struct {
3131
/// Verifying checksums is not implemented yet and will cause a panic if
3232
/// you set this to true.
3333
verify_checksum: bool = false,
34-
/// Affects the minimum capacity of the provided buffer.
34+
35+
/// The output buffer is asserted to have capacity for `window_len` plus
36+
/// `zstd.block_size_max`.
37+
///
38+
/// If `window_len` is too small, then some streams will fail to decompress
39+
/// with `error.OutputBufferUndersize`.
3540
window_len: u32 = zstd.default_window_len,
3641
};
3742

@@ -69,23 +74,40 @@ pub const Error = error{
6974
WindowSizeUnknown,
7075
};
7176

72-
/// If buffer that is written to is not big enough, some streams will fail with
73-
/// `error.OutputBufferUndersize`. A safe value is `zstd.default_window_len * 2`.
77+
/// When connecting `reader` to a `Writer`, `buffer` should be empty, and
78+
/// `Writer.buffer` capacity has requirements based on `Options.window_len`.
79+
///
80+
/// Otherwise, `buffer` has those requirements.
7481
pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
7582
return .{
7683
.input = input,
7784
.state = .new_frame,
7885
.verify_checksum = options.verify_checksum,
7986
.window_len = options.window_len,
8087
.reader = .{
81-
.vtable = &.{ .stream = stream },
88+
.vtable = &.{
89+
.stream = stream,
90+
.rebase = rebase,
91+
},
8292
.buffer = buffer,
8393
.seek = 0,
8494
.end = 0,
8595
},
8696
};
8797
}
8898

99+
fn rebase(r: *Reader, capacity: usize) Reader.RebaseError!void {
100+
const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
101+
assert(capacity <= r.buffer.len - d.window_len);
102+
assert(r.end + capacity > r.buffer.len);
103+
const buffered = r.buffer[0..r.end];
104+
const discard = buffered.len - d.window_len;
105+
const keep = buffered[discard..];
106+
@memmove(r.buffer[0..keep.len], keep);
107+
r.end = keep.len;
108+
r.seek -= discard;
109+
}
110+
89111
fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize {
90112
const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
91113
const in = d.input;

0 commit comments

Comments (0)