Skip to content

Commit 9c5b03a

Browse files
committed
Merge pull request #433 from ruby-concurrency/channel-refactor
Fixed Channel/Buffer size and capacity methods.
2 parents b17d3bd + 4043e35 commit 9c5b03a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1209
-447
lines changed

doc/channel.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ msg = messages.take
5757
puts msg
5858
```
5959

60-
By default, channels are *unbuffered*, meaning that they have a size of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.
60+
By default, channels are *unbuffered*, meaning that they have a capacity of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.
6161

6262
The following, slightly more complex example, concurrently sums two different halves of a list then combines the results. It uses an unbuffered channel to pass the results from the two goroutines back to the main thread. The main thread blocks on the two `take` calls until the worker goroutines are done. This example also uses the convenience aliases {#<<} and {#~}. Since channels in Go are part of the language, channel operations are performed using special channel operators rather than functions. These operators help clearly indicate that channel operations are being performed. The operator overloads `<<` for `put` and `~` for `take` help reinforce this idea in Ruby.
6363

@@ -80,12 +80,12 @@ puts [x, y, x+y].join(' ')
8080

8181
## Channel Buffering
8282

83-
One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:size` option on channel creation:
83+
One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:capacity` option on channel creation:
8484

8585
The following example creates a buffered channel with two slots. It then makes two `put` calls, adding values to the channel. These calls do not block because the buffer has room. Were a third `put` call to be made before an `take` calls, the third `put` would block.
8686

8787
```ruby
88-
ch = Concurrent::Channel.new(size: 2)
88+
ch = Concurrent::Channel.new(capacity: 2)
8989
ch << 1
9090
ch << 2
9191

@@ -95,7 +95,7 @@ puts ~ch
9595

9696
## Channel Synchronization
9797

98-
The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `size: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.
98+
The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `capacity: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.
9999

100100
```ruby
101101
def worker(done_channel)
@@ -106,7 +106,7 @@ def worker(done_channel)
106106
done_channel << true
107107
end
108108

109-
done = Concurrent::Channel.new(size: 1)
109+
done = Concurrent::Channel.new(capacity: 1)
110110
Concurrent::Channel.go{ worker(done) }
111111

112112
~done # block until signaled
@@ -176,7 +176,7 @@ fibonacci(c, quit)
176176

177177
## Closing and Iterating Over Channels
178178

179-
Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the {#close} method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations.
179+
Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the `#close` method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations.
180180

