11"""
2- A buffer that drops all elements put into it. Only to be used as the output
3- buffer for a task - will throw if attached as an input.
2+ A buffer that drops all elements put into it.
43"""
5- struct DropBuffer{T} end
4+ mutable struct DropBuffer{T}
5+ open:: Bool
6+ DropBuffer{T}() where T = new{T}(true )
7+ end
68DropBuffer{T}(_) where T = DropBuffer{T}()
79Base. isempty(:: DropBuffer ) = true
810isfull(:: DropBuffer ) = false
9- Base. put!(:: DropBuffer , _) = nothing
10- Base. take!(:: DropBuffer ) = error(" Cannot `take!` from a DropBuffer" )
11-
12- " A process-local buffer backed by a `Channel{T}`."
13- struct ChannelBuffer{T}
14- channel:: Channel{T}
15- len:: Int
16- count:: Threads.Atomic{Int}
17- ChannelBuffer{T}(len:: Int = 1024 ) where T =
18- new{T}(Channel{T}(len), len, Threads. Atomic{Int}(0 ))
19- end
20- Base. isempty(cb:: ChannelBuffer ) = isempty(cb. channel)
21- isfull(cb:: ChannelBuffer ) = cb. count[] == cb. len
22- function Base. put!(cb:: ChannelBuffer{T} , x) where T
23- put!(cb. channel, convert(T, x))
24- Threads. atomic_add!(cb. count, 1 )
25- end
26- function Base. take!(cb:: ChannelBuffer )
27- take!(cb. channel)
28- Threads. atomic_sub!(cb. count, 1 )
29- end
30-
31- " A cross-worker buffer backed by a `RemoteChannel{T}`."
32- struct RemoteChannelBuffer{T}
33- channel:: RemoteChannel{Channel{T}}
34- len:: Int
35- count:: Threads.Atomic{Int}
36- RemoteChannelBuffer{T}(len:: Int = 1024 ) where T =
37- new{T}(RemoteChannel(()-> Channel{T}(len)), len, Threads. Atomic{Int}(0 ))
38- end
39- Base. isempty(cb:: RemoteChannelBuffer ) = isempty(cb. channel)
40- isfull(cb:: RemoteChannelBuffer ) = cb. count[] == cb. len
41- function Base. put!(cb:: RemoteChannelBuffer{T} , x) where T
42- put!(cb. channel, convert(T, x))
43- Threads. atomic_add!(cb. count, 1 )
44- end
45- function Base. take!(cb:: RemoteChannelBuffer )
46- take!(cb. channel)
47- Threads. atomic_sub!(cb. count, 1 )
11+ capacity(:: DropBuffer ) = typemax(Int)
12+ Base. length(:: DropBuffer ) = 0
13+ Base. isopen(buf:: DropBuffer ) = buf. open
14+ function Base. close(buf:: DropBuffer )
15+ buf. open = false
16+ end
17+ function Base. put!(buf:: DropBuffer , _)
18+ if ! isopen(buf)
19+ throw(InvalidStateException(" DropBuffer is closed" , :closed))
20+ end
21+ task_may_cancel!(; must_force= true )
22+ yield()
23+ return
24+ end
25+ function Base. take!(buf:: DropBuffer )
26+ while true
27+ if ! isopen(buf)
28+ throw(InvalidStateException(" DropBuffer is closed" , :closed))
29+ end
30+ task_may_cancel!(; must_force= true )
31+ yield()
32+ end
4833end
4934
5035" A process-local ring buffer."
@@ -53,40 +38,45 @@ mutable struct ProcessRingBuffer{T}
5338 write_idx:: Int
5439 @atomic count:: Int
5540 buffer:: Vector{T}
56- open:: Bool
41+ @atomic open:: Bool
5742 function ProcessRingBuffer{T}(len:: Int = 1024 ) where T
5843 buffer = Vector{T}(undef, len)
5944 return new{T}(1 , 1 , 0 , buffer, true )
6045 end
6146end
6247Base. isempty(rb:: ProcessRingBuffer ) = (@atomic rb. count) == 0
6348isfull(rb:: ProcessRingBuffer ) = (@atomic rb. count) == length(rb. buffer)
49+ capacity(rb:: ProcessRingBuffer ) = length(rb. buffer)
6450Base. length(rb:: ProcessRingBuffer ) = @atomic rb. count
65- Base. isopen(rb:: ProcessRingBuffer ) = rb. open
51+ Base. isopen(rb:: ProcessRingBuffer ) = @atomic rb. open
6652function Base. close(rb:: ProcessRingBuffer )
67- rb. open = false
53+ @atomic rb. open = false
6854end
6955function Base. put!(rb:: ProcessRingBuffer{T} , x) where T
70- len = length(rb. buffer)
71- while (@atomic rb. count) == len
56+ while isfull(rb)
7257 yield()
7358 if ! isopen(rb)
74- throw(InvalidStateException(" Stream is closed" , :closed))
59+ throw(InvalidStateException(" ProcessRingBuffer is closed" , :closed))
7560 end
76- task_may_cancel!()
61+ task_may_cancel!(; must_force = true )
7762 end
78- to_write_idx = mod1(rb. write_idx, len )
63+ to_write_idx = mod1(rb. write_idx, length(rb . buffer) )
7964 rb. buffer[to_write_idx] = convert(T, x)
8065 rb. write_idx += 1
8166 @atomic rb. count += 1
8267end
8368function Base. take!(rb:: ProcessRingBuffer )
84- while (@atomic rb . count) == 0
69+ while isempty(rb)
8570 yield()
86- if ! isopen(rb)
87- throw(InvalidStateException(" Stream is closed" , :closed))
71+ if ! isopen(rb) && isempty(rb)
72+ throw(InvalidStateException(" ProcessRingBuffer is closed" , :closed))
8873 end
89- task_may_cancel!()
74+ if task_cancelled() && isempty(rb)
75+ # We respect a graceful cancellation only if the buffer is empty.
76+ # Otherwise, we may have values to continue communicating.
77+ task_may_cancel!()
78+ end
79+ task_may_cancel!(; must_force= true )
9080 end
9181 to_read_idx = rb. read_idx
9282 rb. read_idx += 1
@@ -106,123 +96,3 @@ function collect!(rb::ProcessRingBuffer{T}) where T
10696
10797 return output
10898end
109-
110- #= TODO
111- "A server-local ring buffer backed by shared-memory."
112- mutable struct ServerRingBuffer{T}
113- read_idx::Int
114- write_idx::Int
115- @atomic count::Int
116- buffer::Vector{T}
117- function ServerRingBuffer{T}(len::Int=1024) where T
118- buffer = Vector{T}(undef, len)
119- return new{T}(1, 1, 0, buffer)
120- end
121- end
122- Base.isempty(rb::ServerRingBuffer) = (@atomic rb.count) == 0
123- function Base.put!(rb::ServerRingBuffer{T}, x) where T
124- len = length(rb.buffer)
125- while (@atomic rb.count) == len
126- yield()
127- end
128- to_write_idx = mod1(rb.write_idx, len)
129- rb.buffer[to_write_idx] = convert(T, x)
130- rb.write_idx += 1
131- @atomic rb.count += 1
132- end
133- function Base.take!(rb::ServerRingBuffer)
134- while (@atomic rb.count) == 0
135- yield()
136- end
137- to_read_idx = rb.read_idx
138- rb.read_idx += 1
139- @atomic rb.count -= 1
140- to_read_idx = mod1(to_read_idx, length(rb.buffer))
141- return rb.buffer[to_read_idx]
142- end
143- =#
144-
145- #=
146- "A TCP-based ring buffer."
147- mutable struct TCPRingBuffer{T}
148- read_idx::Int
149- write_idx::Int
150- @atomic count::Int
151- buffer::Vector{T}
152- function TCPRingBuffer{T}(len::Int=1024) where T
153- buffer = Vector{T}(undef, len)
154- return new{T}(1, 1, 0, buffer)
155- end
156- end
157- Base.isempty(rb::TCPRingBuffer) = (@atomic rb.count) == 0
158- function Base.put!(rb::TCPRingBuffer{T}, x) where T
159- len = length(rb.buffer)
160- while (@atomic rb.count) == len
161- yield()
162- end
163- to_write_idx = mod1(rb.write_idx, len)
164- rb.buffer[to_write_idx] = convert(T, x)
165- rb.write_idx += 1
166- @atomic rb.count += 1
167- end
168- function Base.take!(rb::TCPRingBuffer)
169- while (@atomic rb.count) == 0
170- yield()
171- end
172- to_read_idx = rb.read_idx
173- rb.read_idx += 1
174- @atomic rb.count -= 1
175- to_read_idx = mod1(to_read_idx, length(rb.buffer))
176- return rb.buffer[to_read_idx]
177- end
178- =#
179-
180- #=
181- """
182- A flexible puller which switches to the most efficient buffer type based
183- on the sender and receiver locations.
184- """
185- mutable struct UniBuffer{T}
186- buffer::Union{ProcessRingBuffer{T}, Nothing}
187- end
188- function initialize_stream_buffer!(::Type{UniBuffer{T}}, T, send_proc, recv_proc, buffer_amount) where T
189- if buffer_amount == 0
190- error("Return NullBuffer")
191- end
192- send_osproc = get_parent(send_proc)
193- recv_osproc = get_parent(recv_proc)
194- if send_osproc.pid == recv_osproc.pid
195- inner = RingBuffer{T}(buffer_amount)
196- elseif system_uuid(send_osproc.pid) == system_uuid(recv_osproc.pid)
197- inner = ProcessBuffer{T}(buffer_amount)
198- else
199- inner = RemoteBuffer{T}(buffer_amount)
200- end
201- return UniBuffer{T}(buffer_amount)
202- end
203-
204- struct LocalPuller{T,B}
205- buffer::B{T}
206- id::UInt
207- function LocalPuller{T,B}(id::UInt, buffer_amount::Integer) where {T,B}
208- buffer = initialize_stream_buffer!(B, T, buffer_amount)
209- return new{T,B}(buffer, id)
210- end
211- end
212- function Base.take!(pull::LocalPuller{T,B}) where {T,B}
213- if pull.buffer === nothing
214- pull.buffer =
215- error("Return NullBuffer")
216- end
217- value = take!(pull.buffer)
218- end
219- function initialize_input_stream!(stream::Stream{T,B}, id::UInt, send_proc::Processor, recv_proc::Processor, buffer_amount::Integer) where {T,B}
220- local_buffer = remotecall_fetch(stream.ref.handle.owner, stream.ref.handle, id) do ref, id
221- local_buffer, remote_buffer = initialize_stream_buffer!(B, T, send_proc, recv_proc, buffer_amount)
222- ref.buffers[id] = remote_buffer
223- return local_buffer
224- end
225- stream.buffer = local_buffer
226- return stream
227- end
228- =#
0 commit comments