Skip to content

Commit 0d266ce

Browse files
authored
add stats (#30)
1 parent c0bc3b3 commit 0d266ce

File tree

6 files changed

+203
-44
lines changed

6 files changed

+203
-44
lines changed

docs/src/examples.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,35 @@ read(GzipDecompressionStream(stream, stop_on_end=true)) #> the content of foo.t
160160
read(GzipDecompressionStream(stream, stop_on_end=true)) #> the content of bar.txt
161161
```
162162

163+
Check I/O statistics
164+
--------------------
165+
166+
`TranscodingStreams.stats` returns a snapshot of the I/O statistics. For
167+
example, the following function shows progress of decompression to the standard
168+
error:
169+
```julia
170+
using CodecZlib
171+
172+
function decompress(input, output)
173+
buffer = Vector{UInt8}(16 * 1024)
174+
while !eof(input)
175+
n = min(nb_available(input), length(buffer))
176+
unsafe_read(input, pointer(buffer), n)
177+
unsafe_write(output, pointer(buffer), n)
178+
stats = TranscodingStreams.stats(input)
179+
print(STDERR, "\rin: $(stats.in), out: $(stats.out)")
180+
end
181+
println(STDERR)
182+
end
183+
184+
input = GzipDecompressionStream(open("foobar.txt.gz"))
185+
output = IOBuffer()
186+
decompress(input, output)
187+
```
188+
189+
`stats.in` is the number of bytes supplied to the stream and `stats.out` is the
190+
number of bytes consumed out of the stream.
191+
163192
Transcode data in one shot
164193
--------------------------
165194

docs/src/references.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ TranscodingStreams.unread
1717
TranscodingStreams.unsafe_unread
1818
```
1919

20+
Statistics
21+
----------
22+
23+
```@docs
24+
TranscodingStreams.Stats
25+
TranscodingStreams.stats
26+
```
27+
2028
Codec
2129
-----
2230

src/buffer.jl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,18 @@ function supplied!(buf::Buffer, n::Integer)
8989
return buf
9090
end
9191

92+
function consumed2!(buf::Buffer, n::Integer)
93+
buf.bufferpos += n
94+
buf.total += n
95+
return buf
96+
end
97+
98+
function supplied2!(buf::Buffer, n::Integer)
99+
buf.marginpos += n
100+
buf.total += n
101+
return buf
102+
end
103+
92104
function readbyte!(buf::Buffer)
93105
b = buf.data[buf.bufferpos]
94106
consumed!(buf, 1)

src/noop.jl

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,30 @@ function Base.transcode(::Noop, data::Vector{UInt8})
114114
end
115115

116116

117+
# Stats
118+
# -----
119+
120+
function stats(stream::NoopStream)
121+
state = stream.state
122+
mode = state.mode
123+
@checkmode (:idle, :read, :write)
124+
buffer = state.buffer1
125+
@assert buffer == stream.state.buffer2
126+
if mode == :idle
127+
consumed = supplied = 0
128+
elseif mode == :read
129+
supplied = buffer.total
130+
consumed = supplied - buffersize(buffer)
131+
elseif mode == :write
132+
supplied = buffer.total + buffersize(buffer)
133+
consumed = buffer.total
134+
else
135+
assert(false)
136+
end
137+
return Stats(consumed, supplied, supplied, supplied)
138+
end
139+
140+
117141
# Buffering
118142
# ---------
119143
#
@@ -133,29 +157,28 @@ function fillbuffer(stream::NoopStream)
133157
makemargin!(buffer, 1)
134158
nfilled += readdata!(stream.stream, buffer)
135159
end
160+
buffer.total += nfilled
136161
return nfilled
137162
end
138163

139-
function flushbuffer(stream::NoopStream)
164+
function flushbuffer(stream::NoopStream, all::Bool=false)
140165
changemode!(stream, :write)
141166
buffer = stream.state.buffer1
142167
@assert buffer === stream.state.buffer2
143-
nflushed = writedata!(stream.stream, buffer)
144-
makemargin!(buffer, 0)
145-
return nflushed
146-
end
147-
148-
function flushbufferall(stream::NoopStream)
149-
changemode!(stream, :write)
150-
buffer = stream.state.buffer1
151-
bufsize = buffersize(buffer)
152-
while buffersize(buffer) > 0
153-
writedata!(stream.stream, buffer)
168+
nflushed::Int = 0
169+
if all
170+
while buffersize(buffer) > 0
171+
nflushed += writedata!(stream.stream, buffer)
172+
end
173+
else
174+
nflushed += writedata!(stream.stream, buffer)
175+
makemargin!(buffer, 0)
154176
end
155-
return bufsize
177+
buffer.total += nflushed
178+
return nflushed
156179
end
157180

158181
function flushuntilend(stream::NoopStream)
159-
writedata!(stream.stream, stream.state.buffer1)
182+
stream.state.buffer1.total += writedata!(stream.stream, stream.state.buffer1)
160183
return
161184
end

src/stream.jl

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -431,31 +431,64 @@ function Base.flush(stream::TranscodingStream)
431431
end
432432

433433

434-
# Utils
434+
# Stats
435435
# -----
436436

437-
function total_in(stream::TranscodingStream)::Int64
438-
checkmode(stream)
439-
state = stream.state
440-
if state.mode == :read
441-
return state.buffer2.total
442-
elseif state.mode == :write
443-
return state.buffer1.total
444-
else
445-
return zero(Int64)
446-
end
437+
"""
438+
I/O statistics.
439+
440+
Its object has four fields:
441+
- `in`: the number of bytes supplied into the stream
442+
- `out`: the number of bytes consumed out of the stream
443+
- `transcoded_in`: the number of bytes transcoded from the input buffer
444+
- `transcoded_out`: the number of bytes transcoded to the output buffer
445+
446+
Note that, since the transcoding stream does buffering, `in` is `transcoded_in +
447+
{size of buffered data}` and `out` is `transcoded_out - {size of buffered
448+
data}`.
449+
"""
450+
struct Stats
451+
in::Int64
452+
out::Int64
453+
transcoded_in::Int64
454+
transcoded_out::Int64
447455
end
448456

449-
function total_out(stream::TranscodingStream)::Int64
450-
checkmode(stream)
457+
function Base.show(io::IO, stats::Stats)
458+
println(io, summary(stats), ':')
459+
println(io, " in: ", stats.in)
460+
println(io, " out: ", stats.out)
461+
println(io, " transcoded_in: ", stats.transcoded_in)
462+
print(io, " transcoded_out: ", stats.transcoded_out)
463+
end
464+
465+
"""
466+
stats(stream::TranscodingStream)
467+
468+
Create an I/O statistics object of `stream`.
469+
"""
470+
function stats(stream::TranscodingStream)
451471
state = stream.state
452-
if state.mode == :read
453-
return state.buffer1.total
454-
elseif state.mode == :write
455-
return state.buffer2.total
472+
mode = state.mode
473+
@checkmode (:idle, :read, :write)
474+
buffer1 = state.buffer1
475+
buffer2 = state.buffer2
476+
if mode == :idle
477+
transcoded_in = transcoded_out = in = out = 0
478+
elseif mode == :read
479+
transcoded_in = buffer2.total
480+
transcoded_out = buffer1.total
481+
in = transcoded_in + buffersize(buffer2)
482+
out = transcoded_out - buffersize(buffer1)
483+
elseif mode == :write
484+
transcoded_in = buffer1.total
485+
transcoded_out = buffer2.total
486+
in = transcoded_in + buffersize(buffer1)
487+
out = transcoded_out - buffersize(buffer2)
456488
else
457-
return zero(Int64)
489+
assert(false)
458490
end
491+
return Stats(in, out, transcoded_in, transcoded_out)
459492
end
460493

461494

@@ -537,8 +570,8 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
537570
input = buffermem(inbuf)
538571
makemargin!(outbuf, minoutsize(stream.codec, input))
539572
Δin, Δout, state.code = process(stream.codec, input, marginmem(outbuf), state.error)
540-
consumed!(inbuf, Δin)
541-
supplied!(outbuf, Δout)
573+
consumed2!(inbuf, Δin)
574+
supplied2!(outbuf, Δout)
542575
if state.code == :error
543576
changemode!(stream, :panic)
544577
elseif state.code == :ok && Δin == Δout == 0

test/runtests.jl

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ using Base.Test
3030
@test mem.size == sizeof(data)
3131
end
3232

33+
@testset "Stats" begin
34+
stats = TranscodingStreams.Stats(1,2,3,4)
35+
@test repr(stats) ==
36+
"""
37+
TranscodingStreams.Stats:
38+
in: 1
39+
out: 2
40+
transcoded_in: 3
41+
transcoded_out: 4"""
42+
@test stats.in == 1
43+
@test stats.out == 2
44+
@test stats.transcoded_in == 3
45+
@test stats.transcoded_out == 4
46+
end
47+
3348
@testset "TranscodingStream" begin
3449
@test TranscodingStreams.splitkwargs(
3550
[(:foo, 1), (:bar, true), (:baz, :ok)], (:foo,)) ==
@@ -200,24 +215,22 @@ end
200215
@test s1.state.buffer1 === s2.state.buffer1 === s3.state.buffer1 ===
201216
s1.state.buffer2 === s2.state.buffer2 === s3.state.buffer2
202217

203-
#= FIXME: restore these tests
204218
stream = TranscodingStream(Noop(), IOBuffer(b"foobar"))
205-
@test TranscodingStreams.total_in(stream) === Int64(0)
206-
@test TranscodingStreams.total_out(stream) === Int64(0)
219+
@test TranscodingStreams.stats(stream).in === Int64(0)
220+
@test TranscodingStreams.stats(stream).out === Int64(0)
207221
read(stream)
208-
@test TranscodingStreams.total_in(stream) === Int64(6)
209-
@test TranscodingStreams.total_out(stream) === Int64(6)
222+
@test TranscodingStreams.stats(stream).in === Int64(6)
223+
@test TranscodingStreams.stats(stream).out === Int64(6)
210224
close(stream)
211225

212226
stream = TranscodingStream(Noop(), IOBuffer())
213-
@test TranscodingStreams.total_in(stream) === Int64(0)
214-
@test TranscodingStreams.total_out(stream) === Int64(0)
227+
@test TranscodingStreams.stats(stream).in === Int64(0)
228+
@test TranscodingStreams.stats(stream).out === Int64(0)
215229
write(stream, b"foobar")
216230
flush(stream)
217-
@test TranscodingStreams.total_in(stream) === Int64(6)
218-
@test TranscodingStreams.total_out(stream) === Int64(6)
231+
@test TranscodingStreams.stats(stream).in === Int64(6)
232+
@test TranscodingStreams.stats(stream).out === Int64(6)
219233
close(stream)
220-
=#
221234

222235
stream = NoopStream(IOBuffer("foobar"))
223236
@test nb_available(stream) === 0
@@ -420,3 +433,44 @@ end
420433
@test read(stream) == b"abracadabra"
421434
end
422435
end
436+
437+
@testset "stats" begin
438+
size = filesize(joinpath(dirname(@__FILE__), "abra.gzip"))
439+
stream = CodecZlib.GzipDecompressionStream(open(joinpath(dirname(@__FILE__), "abra.gzip")))
440+
stats = TranscodingStreams.stats(stream)
441+
@test stats.in == 0
442+
@test stats.out == 0
443+
@test stats.transcoded_in == 0
444+
@test stats.transcoded_out == 0
445+
read(stream, UInt8)
446+
stats = TranscodingStreams.stats(stream)
447+
@test stats.in == size
448+
@test stats.out == 1
449+
@test stats.transcoded_in == size
450+
@test stats.transcoded_out == 11
451+
close(stream)
452+
@test_throws ArgumentError TranscodingStreams.stats(stream)
453+
454+
buf = IOBuffer()
455+
stream = CodecZlib.GzipCompressionStream(buf)
456+
stats = TranscodingStreams.stats(stream)
457+
@test stats.in == 0
458+
@test stats.out == 0
459+
@test stats.transcoded_in == 0
460+
@test stats.transcoded_out == 0
461+
write(stream, b"abracadabra")
462+
stats = TranscodingStreams.stats(stream)
463+
@test stats.in == 11
464+
@test stats.out == 0
465+
@test stats.transcoded_in == 0
466+
@test stats.transcoded_out == 0
467+
write(stream, TranscodingStreams.TOKEN_END)
468+
flush(stream)
469+
stats = TranscodingStreams.stats(stream)
470+
@test stats.in == 11
471+
@test stats.out == position(buf)
472+
@test stats.transcoded_in == 11
473+
@test stats.transcoded_out == position(buf)
474+
close(stream)
475+
@test_throws ArgumentError TranscodingStreams.stats(stream)
476+
end

0 commit comments

Comments
 (0)