diff --git a/lib/io/stream/buffered.rb b/lib/io/stream/buffered.rb index a1660fa..a1f4c2b 100644 --- a/lib/io/stream/buffered.rb +++ b/lib/io/stream/buffered.rb @@ -133,8 +133,7 @@ def syswrite(buffer) # Reads data from the underlying stream as efficiently as possible. def sysread(size, buffer) - # Come on Ruby, why couldn't this just return `nil`? EOF is not exceptional. Every file has one. - while true + while !@io.closed? result = @io.read_nonblock(size, buffer, exception: false) case result @@ -146,8 +145,10 @@ def sysread(size, buffer) return result end end - rescue Errno::EBADF - raise ::IOError, "stream closed" + + # Otherwise, the `@io` was closed while reading: + # https://github.com/ruby/openssl/issues/798 + raise ::IOError, "closed stream" end end end diff --git a/lib/io/stream/readable.rb b/lib/io/stream/readable.rb index 2aa7592..665c5da 100644 --- a/lib/io/stream/readable.rb +++ b/lib/io/stream/readable.rb @@ -116,17 +116,28 @@ def readpartial(size = nil) # @parameter offset [Integer] The offset to start searching from. # @parameter limit [Integer | Nil] The maximum number of bytes to read while searching. # @returns [Integer | Nil] The index of the pattern, or nil if not found. - private def index_of(pattern, offset, limit) + private def index_of(pattern, offset, limit, discard = false) # We don't want to split on the pattern, so we subtract the size of the pattern. split_offset = pattern.bytesize - 1 - + until index = @read_buffer.index(pattern, offset) offset = @read_buffer.bytesize - split_offset offset = 0 if offset < 0 - return nil if limit and offset >= limit - return nil unless fill_read_buffer + if limit and offset >= limit + return nil + end + + unless fill_read_buffer + return nil + end + + if discard + # If we are discarding, we should consume the read buffer up to the offset: + consume_read_buffer(offset) + offset = 0 + end end return index @@ -136,7 +147,8 @@ def readpartial(size = nil) # @parameter pattern [String] The pattern to match. 
# @parameter offset [Integer] The offset to start searching from. # @parameter limit [Integer] The maximum number of bytes to read, including the pattern (even if chomped). - # @returns [String | Nil] The contents of the stream up until the pattern, which is consumed but not returned. + # @parameter chomp [Boolean] Whether to remove the pattern from the returned data. + # @returns [String | Nil] The contents of the stream up until the pattern, or nil if the pattern was not found. def read_until(pattern, offset = 0, limit: nil, chomp: true) if index = index_of(pattern, offset, limit) return nil if limit and index >= limit @@ -149,6 +161,28 @@ def read_until(pattern, offset = 0, limit: nil, chomp: true) end end + # Efficiently discard data from the stream until encountering pattern. + # @parameter pattern [String] The pattern to match. + # @parameter offset [Integer] The offset to start searching from. + # @parameter limit [Integer] The maximum number of bytes to read, including the pattern. + # @returns [String | Nil] The data still buffered up to and including the pattern (data consumed while searching is not included), or nil if the pattern was not found or lies beyond the limit. + def discard_until(pattern, offset = 0, limit: nil) + if index = index_of(pattern, offset, limit, true) + @read_buffer.freeze + + if limit and index >= limit + @read_buffer = @read_buffer.byteslice(limit, @read_buffer.bytesize) + + return nil + end + + matched = @read_buffer.byteslice(0, index+pattern.bytesize) + @read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize) + + return matched + end + end + # Peek at data in the buffer without consuming it. # @parameter size [Integer | Nil] The number of bytes to peek at. If nil, peek at all available data. # @returns [String] The data in the buffer without consuming it. 
diff --git a/releases.md b/releases.md index 74581fc..9692efd 100644 --- a/releases.md +++ b/releases.md @@ -3,6 +3,7 @@ ## Unreleased - On Ruby v3.3+, use `IO#write` directly instead of `IO#write_nonblock`, for better performance. + - `Buffered#sysread` now checks `@io.closed?` before attempting to read, improving error handling. ## v0.7.0 diff --git a/test/io/stream/buffered.rb b/test/io/stream/buffered.rb index e088c13..e92bee3 100644 --- a/test/io/stream/buffered.rb +++ b/test/io/stream/buffered.rb @@ -438,6 +438,107 @@ def before expect(write_buffer).to be(:empty?) end end + + with "#discard_until" do + it "can discard data until pattern" do + server.write("hello\nworld\ntest") + server.close + + # Discard until "\n" - should return chunk ending with the pattern + chunk = client.discard_until("\n") + expect(chunk).not.to be_nil + expect(chunk).to be(:end_with?, "\n") + # Read the remaining data to verify it starts with "world" + expect(client.read(5)).to be == "world" + + # Discard until "t" - should return chunk ending with the pattern + chunk = client.discard_until("t") + expect(chunk).not.to be_nil + expect(chunk).to be(:end_with?, "t") + # Read remaining data + expect(client.read).to be == "est" + end + + it "returns nil when pattern not found and discards all data" do + server.write("hello world") + server.close + + expect(client.discard_until("\n")).to be_nil + # Data should still be available since pattern was not found + expect(client.read).to be == "hello world" + end + + it "can discard with a limit" do + server.write("hello\nworld\n") + server.close + + # Use peek to verify initial buffer state + expect(client.peek).to be == "hello\nworld\n" + + # Limit too small to find pattern - discards up to limit + expect(client.discard_until("\n", limit: 4)).to be_nil + + # Use peek to verify that 4 bytes were discarded + expect(client.peek).to be == "o\nworld\n" + + # After discarding 4 bytes, should find pattern in remaining data + chunk = 
client.discard_until("\n", limit: 5) + expect(chunk).not.to be_nil + expect(chunk).to be(:end_with?, "\n") + + # Use peek to verify final buffer state + expect(client.peek).to be == "world\n" + expect(client.read).to be == "world\n" + end + + it "handles patterns spanning buffer boundaries" do + # Use a small block size to force the pattern to span boundaries + client.block_size = 3 + + server.write("ab") + server.flush + server.write("cdef") + server.close + + # Pattern "cd" spans the boundary between "ab" and "cdef" + chunk = client.discard_until("cd") + expect(chunk).not.to be_nil + expect(chunk).to be(:end_with?, "cd") + expect(client.read).to be == "ef" + end + + it "handles large patterns efficiently" do + large_pattern = "X" * 20 # Trigger sliding window logic + server.write("some data before") + server.write(large_pattern) + server.write("some data after") + server.close + + chunk = client.discard_until(large_pattern) + expect(chunk).not.to be_nil + expect(chunk).to be(:end_with?, large_pattern) + expect(client.read).to be == "some data after" + end + + with "with 1-byte block size" do + it "can discard data with a multi-byte pattern" do + server.write("hello\nworld\n") + server.close + + client.block_size = 1 + + chunk1 = client.discard_until("\n") + expect(chunk1).not.to be_nil + expect(chunk1).to be(:end_with?, "\n") + + chunk2 = client.discard_until("\n") + expect(chunk2).not.to be_nil + expect(chunk2).to be(:end_with?, "\n") + + expect(client.discard_until("\n")).to be_nil + end + end + end end ABidirectionalStream = Sus::Shared("a bidirectional stream") do