Skip to content

Commit 328ae41

Browse files
squeek502andrewrk
authored andcommitted
Reader.peekDelimiterInclusive: Fix handling of stream implementations that return 0
Previously, the logic in peekDelimiterInclusive (when the delimiter was not found in the existing buffer) used the `n` returned from `r.vtable.stream` as the length of the slice to check, but it's valid for `vtable.stream` implementations to return 0 if they wrote to the buffer instead of `w`. In that scenario, the `indexOfScalarPos` would be given a 0-length slice so it would never be able to find the delimiter. This commit changes the logic to assume that `r.vtable.stream` can both: - return 0, and - modify seek/end (i.e. it's also valid for a `vtable.stream` implementation to rebase) Also introduces `std.testing.ReaderIndirect` which helps in being able to test against Reader implementations that return 0 from `stream`/`readVec` Fixes #25428
1 parent 60be67d commit 328ae41

File tree

2 files changed

+97
-4
lines changed

2 files changed

+97
-4
lines changed

lib/std/Io/Reader.zig

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -795,22 +795,23 @@ pub fn takeDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
795795
pub fn peekDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
796796
const buffer = r.buffer[0..r.end];
797797
const seek = r.seek;
798-
if (std.mem.indexOfScalarPos(u8, buffer, seek, delimiter)) |end| {
798+
if (std.mem.indexOfScalarPos(u8, buffer, seek, delimiter)) |delimiter_index| {
799799
@branchHint(.likely);
800-
return buffer[seek .. end + 1];
800+
return buffer[seek .. delimiter_index + 1];
801801
}
802802
// TODO take a parameter for max search length rather than relying on buffer capacity
803803
try rebase(r, r.buffer.len);
804804
while (r.buffer.len - r.end != 0) {
805+
const existing_buffered_len = r.end - r.seek;
805806
const end_cap = r.buffer[r.end..];
806807
var writer: Writer = .fixed(end_cap);
807808
const n = r.vtable.stream(r, &writer, .limited(end_cap.len)) catch |err| switch (err) {
808809
error.WriteFailed => unreachable,
809810
else => |e| return e,
810811
};
811812
r.end += n;
812-
if (std.mem.indexOfScalarPos(u8, end_cap[0..n], 0, delimiter)) |end| {
813-
return r.buffer[0 .. r.end - n + end + 1];
813+
if (std.mem.indexOfScalarPos(u8, r.buffer[0..r.end], r.seek + existing_buffered_len, delimiter)) |delimiter_index| {
814+
return r.buffer[r.seek .. delimiter_index + 1];
814815
}
815816
}
816817
return error.StreamTooLong;
@@ -1601,6 +1602,18 @@ test "readSliceShort with smaller buffer than Reader" {
16011602
try testing.expectEqualStrings(str, &buf);
16021603
}
16031604

1605+
test "readSliceShort with indirect reader" {
1606+
var r: Reader = .fixed("HelloFren");
1607+
var ri_buf: [3]u8 = undefined;
1608+
var ri: std.testing.ReaderIndirect = .init(&r, &ri_buf);
1609+
var buf: [5]u8 = undefined;
1610+
try testing.expectEqual(5, try ri.interface.readSliceShort(&buf));
1611+
try testing.expectEqualStrings("Hello", buf[0..5]);
1612+
try testing.expectEqual(4, try ri.interface.readSliceShort(&buf));
1613+
try testing.expectEqualStrings("Fren", buf[0..4]);
1614+
try testing.expectEqual(0, try ri.interface.readSliceShort(&buf));
1615+
}
1616+
16041617
test readVec {
16051618
var r: Reader = .fixed(std.ascii.letters);
16061619
var flat_buffer: [52]u8 = undefined;
@@ -1701,6 +1714,26 @@ test "takeDelimiterInclusive when it rebases" {
17011714
}
17021715
}
17031716

1717+
test "takeDelimiterInclusive on an indirect reader when it rebases" {
1718+
const written_line = "ABCDEFGHIJKLMNOPQRSTUVWXYZ\n";
1719+
var buffer: [128]u8 = undefined;
1720+
var tr: std.testing.Reader = .init(&buffer, &.{
1721+
.{ .buffer = written_line[0..4] },
1722+
.{ .buffer = written_line[4..] },
1723+
.{ .buffer = written_line },
1724+
.{ .buffer = written_line },
1725+
.{ .buffer = written_line },
1726+
.{ .buffer = written_line },
1727+
.{ .buffer = written_line },
1728+
});
1729+
var indirect_buffer: [128]u8 = undefined;
1730+
var tri: std.testing.ReaderIndirect = .init(&tr.interface, &indirect_buffer);
1731+
const r = &tri.interface;
1732+
for (0..6) |_| {
1733+
try std.testing.expectEqualStrings(written_line, try r.takeDelimiterInclusive('\n'));
1734+
}
1735+
}
1736+
17041737
test "takeStruct and peekStruct packed" {
17051738
var r: Reader = .fixed(&.{ 0b11110000, 0b00110011 });
17061739
const S = packed struct(u16) { a: u2, b: u6, c: u7, d: u1 };

lib/std/testing.zig

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,3 +1242,63 @@ pub const Reader = struct {
12421242
return n;
12431243
}
12441244
};
1245+
1246+
/// A `std.Io.Reader` that gets its data from another `std.Io.Reader`, and always
1247+
/// writes to its own buffer (and returns 0) during `stream` and `readVec`.
1248+
pub const ReaderIndirect = struct {
1249+
in: *std.Io.Reader,
1250+
interface: std.Io.Reader,
1251+
1252+
pub fn init(in: *std.Io.Reader, buffer: []u8) ReaderIndirect {
1253+
return .{
1254+
.in = in,
1255+
.interface = .{
1256+
.vtable = &.{
1257+
.stream = stream,
1258+
.readVec = readVec,
1259+
},
1260+
.buffer = buffer,
1261+
.seek = 0,
1262+
.end = 0,
1263+
},
1264+
};
1265+
}
1266+
1267+
fn readVec(r: *std.Io.Reader, _: [][]u8) std.Io.Reader.Error!usize {
1268+
try streamInner(r);
1269+
return 0;
1270+
}
1271+
1272+
fn stream(r: *std.Io.Reader, _: *std.Io.Writer, _: std.Io.Limit) std.Io.Reader.StreamError!usize {
1273+
try streamInner(r);
1274+
return 0;
1275+
}
1276+
1277+
fn streamInner(r: *std.Io.Reader) std.Io.Reader.Error!void {
1278+
const r_indirect: *ReaderIndirect = @alignCast(@fieldParentPtr("interface", r));
1279+
1280+
// If there's no room remaining in the buffer at all, make room.
1281+
if (r.buffer.len == r.end) {
1282+
try r.rebase(r.buffer.len);
1283+
}
1284+
1285+
var writer: std.Io.Writer = .{
1286+
.buffer = r.buffer,
1287+
.end = r.end,
1288+
.vtable = &.{
1289+
.drain = std.Io.Writer.unreachableDrain,
1290+
.rebase = std.Io.Writer.unreachableRebase,
1291+
},
1292+
};
1293+
defer r.end = writer.end;
1294+
1295+
r_indirect.in.streamExact(&writer, r.buffer.len - r.end) catch |err| switch (err) {
1296+
// Only forward EndOfStream if no new bytes were written to the buffer
1297+
error.EndOfStream => |e| if (r.end == writer.end) {
1298+
return e;
1299+
},
1300+
error.WriteFailed => unreachable,
1301+
else => |e| return e,
1302+
};
1303+
}
1304+
};

0 commit comments

Comments
 (0)