Commit 8538673

Implement shared buffers (#22)
1 parent d127ceb commit 8538673

10 files changed: +414 -154 lines changed

docs/make.jl

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ makedocs(
     format=:html,
     sitename="TranscodingStreams.jl",
     modules=[TranscodingStreams],
-    pages=["index.md", "examples.md", "references.md"],
+    pages=["index.md", "examples.md", "references.md", "devnotes.md"],
     assets=["assets/custom.css"])
 
 deploydocs(

docs/src/assets/modes.dot

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+digraph modes {
+    "idle" -> "read";
+    "idle" -> "write";
+    "idle" -> "close";
+    "idle" -> "panic";
+
+    "read" -> "read";
+    "read" -> "close";
+    "read" -> "panic";
+
+    "write" -> "write";
+    "write" -> "close";
+    "write" -> "panic";
+}

docs/src/assets/modes.svg

Lines changed: 88 additions & 0 deletions

docs/src/devnotes.md

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+Developer's Notes
+=================
+
+These notes are not for end users but rather for developers who are interested
+in the design of the package.
+
+
+`TranscodingStream` type
+------------------------
+
+`TranscodingStream{C,S}` (defined in src/stream.jl) has three fields:
+- `codec`: data codec (`<:C where C<:Codec`)
+- `stream`: data stream (`<:S where S<:IO`)
+- `state`: current state (`<:State`).
+
+Codecs are implemented by package developers; only a special codec, `Noop`, is
+defined in this package. A stream can be any object that implements at least
+`Base.isopen`, `Base.eof`, `Base.close`, `Base.nb_available`,
+`Base.unsafe_read`, and `Base.unsafe_write`. All mutable fields are delegated
+to `state` and hence the stream type itself is immutable.
+
+A stream has two buffers in the `state` field. These are used to store
+pre-transcoded and transcoded data in the stream. The stream passes references
+to these two buffers to the codec when processing data. The following diagram
+illustrates the flow of data:
+
+    When reading data (`state.mode == :read`):
+      user <--- |state.buffer1| <--- <stream.codec> <--- |state.buffer2| <--- stream
+
+    When writing data (`state.mode == :write`):
+      user ---> |state.buffer1| ---> <stream.codec> ---> |state.buffer2| ---> stream
+
+In the read mode, a user pulls data out of `state.buffer1` and pre-transcoded
+data are filled into `state.buffer2`. In the write mode, a user pushes data
+into `state.buffer1` and transcoded data are filled into `state.buffer2`. The
+default buffer size is 16 KiB for each.
+
+`State` (defined in src/state.jl) has five fields:
+- `mode`: current stream mode (`<:Symbol`)
+- `code`: return code of the last codec's method call (`<:Symbol`)
+- `error`: exception returned by the codec (`<:Error`)
+- `buffer1`: data buffer that is closer to the user (`<:Buffer`)
+- `buffer2`: data buffer that is farther from the user (`<:Buffer`)
+
+The `mode` field may be one of the following values:
+- `:idle` : initial and intermediate mode, no buffered data
+- `:read` : ready to read data, data may be buffered
+- `:write`: ready to write data, data may be buffered
+- `:close`: closed, no buffered data
+- `:panic`: an exception has been thrown in codec, data may be buffered but we
+            cannot do anything
+
+The initial mode is `:idle` and mode transition happens as shown in the
+following diagram:
+![Mode transition](./assets/modes.svg)
+
+Mode transitions should happen in the `changemode!(stream, newmode)` function
+in src/stream.jl. Trying an undefined transition will throw an exception.
+
+A transition happens based on actions (or function calls) of the user or the
+return code of the codec. For example, calling `read(stream)` will change the
+mode from `:idle` to `:read` and then calling `close(stream)` will change the
+mode from `:read` to `:close`. When data processing fails in the codec, the
+codec returns `:error` and the stream enters the `:panic` mode.
+
+
+Shared buffers
+--------------
+
+Adjacent transcoding streams may share their buffers. This reduces memory
+allocation and eliminates data copying between buffers.
+
+`readdata!(input::IO, output::Buffer)` and `writedata!(output::IO,
+input::Buffer)` do the actual work of reading/writing data from/to the
+underlying stream. These methods have a special path for shared buffers.
+
+
+`Noop` codec
+------------
+
+`Noop` (`NoopStream`) is a codec that does *nothing*. It works as a buffering
+layer on top of the underlying stream. Since `NoopStream` does not need to have
+two distinct buffers, `buffer1` and `buffer2` in the `State` object are shared
+and some specialized methods are defined for the type. All of these are defined
+in src/noop.jl.
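
Note on docs/src/devnotes.md: the transition rules drawn in modes.dot can be sketched as a lookup table. This is a minimal illustration, not the package's `changemode!`. `ModeState`, `TRANSITIONS`, and `trychangemode!` are hypothetical names, and only the edges listed in modes.dot are modelled.

```julia
# Allowed transitions, transcribed from docs/src/assets/modes.dot.
const TRANSITIONS = Dict(
    :idle  => (:read, :write, :close, :panic),
    :read  => (:read, :close, :panic),
    :write => (:write, :close, :panic),
)

# Hypothetical stand-in for `State`; only the `mode` field is modelled.
mutable struct ModeState
    mode::Symbol
end

# Hypothetical helper: move `state` to `newmode`, or throw if modes.dot
# has no edge from the current mode to `newmode`.
function trychangemode!(state::ModeState, newmode::Symbol)
    allowed = get(TRANSITIONS, state.mode, ())
    if !(newmode in allowed)
        throw(ArgumentError("cannot change the mode from $(state.mode) to $(newmode)"))
    end
    state.mode = newmode
    return state
end
```

With this table, `:idle -> :read -> :close` succeeds, while `:read -> :write` throws an `ArgumentError`. Modes without outgoing edges in modes.dot (`:close`, `:panic`) reject every transition here.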

src/buffer.jl

Lines changed: 47 additions & 46 deletions
@@ -77,16 +77,27 @@ function buffermem(buf::Buffer)
     return Memory(bufferptr(buf), buffersize(buf))
 end
 
+# Notify that `n` bytes are consumed from `buf`.
+function consumed!(buf::Buffer, n::Integer)
+    buf.bufferpos += n
+    return buf
+end
+
+# Notify that `n` bytes are supplied to `buf`.
+function supplied!(buf::Buffer, n::Integer)
+    buf.marginpos += n
+    return buf
+end
+
 function readbyte!(buf::Buffer)
     b = buf.data[buf.bufferpos]
-    buf.bufferpos += 1
+    consumed!(buf, 1)
     return b
 end
 
 function writebyte!(buf::Buffer, b::UInt8)
     buf.data[buf.marginpos] = b
-    buf.marginpos += 1
-    buf.total += 1
+    supplied!(buf, 1)
     return 1
 end
 
@@ -102,6 +113,10 @@ function marginmem(buf::Buffer)
     return Memory(marginptr(buf), marginsize(buf))
 end
 
+function ismarked(buf::Buffer)
+    return buf.markpos != 0
+end
+
 function mark!(buf::Buffer)
     return buf.markpos = buf.bufferpos
 end
@@ -160,11 +175,12 @@ function emptybuffer!(buf::Buffer)
     return buf
 end
 
+# Skip `n` bytes in the buffer.
 function skipbuffer!(buf::Buffer, n::Integer)
     if n > buffersize(buf)
         throw(ArgumentError("too large skip size"))
     end
-    buf.bufferpos += n
+    consumed!(buf, n)
     return buf
 end
 
@@ -176,12 +192,6 @@ function initbuffer!(buf::Buffer)
     return buf
 end
 
-# Copy marked data.
-function copymarked(buf::Buffer)
-    @assert buf.markpos > 0
-    return buf.data[buf.markpos:buf.marginpos-1]
-end
-
 # Take the ownership of the marked data.
 function takemarked!(buf::Buffer)
     @assert buf.markpos > 0
@@ -191,52 +201,43 @@ function takemarked!(buf::Buffer)
     return resize!(buf.data, sz)
 end
 
-# Insert data to the current buffer.
-function insertdata!(buf::Buffer, data::Ptr{UInt8}, nbytes::Integer)
+# Copy data from `data` to `buf`.
+function copydata!(buf::Buffer, data::Ptr{UInt8}, nbytes::Integer)
     makemargin!(buf, nbytes)
-    copy!(buf.data, buf.bufferpos + nbytes, buf.data, buf.bufferpos, buffersize(buf))
-    unsafe_copy!(bufferptr(buf), data, nbytes)
-    buf.marginpos += nbytes
+    unsafe_copy!(marginptr(buf), data, nbytes)
+    supplied!(buf, nbytes)
     return buf
 end
 
-# Read as much data as possbile from `input` to the margin of `output`.
-# This function will not block if `input` has buffered data.
-function readdata!(input::IO, output::Buffer)
-    nread::Int = 0
-    navail = nb_available(input)
-    if navail == 0 && marginsize(output) > 0 && !eof(input)
-        nread += writebyte!(output, read(input, UInt8))
-        navail = nb_available(input)
-    end
-    n = min(navail, marginsize(output))
-    Base.unsafe_read(input, marginptr(output), n)
-    output.marginpos += n
-    nread += n
-    output.total += nread
-    return nread
+# Copy data from `buf` to `data`.
+function copydata!(data::Ptr{UInt8}, buf::Buffer, nbytes::Integer)
+    # NOTE: It's caller's responsibility to ensure that the buffer has at least
+    # nbytes.
+    @assert buffersize(buf) >= nbytes
+    unsafe_copy!(data, bufferptr(buf), nbytes)
+    consumed!(buf, nbytes)
+    return data
 end
 
-# Read data from `buf` to `dst`.
-function readdata!(buf::Buffer, dst::Vector{UInt8}, dpos::Integer, sz::Integer)
-    copy!(dst, dpos, buf.data, buf.bufferpos, sz)
-    buf.bufferpos += sz
-    return dst
-end
-
-# Write all data to `output` from the buffer of `input`.
-function writebuffer!(output::IO, input::Buffer)
-    while buffersize(input) > 0
-        input.bufferpos += Base.unsafe_write(output, bufferptr(input), buffersize(input))
-    end
+# Insert data to the current buffer.
+function insertdata!(buf::Buffer, data::Ptr{UInt8}, nbytes::Integer)
+    makemargin!(buf, nbytes)
+    copy!(buf.data, buf.bufferpos + nbytes, buf.data, buf.bufferpos, buffersize(buf))
+    unsafe_copy!(bufferptr(buf), data, nbytes)
+    supplied!(buf, nbytes)
+    return buf
 end
 
 # Find the first occurrence of a specific byte.
 function findbyte(buf::Buffer, byte::UInt8)
-    ptr = ccall(:memchr, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), pointer(buf.data, buf.bufferpos), byte, buffersize(buf))
-    if ptr == C_NULL
-        return 0
+    p = ccall(
+        :memchr,
+        Ptr{UInt8},
+        (Ptr{UInt8}, Cint, Csize_t),
+        pointer(buf.data, buf.bufferpos), byte, buffersize(buf))
+    if p == C_NULL
+        return marginptr(buf)
     else
-        return Int(ptr - pointer(buf.data, buf.bufferpos)) + buf.bufferpos
+        return p
     end
 end
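
Note on src/buffer.jl: after this commit, position updates go through `consumed!` and `supplied!`. The sketch below shows that bookkeeping on a stand-in type. `MiniBuffer` is hypothetical, and the definitions of `buffersize` and `marginsize` are assumptions, since the diff does not show them.

```julia
# Hypothetical miniature of `Buffer`. Field names follow the diff; treating
# [bufferpos, marginpos) as the unread data is an assumption.
mutable struct MiniBuffer
    data::Vector{UInt8}
    bufferpos::Int  # index of the next byte to read
    marginpos::Int  # index of the next byte to write
end

MiniBuffer(size::Integer) = MiniBuffer(zeros(UInt8, size), 1, 1)

# Assumed definitions: bytes waiting to be read, and free space left.
buffersize(buf::MiniBuffer) = buf.marginpos - buf.bufferpos
marginsize(buf::MiniBuffer) = length(buf.data) - buf.marginpos + 1

# Notify that `n` bytes are consumed from `buf`.
consumed!(buf::MiniBuffer, n::Integer) = (buf.bufferpos += n; buf)

# Notify that `n` bytes are supplied to `buf`.
supplied!(buf::MiniBuffer, n::Integer) = (buf.marginpos += n; buf)

function writebyte!(buf::MiniBuffer, b::UInt8)
    buf.data[buf.marginpos] = b
    supplied!(buf, 1)
    return 1
end

function readbyte!(buf::MiniBuffer)
    b = buf.data[buf.bufferpos]
    consumed!(buf, 1)
    return b
end
```

Writing two bytes into a four-byte `MiniBuffer` leaves both `buffersize` and `marginsize` at 2. Reading one byte back drops `buffersize` to 1 without changing `marginsize`.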
