Skip to content

Commit 182b14f

Browse files
authored
Fixes for position with nested NoopStreams (#203)
1 parent 9617908 commit 182b14f

File tree

8 files changed

+89
-21
lines changed

8 files changed

+89
-21
lines changed

.github/workflows/CI.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ jobs:
2626
- windows-latest
2727
arch:
2828
- x64
29+
include:
30+
- os: ubuntu-latest
31+
version: '1'
32+
arch: x86
33+
- os: macOS-14
34+
version: '1'
35+
arch: aarch64
2936
steps:
3037
- uses: actions/checkout@v4
3138
- uses: julia-actions/setup-julia@v1

ext/TestExt.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ function TranscodingStreams.test_chunked_read(Encoder, Decoder)
8989
for chunk in chunks
9090
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
9191
ok &= read(stream) == chunk
92+
ok &= position(stream) == length(chunk)
9293
ok &= eof(stream)
9394
ok &= isreadable(stream)
9495
close(stream)

fuzz/fuzz.jl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ end
120120
for r in rs
121121
d = r(stream)
122122
append!(x, d)
123-
# TODO fix position
124-
# length(x) == position(stream) || return false
123+
length(x) == position(stream) || return false
125124
end
126125
x == data[eachindex(x)]
127126
end

src/noop.jl

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,18 @@ Note that this method may return a wrong position when
5353
- some data have been inserted by `TranscodingStreams.unread`, or
5454
- the position of the wrapped stream has been changed outside of this package.
5555
"""
56-
function Base.position(stream::NoopStream)
56+
function Base.position(stream::NoopStream)::Int64
5757
mode = stream.state.mode
58-
if mode === :idle
58+
if !isopen(stream)
59+
throw_invalid_mode(mode)
60+
elseif mode === :idle
5961
return Int64(0)
62+
elseif has_sharedbuf(stream)
63+
return position(stream.stream)
6064
elseif mode === :write
6165
return position(stream.stream) + buffersize(stream.buffer1)
62-
elseif mode === :read
66+
else # read
6367
return position(stream.stream) - buffersize(stream.buffer1)
64-
else
65-
throw_invalid_mode(mode)
6668
end
6769
@assert false "unreachable"
6870
end
@@ -97,16 +99,34 @@ function Base.seekend(stream::NoopStream)
9799
return stream
98100
end
99101

100-
function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)
102+
function Base.write(stream::NoopStream, b::UInt8)::Int
103+
changemode!(stream, :write)
104+
if has_sharedbuf(stream)
105+
# directly write data to the underlying stream
106+
n = Int(write(stream.stream, b))
107+
return n
108+
end
109+
buffer1 = stream.buffer1
110+
marginsize(buffer1) > 0 || flushbuffer(stream)
111+
return writebyte!(buffer1, b)
112+
end
113+
114+
function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int
101115
changemode!(stream, :write)
116+
if has_sharedbuf(stream)
117+
# directly write data to the underlying stream
118+
n = Int(unsafe_write(stream.stream, input, nbytes))
119+
return n
120+
end
102121
buffer = stream.buffer1
103122
if marginsize(buffer) nbytes
104123
copydata!(buffer, input, nbytes)
105124
return Int(nbytes)
106125
else
107126
flushbuffer(stream)
108127
# directly write data to the underlying stream
109-
return unsafe_write(stream.stream, input, nbytes)
128+
n = Int(unsafe_write(stream.stream, input, nbytes))
129+
return n
110130
end
111131
end
112132

@@ -152,7 +172,7 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int
152172
changemode!(stream, :read)
153173
buffer = stream.buffer1
154174
@assert buffer === stream.buffer2
155-
if stream.stream isa TranscodingStream && buffer === stream.stream.buffer1
175+
if has_sharedbuf(stream)
156176
# Delegate the operation when buffers are shared.
157177
underlying_mode::Symbol = stream.stream.state.mode
158178
if underlying_mode === :idle || underlying_mode === :read

src/stream.jl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ end
157157
# throw ArgumentError that mode is invalid.
158158
throw_invalid_mode(mode) = throw(ArgumentError(string("invalid mode :", mode)))
159159

160+
# Return true if the stream shares buffers with underlying stream
161+
function has_sharedbuf(stream::TranscodingStream)::Bool
162+
stream.stream isa TranscodingStream && stream.buffer2 === stream.stream.buffer1
163+
end
160164

161165
# Base IO Functions
162166
# -----------------
@@ -264,7 +268,7 @@ function Base.position(stream::TranscodingStream)
264268
mode = stream.state.mode
265269
if mode === :idle
266270
return Int64(0)
267-
elseif mode === :read
271+
elseif mode === :read || mode === :stop
268272
return stats(stream).out
269273
elseif mode === :write
270274
return stats(stream).in
@@ -584,7 +588,7 @@ function stats(stream::TranscodingStream)
584588
buffer2 = stream.buffer2
585589
if mode === :idle
586590
transcoded_in = transcoded_out = in = out = 0
587-
elseif mode === :read
591+
elseif mode === :read || mode === :stop
588592
transcoded_in = buffer2.transcoded
589593
transcoded_out = buffer1.transcoded
590594
in = transcoded_in + buffersize(buffer2)

test/codecdoubleframe.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
272272
)
273273
))
274274
@test read(s1) == b""
275+
@test position(s1) == 0
275276
@test eof(s1)
276277

277278
s2 = NoopStream(
@@ -281,6 +282,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
281282
)
282283
)
283284
@test read(s2) == b""
285+
@test position(s1) == 0
284286
@test eof(s2)
285287
end
286288

test/codecnoop.jl

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@
5050
close(stream)
5151

5252
stream = TranscodingStream(Noop(), IOBuffer(b"foobarbaz"))
53-
@test position(stream) === 0
53+
@test position(stream) === Int64(0)
5454
read(stream, UInt8)
55-
@test position(stream) === 1
55+
@test position(stream) === Int64(1)
5656
read(stream)
57-
@test position(stream) === 9
57+
@test position(stream) === Int64(9)
5858

5959
data = collect(0x00:0x0f)
6060
stream = TranscodingStream(Noop(), IOBuffer(data))
@@ -368,6 +368,41 @@
368368
@test position(stream) == pos
369369
end
370370
end
371+
372+
@testset "writing nested NoopStream sharedbuf=$(sharedbuf)" for sharedbuf in (true, false)
373+
stream = NoopStream(NoopStream(IOBuffer()); sharedbuf, bufsize=4)
374+
@test position(stream) == 0
375+
write(stream, 0x01)
376+
@test position(stream) == 1
377+
flush(stream)
378+
@test position(stream) == 1
379+
write(stream, "abc")
380+
@test position(stream) == 4
381+
flush(stream)
382+
@test position(stream) == 4
383+
for i in 1:10
384+
write(stream, 0x01)
385+
@test position(stream) == 4 + i
386+
end
387+
end
388+
389+
@testset "reading nested NoopStream sharedbuf=$(sharedbuf)" for sharedbuf in (true, false)
390+
stream = NoopStream(NoopStream(IOBuffer("abcdefghijk")); sharedbuf, bufsize=4)
391+
@test position(stream) == 0
392+
@test !eof(stream)
393+
@test position(stream) == 0
394+
@test read(stream, UInt8) == b"a"[1]
395+
@test position(stream) == 1
396+
@test read(stream, 3) == b"bcd"
397+
@test position(stream) == 4
398+
@test !eof(stream)
399+
@test position(stream) == 4
400+
@test read(stream) == b"efghijk"
401+
@test position(stream) == 11
402+
@test eof(stream)
403+
@test position(stream) == 11
404+
end
405+
371406
end
372407

373408
@testset "seek doesn't delete data" begin

test/codecquadruple.jl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,19 @@ end
5151
close(stream)
5252

5353
stream = TranscodingStream(QuadrupleCodec(), IOBuffer("foo"))
54-
@test position(stream) === 0
54+
@test position(stream) === Int64(0)
5555
read(stream, 3)
56-
@test position(stream) === 3
56+
@test position(stream) === Int64(3)
5757
read(stream, UInt8)
58-
@test position(stream) === 4
58+
@test position(stream) === Int64(4)
5959
close(stream)
6060

6161
stream = TranscodingStream(QuadrupleCodec(), IOBuffer())
62-
@test position(stream) === 0
62+
@test position(stream) === Int64(0)
6363
write(stream, 0x00)
64-
@test position(stream) === 1
64+
@test position(stream) === Int64(1)
6565
write(stream, "foo")
66-
@test position(stream) === 4
66+
@test position(stream) === Int64(4)
6767
close(stream)
6868

6969
# Buffers are shared.

0 commit comments

Comments
 (0)