181181
The `Channel` class implements an {#each} method which can be used to retrieve successive values from the channel. The `each` method is a blocking method. When the channel is open and there are no values in the buffer, `each` will block until a new item is `put`. The `each` method will not exit until the channel is closed.
182182

@@ -192,7 +192,7 @@ def fibonacci(n, c)
192192
c.close
193193
end
194194

195-
chan = Concurrent::Channel.new(size: 10)
195+
chan = Concurrent::Channel.new(capacity: 10)
196196
Concurrent::Channel.go { fibonacci(chan.capacity, c) }
197197
chan.each { |i| puts i }
198198
```

doc/synchronization.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
Provides common parent for all objects which need to be synchronized or be using other synchronization tools. It provides:
88

99
- Synchronized block
10-
- Methods for waiting and signaling
10+
- Methods for waiting and signaling
1111
- Volatile fields
1212
- Ensure visibility of final fields
1313
- Fields with CAS operations
@@ -49,7 +49,7 @@ private
4949
def ns_compute
5050
ns_compute_reduce ns_compute_map
5151
end
52-
```
52+
```
5353
where `compute` defines how is it synchronized and `ns_compute` handles the behavior (in this case the computation). `ns_` methods should only call other `ns_` methods or `pr_` methods. They can call normal methods on other objects, but that should be done with care (better to avoid) because the thread escapes this object while the lock is still held, which can lead to deadlock. That's why the `report` method is called in `compute` and not in `ns_compute`.
5454

5555
`pr_` methods are pure functions they can be used in and outside of synchronized blocks.
@@ -60,16 +60,16 @@ Sometimes while already inside the synchronized block some condition is not met.
6060

6161
To fulfill these needs there are private methods:
6262

63-
- `ns_wait` {include:Concurrent::Synchronization::AbstractObject#ns_wait}
64-
- `ns_wait_until` {include:Concurrent::Synchronization::AbstractObject#ns_wait_until}
65-
- `ns_signal` {include:Concurrent::Synchronization::AbstractObject#ns_signal}
66-
- `ns_broadcast` {include:Concurrent::Synchronization::AbstractObject#ns_broadcast}
63+
- `ns_wait` {include:Concurrent::Synchronization::AbstractLockableObject#ns_wait}
64+
- `ns_wait_until` {include:Concurrent::Synchronization::AbstractLockableObject#ns_wait_until}
65+
- `ns_signal` {include:Concurrent::Synchronization::AbstractLockableObject#ns_signal}
66+
- `ns_broadcast` {include:Concurrent::Synchronization::AbstractLockableObject#ns_broadcast}
6767

6868
All methods have to be called inside synchronized block.
6969

7070
## Volatile fields
7171

72-
`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field.
72+
`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field.
7373

7474
## Ensure visibility of final fields
7575

@@ -83,7 +83,7 @@ class AbstractPromise < Synchronization::Object
8383
ensure_ivar_visibility!
8484
end
8585
# ...
86-
end
86+
end
8787
```
8888

8989
### Naming conventions
@@ -112,10 +112,10 @@ class Event < Synchronization::Object
112112
self
113113
end
114114
# ...
115-
end
115+
end
116116
```
117117

118-
Operations on `@Touched` field have volatile semantic.
118+
Operations on `@Touched` field have volatile semantic.
119119

120120
## Memory model
121121

@@ -125,7 +125,7 @@ When writing libraries in `concurrent-ruby` we are reasoning based on following
125125

126126
The memory model is constructed based on our best effort and knowledge of the 3 main Ruby implementations (CRuby, JRuby, Rubinius). When considering certain aspect we always choose the weakest guarantee (e.g. local variable updates are always visible in CRuby but not in JRuby, so in this case JRuby behavior is picked). If some Ruby behavior is omitted here it is considered unsafe for use in parallel environment (Reasons may be lack of information, or difficulty of verification).
127127

128-
This takes in account following implementations:
128+
This takes in account following implementations:
129129

130130
- CRuby 1.9 - 2.2 (no differences found)
131131
- JRuby 1.7
@@ -139,10 +139,10 @@ We are interested in following behaviors:
139139

140140
### Variables
141141

142-
- **Local variables** - atomic assignment (only Integer and Object), non-volatile.
142+
- **Local variables** - atomic assignment (only Integer and Object), non-volatile.
143143
- Consequence: a lambda defined on `thread1` executing on `thread2` may not see updated values in local variables captured in its closure.
144144
- Reason: local variables are non-volatile on Jruby and Rubinius.
145-
- **Instance variables** - atomic assignment (only Integer and Object), non-volatile.
145+
- **Instance variables** - atomic assignment (only Integer and Object), non-volatile.
146146
- Consequence: Different thread may see old values; different thread may see not fully-initialized object.
147147
- Reason: local variables are non-volatile on Jruby and Rubinius.
148148
- **Constants** - atomic assignment, volatile.

examples/a-tour-of-go-channels/buffered-channels.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@
55
Channel = Concurrent::Channel
66

77
## A Tour of Go: Buffered Channels
8-
# https://tour.golang.org/concurrency/3
8+
# https://tour.golang.org/concurrency/3
99

10-
ch = Channel.new(size: 2)
10+
ch = Channel.new(capacity: 2)
1111
ch << 1
1212
ch << 2
1313

1414
puts ~ch
1515
puts ~ch
1616

17-
expected = <<-STDOUT
17+
__END__
1818
1
1919
2
20-
STDOUT

examples/a-tour-of-go-channels/channels.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
Channel = Concurrent::Channel
66

77
## A Tour of Go: Channels
8-
# https://tour.golang.org/concurrency/2
8+
# https://tour.golang.org/concurrency/2
99

1010
def sum(a, c)
1111
sum = a.reduce(0, &:+)
@@ -22,6 +22,5 @@ def sum(a, c)
2222

2323
puts [x, y, x+y].join(' ')
2424

25-
expected = <<-STDOUT
25+
__END__
2626
-5 17 12
27-
STDOUT

examples/a-tour-of-go-channels/default-selection.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
loop do
1414
Channel.select do |s|
15-
s.take(tick) { print "tick.\n" }
15+
s.take(tick) { |t| print "tick.\n" if t }
1616
s.take(boom) do
1717
print "BOOM!\n"
1818
exit
@@ -24,7 +24,7 @@
2424
end
2525
end
2626

27-
expected = <<-STDOUT
27+
__END__
2828
.
2929
.
3030
tick.
@@ -41,4 +41,3 @@
4141
.
4242
tick.
4343
BOOM!
44-
STDOUT

examples/a-tour-of-go-channels/equivalent-binary-trees.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ def same(t1, t2)
6565
puts same(new_tree(1), new_tree(1))
6666
puts same(new_tree(1), new_tree(2))
6767

68-
expected = <<-STDOUT
68+
__END__
6969
true
7070
false
71-
STDOUT

examples/a-tour-of-go-channels/range-and-close.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
Channel = Concurrent::Channel
66

77
## A Tour of Go: Range and Close
8-
# https://tour.golang.org/concurrency/4
8+
# https://tour.golang.org/concurrency/4
99

1010
def fibonacci(n, c)
1111
x, y = 0, 1
@@ -16,11 +16,11 @@ def fibonacci(n, c)
1616
c.close
1717
end
1818

19-
c = Channel.new(size: 10)
19+
c = Channel.new(capacity: 10)
2020
Channel.go { fibonacci(c.capacity, c) }
2121
c.each { |i| puts i }
2222

23-
expected = <<-STDOUT
23+
__END__
2424
0
2525
1
2626
1
@@ -31,4 +31,3 @@ def fibonacci(n, c)
3131
13
3232
21
3333
34
34-
STDOUT

examples/a-tour-of-go-channels/select.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def fibonacci(c, quit)
3030

3131
fibonacci(c, quit)
3232

33-
expected = <<-STDOUT
33+
__END__
3434
0
3535
1
3636
1
@@ -42,4 +42,3 @@ def fibonacci(c, quit)
4242
21
4343
34
4444
quit
45-
STDOUT

examples/go-by-example-channels/channel-buffering.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@
77
## Go by Example: Channel Buffering
88
# https://gobyexample.com/channel-buffering
99

10-
messages = Channel.new(size: 2) # buffered
10+
messages = Channel.new(capacity: 2) # buffered
1111

1212
messages.put 'buffered'
1313
messages.put 'channel'
1414

1515
puts messages.take
1616
puts messages.take
1717

18-
expected = <<-STDOUT
18+
__END__
1919
buffered
2020
channel
21-
STDOUT

examples/go-by-example-channels/channel-directions.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ def pong(pings, pongs)
1919
pongs << msg
2020
end
2121

22-
pings = Channel.new(size: 1) # buffered
23-
pongs = Channel.new(size: 1) # buffered
22+
pings = Channel.new(capacity: 1) # buffered
23+
pongs = Channel.new(capacity: 1) # buffered
2424

2525
ping(pings, 'passed message')
2626
pong(pings, pongs)
2727

2828
puts ~pongs
2929

30-
expected = <<-STDOUT
30+
__END__
3131
passed message
32-
STDOUT

0 commit comments

Comments
 (0)