Skip to content

Commit c65962c

Browse files
committed
Merge pull request #415 from ruby-concurrency/new-channel
New channel implementation
2 parents df5ebec + f7e3672 commit c65962c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3566
-1097
lines changed

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,11 @@ be obeyed though. Features developed in `concurrent-ruby-edge` are expected to m
130130
`Promise`, `IVar`, `Event`, `dataflow`, `Delay`, and `TimerTask` into a single framework. It extensively uses the
131131
new synchronization layer to make all the features **non-blocking** and **lock-free**, with the exception of obviously blocking
132132
operations like `#wait`, `#value`. It also offers better performance.
133-
* [Channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Channel.html):
134-
Communicating Sequential Processes (CSP).
133+
* New [channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Edge/Channel.html):
134+
Functionally equivalent to Go [channels](https://tour.golang.org/concurrency/2) with additional
135+
inspiration from Clojure [core.async](https://clojure.github.io/core.async/).
136+
* Old [channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Channel.html):
137+
Communicating Sequential Processes ([CSP](https://en.wikipedia.org/wiki/Communicating_sequential_processes)).
135138
* [LazyRegister](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/LazyRegister.html)
136139
* [AtomicMarkableReference](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Edge/AtomicMarkableReference.html)
137140
* [LockFreeLinkedSet](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Edge/LockFreeLinkedSet.html)
@@ -143,7 +146,8 @@ be obeyed though. Features developed in `concurrent-ruby-edge` are expected to m
143146

144147
- **Actor** - Partial documentation and tests; depends on new future/promise framework; stability is good.
145148
- **Future/Promise Framework** - API changes; partial documentation and tests; stability good.
146-
- **Channel** - Missing documentation; limited features; stability good.
149+
- **New channel** - Missing documentation; very new; stability good.
150+
- **Old channel** - Deprecated; missing documentation; limited features.
147151
- **LazyRegister** - Missing documentation and tests.
148152
- **AtomicMarkableReference, LockFreeLinkedSet, LockFreeStack** - Need real world battle testing
149153

doc/channel.md

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
# Channels and Goroutines
2+
3+
Channels, popularized by the [Go programming language](https://golang.org/doc/effective_go.html#channels), are a modern variation of [communicating sequential processes (CSP)](https://en.wikipedia.org/wiki/Communicating_sequential_processes). CSP was first proposed by C. A. R. Hoare in 1978. The Go philosophy on concurrency is:
4+
5+
> Do not communicate by sharing memory; instead, share memory by communicating.
6+
7+
As [Rob Pike](https://en.wikipedia.org/wiki/Rob_Pike) eloquently explains in his [Concurrency Is Not Parallelism](https://vimeo.com/49718712) conference talk, concurrency is the "composition of independently executing things." Combining these two ideas, channels are a queue-like mechanism that can be used to communicate between independently executing things.
8+
9+
The channel implementation in this library was highly influenced by Go, but also incorporates ideas from Clojure's [core.async](https://clojure.github.io/core.async/) library. Runtime differences aside, this channel library is functionally equivalent to Go and even includes a few features Go does not.
10+
11+
### Example Programs
12+
13+
Every code example in the channel chapters of both [A Tour of Go](https://tour.golang.org/welcome/1) and [Go By Example](https://gobyexample.com/) has been reproduced in Ruby. The code can be found in the [examples](https://github.com/ruby-concurrency/concurrent-ruby/tree/master/examples) directory of the source repository. Many of those examples appear in the documentation below, but many do not. They are a valuable resource for learning how to use channels.
14+
15+
### Additional Resources
16+
17+
* "A Tour of Go" [channels exercises](https://tour.golang.org/concurrency/2)
18+
* "Go By Example" [channels exercises](https://gobyexample.com/channels)
19+
* "Effective Go" [concurrency chapters](https://golang.org/doc/effective_go.html#concurrency)
20+
* "Concurrency Is Not Parallelims" [conference presentation](https://vimeo.com/49718712) by Rob Pike, principal designer of Go
21+
* "Clojure core.async Channels" [blog post](http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html) by Rich Hickey, inventor of Clojure
22+
* Clojure core.async [API reference](https://clojure.github.io/core.async/)
23+
24+
## Goroutines
25+
26+
The Go programming languages uses "goroutines" as the core concurrency mechanism. A goroutine is little more than an independently executing function, multiplexed with all other goroutines onto a thread pool managed by the runtime. Ruby has a very different runtime so true goroutines are not possible. Instread, a {.go} method is provided for running a block asynchronously, multiplexed onto a special thread pool reserved just for `Channel` operations. This is similar to what Clojure does with the `go` function from the [core.async](https://clojure.github.io/core.async/#clojure.core.async/go) library.
27+
28+
```ruby
29+
puts "Main thread: #{Thread.current}"
30+
31+
Concurrent::Channel.go do
32+
puts "Goroutine thread: #{Thread.current}"
33+
end
34+
35+
# Main thread: #<Thread:0x007fcb4c8bc3f0>
36+
# Goroutine thread: #<Thread:0x007fcb4c21f4e8>
37+
```
38+
39+
Although it is possible to use `Channel.go` independent of channels or to use channels with other asynchronous processing tools (such as `Future` and `Actor`), mixing the tools is not advised. Most high-level asynchronous processing tools already have a queue-like mechanism built in. Adding channels to the mix may indicate a design flaw. Conversely, `Channel.go` provides no mechanism for coordination and communication. That's what channels are for. Additionally, strictly using `Channel.go` along with channels provides an opportunity for future optimizations, such as Clojure's inversion of control (IOC) threads.
40+
41+
## Channel Basics
42+
43+
Channels are "pipes" through which values can be sent. They are thread safe and naturally concurrent. When shared between goroutines they provide a communication mechanism for coordinating asynchronous actions.
44+
45+
The core channel operations are {#put} and {#take} (aliased as {#send} and {#receive}, respectively). The former function inserts a value into a channel where the latter removes a value. By default these are blocking operations. A call to `put` will block until the channel is ready to receive the value. Similarly, a call to `take` will block until a value is available to be removed.
46+
47+
The following, simple example creates a channel, launches a goroutine from which a value is placed into the channel, then reads that value from the channel. When run this example will display "ping" in the console.
48+
49+
```ruby
50+
messages = Concurrent::Channel.new
51+
52+
Concurrent::Channel.go do
53+
messages.put 'ping'
54+
end
55+
56+
msg = messages.take
57+
puts msg
58+
```
59+
60+
By default, channels are *unbuffered*, meaning that they have a size of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.
61+
62+
The following, slightly more complex example, concurrently sums two different halves of a list then combines the results. It uses an unbuffered channel to pass the results from the two goroutines back to the main thread. The main thread blocks on the two `take` calls until the worker goroutines are done. This example also uses the convenience aliases {#<<} and {#~}. Since channels in Go are part of the language, channel operations are performed using special channel operators rather than functions. These operators help clearly indicate that channel operations are being performed. The operator overloads `<<` for `put` and `~` for `take` help reinforce this idea in Ruby.
63+
64+
```ruby
65+
def sum(a, c)
66+
sum = a.reduce(0, &:+)
67+
c << sum # `<<` is an alias for `put` or `send`
68+
end
69+
70+
a = [7, 2, 8, -9, 4, 0]
71+
l = a.length / 2
72+
c = Concurrent::Channel.new
73+
74+
Concurrent::Channel.go { sum(a[-l, l], c) }
75+
Concurrent::Channel.go { sum(a[0, l], c) }
76+
x, y = ~c, ~c # `~` is an alias for `take` or `receive`
77+
78+
puts [x, y, x+y].join(' ')
79+
```
80+
81+
## Channel Buffering
82+
83+
One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:size` option on channel creation:
84+
85+
The following example creates a buffered channel with two slots. It then makes two `put` calls, adding values to the channel. These calls do not block because the buffer has room. Were a third `put` call to be made before an `take` calls, the third `put` would block.
86+
87+
```ruby
88+
ch = Concurrent::Channel.new(size: 2)
89+
ch << 1
90+
ch << 2
91+
92+
puts ~ch
93+
puts ~ch
94+
```
95+
96+
## Channel Synchronization
97+
98+
The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `size: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.
99+
100+
```ruby
101+
def worker(done_channel)
102+
print "working...\n"
103+
sleep(1)
104+
print "done\n"
105+
106+
done_channel << true
107+
end
108+
109+
done = Concurrent::Channel.new(size: 1)
110+
Concurrent::Channel.go{ worker(done) }
111+
112+
~done # block until signaled
113+
```
114+
115+
## Multichannel Select
116+
117+
Often it is necessary for a single thread to operate on more than one channel. The {.select} method facilitates multivariate channel operations. The `select` method takes a block and passes through a special "selector" object as the first block parameter. The selector can then be used to specify various channel operations. The `select` call will block until one of the operations occurs. If a block is provided for the triggered clause (required for some clauses, optional for others) the block will then be called. Finally, the `select` call will immediately exit, guaranteeing that only one of the select clauses will trigger.
118+
119+
The following example spawns two goroutines, each of which goes to sleep before putting a value onto a channel. The main thread loops twice over a `select` and, in each loop, takes a value off of whichever channel returns one first.
120+
121+
```ruby
122+
c1 = Concurrent::Channel.new
123+
c2 = Concurrent::Channel.new
124+
125+
Concurrent::Channel.go do
126+
sleep(1)
127+
c1 << 'one'
128+
end
129+
130+
Concurrent::Channel.go do
131+
sleep(2)
132+
c1 << 'two'
133+
end
134+
135+
2.times do
136+
Concurrent::Channel.select do |s|
137+
s.take(c1) { |msg| print "received #{msg}\n" }
138+
s.take(c2) { |msg| print "received #{msg}\n" }
139+
end
140+
end
141+
```
142+
143+
The output from the above example is:
144+
145+
```
146+
received one
147+
received two
148+
```
149+
150+
The next example calculates the first 10 fibonacci numbers, passing them to the main thread via a channel. The fibonacci function puts each calculated value onto a channel while simultaneously listening to a different channel for the signal to stop. This example uses the `case` method on the selector object. This is just a convenience method for `put` and `take`, allowing the Ruby code to look more like Go.
151+
152+
```ruby
153+
def fibonacci(c, quit)
154+
x, y = 0, 1
155+
loop do
156+
Concurrent::Channel.select do |s|
157+
s.case(c, :<<, x) { x, y = y, x+y; x } # alias for `s.put`
158+
s.case(quit, :~) do # alias for `s.take`
159+
puts 'quit'
160+
return
161+
end
162+
end
163+
end
164+
end
165+
166+
c = Concurrent::Channel.new
167+
quit = Concurrent::Channel.new
168+
169+
Concurrent::Channel.go do
170+
10.times { puts ~c }
171+
quit << 0
172+
end
173+
174+
fibonacci(c, quit)
175+
```
176+
177+
## Closing and Iterating Over Channels
178+
179+
Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the {#close} method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations.
180+
181+
The `Channel` class implements an {#each} method which can be used to retrieve successive values from the channel. The `each` method is a blocking method. When the channel is open and there are no values in the buffer, `each` will block until a new item is `put`. The `each` method will not exit until the channel is closed.
182+
183+
The following example launches a goroutine which calculates several fibonacci values and puts them into a channel. The main thread uses the `each` method to retrieve all the values successively and display them in the console. Once the fibonacci goroutine is done it closes the channel which subsequently causes the `each` iteration to end, unblocking the main thread.
184+
185+
```ruby
186+
def fibonacci(n, c)
187+
x, y = 0, 1
188+
(1..n).each do
189+
c << x
190+
x, y = y, x+y
191+
end
192+
c.close
193+
end
194+
195+
chan = Concurrent::Channel.new(size: 10)
196+
Concurrent::Channel.go { fibonacci(chan.capacity, c) }
197+
chan.each { |i| puts i }
198+
```
199+
200+
`Channel` also includes Ruby's [Enumerable](http://ruby-doc.org/core-2.2.3/Enumerable.html) mixin, allowing for a wide range of list comprehensions. Since the `Enumerable` methods iterate over the entire set of objects they can only complete once the channel is closed. Calling a method from `Enumerable` on an open channel will cause the method to block until the channel is closed.
201+
202+
## Timers and Tickers
203+
204+
A {.timer} is a specialized channel which triggers at a predefined time, specified as a number of seconds in the future. It is similar in concept to a {Concurrent::ScheduledTask} but operates as a channel and can fully participate in all channel operations.
205+
206+
The following code example creates two timers with different delay values. The first timer is allowed to expire (trigger) by having the main thread perform a `take` on it. When the timer expires it puts a {Concurrent::Channel::Tick} object into its buffer and closes. The second timer is listened to on a goroutine but the it never expires: the main thread stops (closes) the timer before it expires. Note that the goroutine in this example blocks forever and never exits. Since the timer is closed it never puts the `Tick` into its buffer.
207+
208+
```ruby
209+
timer1 = Concurrent::Channel.timer(2)
210+
211+
~timer1
212+
puts 'Timer 1 expired'
213+
214+
timer2 = Concurrent::Channel.timer(1)
215+
Concurrent::Channel.go do
216+
~timer2
217+
print "Timer 2 expired\n"
218+
end
219+
220+
stop2 = timer2.stop # alias for `close`
221+
print "Timer 2 stopped\n" if stop2
222+
```
223+
224+
A {.ticker} is a specialized channel which triggers over and over again at a predefined interval, specified as a number of seconds between ticks. It is similar in concept to a {Concurrent::TimerTask} but operates as a channel and can fully participate in all channel operations.
225+
226+
The following example creates a ticker which triggers every half-second. A goroutine iterates over the ticker using the `each` method, printing the tick at every interval. When the main thread stops (closes) the ticker the `each` call ends and the goroutine exits.
227+
228+
```ruby
229+
ticker = Concurrent::Channel.ticker(0.5)
230+
Concurrent::Channel.go do
231+
ticker.each do |tick|
232+
print "Tick at #{tick}\n"
233+
end
234+
end
235+
236+
sleep(1.6)
237+
ticker.stop # alias for `close`
238+
print "Ticker stopped\n"
239+
```
240+
241+
## Default Selection
242+
243+
As with a Ruby `case` statement, a `Channel.select` statement will accept a `default` clause which will trigger if none of the other clauses trigger. Not surprisingly, the `default` clause must be the last clause in a `select` block.
244+
245+
```ruby
246+
tick = Concurrent::Channel.tick(0.1) # alias for `ticker`
247+
boom = Concurrent::Channel.after(0.5) # alias for `timer`
248+
249+
loop do
250+
Concurrent::Channel.select do |s|
251+
s.take(tick) { print "tick.\n" }
252+
s.take(boom) do
253+
print "BOOM!\n"
254+
exit
255+
end
256+
s.default do
257+
print " .\n"
258+
sleep(0.05)
259+
end
260+
end
261+
end
262+
```
263+
264+
The output of this code example is:
265+
266+
```
267+
.
268+
.
269+
tick.
270+
.
271+
.
272+
tick.
273+
.
274+
.
275+
tick.
276+
.
277+
.
278+
tick.
279+
.
280+
.
281+
tick.
282+
BOOM!
283+
```
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../../lib', __FILE__)
4+
require 'concurrent-edge'
5+
Channel = Concurrent::Channel
6+
7+
## A Tour of Go: Buffered Channels
8+
# https://tour.golang.org/concurrency/3
9+
10+
ch = Channel.new(size: 2)
11+
ch << 1
12+
ch << 2
13+
14+
puts ~ch
15+
puts ~ch
16+
17+
expected = <<-STDOUT
18+
1
19+
2
20+
STDOUT
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../../lib', __FILE__)
4+
require 'concurrent-edge'
5+
Channel = Concurrent::Channel
6+
7+
## A Tour of Go: Channels
8+
# https://tour.golang.org/concurrency/2
9+
10+
def sum(a, c)
11+
sum = a.reduce(0, &:+)
12+
c << sum # `<<` is an alias for `put` or `send`
13+
end
14+
15+
a = [7, 2, 8, -9, 4, 0]
16+
l = a.length / 2
17+
c = Channel.new
18+
19+
Channel.go { sum(a[-l, l], c) }
20+
Channel.go { sum(a[0, l], c) }
21+
x, y = ~c, ~c # `~` is an alias for `take` or `receive`
22+
23+
puts [x, y, x+y].join(' ')
24+
25+
expected = <<-STDOUT
26+
-5 17 12
27+
STDOUT

0 commit comments

Comments
 (0)