@@ -16,14 +16,14 @@ threads = Array.new(3) { |i| Thread.new { ch.push message: i } }
16
16
sleep 0.01 # let the threads run
17
17
threads
18
18
# => [#<Thread:[email protected] :14 dead>,
19
- # #<Thread:[email protected] :14 dead >,
20
- # #<Thread:[email protected] :14 sleep_forever >]
19
+ # #<Thread:[email protected] :14 sleep_forever >,
20
+ # #<Thread:[email protected] :14 dead >]
21
21
```
22
22
23
23
When message is popped the last thread continues and finishes as well.
24
24
25
25
``` ruby
26
- ch.pop # => {:message=>0 }
26
+ ch.pop # => {:message=>2 }
27
27
threads.map(& :join )
28
28
# => [#<Thread:[email protected] :14 dead>,
29
29
# #<Thread:[email protected] :14 dead>,
@@ -38,14 +38,11 @@ one will be blocked until new messages is pushed.
38
38
``` ruby
39
39
threads = Array .new (3 ) { |i | Thread .new { ch.pop } }
40
40
sleep 0.01 # let the threads run
41
- threads
42
- # => [#<Thread:[email protected] :32 dead>,
43
- # #<Thread:[email protected] :32 dead>,
44
- # #<Thread:[email protected] :32 sleep_forever>]
41
+ threads.map(& :status ) # => [false, false, "sleep"]
45
42
ch.push message: 3
46
43
# => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2>
47
44
threads.map(& :value )
48
- # => [{:message=>1 }, {:message=>2 }, {:message=>3}]
45
+ # => [{:message=>0 }, {:message=>1 }, {:message=>3}]
49
46
```
50
47
51
48
### Promises integration
@@ -55,11 +52,11 @@ therefore all operations can be represented as futures.
55
52
56
53
``` ruby
57
54
ch = Concurrent ::Promises ::Channel .new 2
58
- # => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
55
+ # => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2>
59
56
push_operations = Array .new (3 ) { |i | ch.push_op message: i }
60
- # => [#<Concurrent::Promises::Future:0x00000a fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
61
- # #<Concurrent::Promises::Future:0x00000b fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
62
- # #<Concurrent::Promises::ResolvableFuture:0x00000c pending>]
57
+ # => [#<Concurrent::Promises::Future:0x000007 fulfilled with #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>>,
58
+ # #<Concurrent::Promises::Future:0x000008 fulfilled with #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>>,
59
+ # #<Concurrent::Promises::ResolvableFuture:0x000009 pending>]
63
60
```
64
61
65
62
> We do not have to sleep here letting the futures execute as Threads.
@@ -73,14 +70,14 @@ making a space for a new message.
73
70
``` ruby
74
71
ch.pop_op.value! # => {:message=>0}
75
72
push_operations.map(& :value! )
76
- # => [#<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
77
- # #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
78
- # #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>]
73
+ # => [#<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>,
74
+ # #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>,
75
+ # #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>]
79
76
80
77
pop_operations = Array .new (3 ) { |i | ch.pop_op }
81
- # => [#<Concurrent::Promises::ResolvableFuture:0x00000d fulfilled with {:message=>1}>,
82
- # #<Concurrent::Promises::ResolvableFuture:0x00000e fulfilled with {:message=>2}>,
83
- # #<Concurrent::Promises::ResolvableFuture:0x00000f pending>]
78
+ # => [#<Concurrent::Promises::ResolvableFuture:0x00000a fulfilled with {:message=>1}>,
79
+ # #<Concurrent::Promises::ResolvableFuture:0x00000b fulfilled with {:message=>2}>,
80
+ # #<Concurrent::Promises::ResolvableFuture:0x00000c pending>]
84
81
ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op)
85
82
pop_operations.map(& :value )
86
83
# => [{:message=>1}, {:message=>2}, {:message=>3}]
@@ -94,21 +91,21 @@ returns a pair to be able to find out which channel had the message available.
94
91
95
92
``` ruby
96
93
ch1 = Concurrent ::Promises ::Channel .new 2
97
- # => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>
94
+ # => #<Concurrent::Promises::Channel:0x00000d capacity taken 0 of 2>
98
95
ch2 = Concurrent ::Promises ::Channel .new 2
99
- # => #<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>
96
+ # => #<Concurrent::Promises::Channel:0x00000e capacity taken 0 of 2>
100
97
ch1.push 1
101
- # => #<Concurrent::Promises::Channel:0x000010 capacity taken 1 of 2>
98
+ # => #<Concurrent::Promises::Channel:0x00000d capacity taken 1 of 2>
102
99
ch2.push 2
103
- # => #<Concurrent::Promises::Channel:0x000011 capacity taken 1 of 2>
100
+ # => #<Concurrent::Promises::Channel:0x00000e capacity taken 1 of 2>
104
101
105
102
Concurrent ::Promises ::Channel .select ([ch1, ch2])
106
- # => [#<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>, 1]
103
+ # => [#<Concurrent::Promises::Channel:0x00000d capacity taken 0 of 2>, 1]
107
104
ch1.select (ch2)
108
- # => [#<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>, 2]
105
+ # => [#<Concurrent::Promises::Channel:0x00000e capacity taken 0 of 2>, 2]
109
106
110
107
Concurrent ::Promises .future { 3 + 4 }.then_channel_push(ch1)
111
- # => #<Concurrent::Promises::Future:0x000012 pending>
108
+ # => #<Concurrent::Promises::Future:0x00000f pending>
112
109
Concurrent ::Promises ::Channel .
113
110
# or `ch1.select_op(ch2)` would be equivalent
114
111
select_op([ch1, ch2]).
@@ -125,7 +122,7 @@ They always return immediately and indicate either success or failure.
125
122
126
123
``` ruby
127
124
ch
128
- # => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
125
+ # => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2>
129
126
ch.try_push 1 # => true
130
127
ch.try_push 2 # => true
131
128
ch.try_push 3 # => false
@@ -142,7 +139,7 @@ when the timeout option is used.
142
139
143
140
``` ruby
144
141
ch
145
- # => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
142
+ # => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2>
146
143
ch.push 1 , 0.01 # => true
147
144
ch.push 2 , 0.01 # => true
148
145
ch.push 3 , 0.01 # => false
@@ -159,7 +156,7 @@ if the consumers are not keeping up.
159
156
160
157
``` ruby
161
158
channel = Concurrent ::Promises ::Channel .new 2
162
- # => #<Concurrent::Promises::Channel:0x000013 capacity taken 0 of 2>
159
+ # => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>
163
160
log = Concurrent ::Array .new # => []
164
161
165
162
producers = Array .new 2 do |i |
@@ -170,8 +167,8 @@ producers = Array.new 2 do |i|
170
167
end
171
168
end
172
169
end
173
- # => [#<Thread:0x000014 @channel.in.md:133 run>,
174
- # #<Thread:0x000015 @channel.in.md:133 run>]
170
+ # => [#<Thread:0x000011 @channel.in.md:133 run>,
171
+ # #<Thread:0x000012 @channel.in.md:133 run>]
175
172
176
173
consumers = Array .new 4 do |i |
177
174
Thread .new (i) do |consumer |
@@ -183,22 +180,22 @@ consumers = Array.new 4 do |i|
183
180
end
184
181
end
185
182
end
186
- # => [#<Thread:0x000016 @channel.in.md:142 run>,
187
- # #<Thread:0x000017 @channel.in.md:142 run>,
188
- # #<Thread:0x000018 @channel.in.md:142 run>,
189
- # #<Thread:0x000019 @channel.in.md:142 run>]
183
+ # => [#<Thread:0x000013 @channel.in.md:142 run>,
184
+ # #<Thread:0x000014 @channel.in.md:142 run>,
185
+ # #<Thread:0x000015 @channel.in.md:142 run>,
186
+ # #<Thread:0x000016 @channel.in.md:142 run>]
190
187
191
188
# wait for all to finish
192
189
producers.map(& :join )
193
- # => [#<Thread:0x000014 @channel.in.md:133 dead>,
194
- # #<Thread:0x000015 @channel.in.md:133 dead>]
190
+ # => [#<Thread:0x000011 @channel.in.md:133 dead>,
191
+ # #<Thread:0x000012 @channel.in.md:133 dead>]
195
192
consumers.map(& :join )
196
- # => [#<Thread:0x000016 @channel.in.md:142 dead>,
197
- # #<Thread:0x000017 @channel.in.md:142 dead>,
198
- # #<Thread:0x000018 @channel.in.md:142 dead>,
199
- # #<Thread:0x000019 @channel.in.md:142 dead>]
193
+ # => [#<Thread:0x000013 @channel.in.md:142 dead>,
194
+ # #<Thread:0x000014 @channel.in.md:142 dead>,
195
+ # #<Thread:0x000015 @channel.in.md:142 dead>,
196
+ # #<Thread:0x000016 @channel.in.md:142 dead>]
200
197
# investigate log
201
- log
198
+ log
202
199
# => ["producer 0 pushing 0",
203
200
# "producer 0 pushing 1",
204
201
# "producer 0 pushing 2",
@@ -229,7 +226,7 @@ that run a thread pool.
229
226
230
227
``` ruby
231
228
channel = Concurrent ::Promises ::Channel .new 2
232
- # => #<Concurrent::Promises::Channel:0x00001a capacity taken 0 of 2>
229
+ # => #<Concurrent::Promises::Channel:0x000017 capacity taken 0 of 2>
233
230
log = Concurrent ::Array .new # => []
234
231
235
232
def produce (channel , log , producer , i )
@@ -251,22 +248,22 @@ end # => :consume
251
248
producers = Array .new 2 do |i |
252
249
Concurrent ::Promises .future(channel, log, i) { |* args | produce * args, 0 }.run
253
250
end
254
- # => [#<Concurrent::Promises::Future:0x00001b pending>,
255
- # #<Concurrent::Promises::Future:0x00001c pending>]
251
+ # => [#<Concurrent::Promises::Future:0x000018 pending>,
252
+ # #<Concurrent::Promises::Future:0x000019 pending>]
256
253
257
254
consumers = Array .new 4 do |i |
258
255
Concurrent ::Promises .future(channel, log, i) { |* args | consume * args, 0 }.run
259
256
end
260
- # => [#<Concurrent::Promises::Future:0x00001d pending>,
261
- # #<Concurrent::Promises::Future:0x00001e pending>,
262
- # #<Concurrent::Promises::Future:0x00001f pending>,
263
- # #<Concurrent::Promises::Future:0x000020 pending>]
257
+ # => [#<Concurrent::Promises::Future:0x00001a pending>,
258
+ # #<Concurrent::Promises::Future:0x00001b pending>,
259
+ # #<Concurrent::Promises::Future:0x00001c pending>,
260
+ # #<Concurrent::Promises::Future:0x00001d pending>]
264
261
265
262
# wait for all to finish
266
263
producers.map(& :value! ) # => [:done, :done]
267
264
consumers.map(& :value! ) # => [:done, :done, :done, :done]
268
265
# investigate log
269
- log
266
+ log
270
267
# => ["producer 0 pushing 0",
271
268
# "producer 1 pushing 0",
272
269
# "consumer 1 got 0. payload 0 from producer 1",
@@ -295,19 +292,19 @@ The operations have to be paired to succeed.
295
292
296
293
``` ruby
297
294
channel = Concurrent ::Promises ::Channel .new 0
298
- # => #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>
295
+ # => #<Concurrent::Promises::Channel:0x00001e capacity taken 0 of 0>
299
296
thread = Thread .new { channel.pop }; sleep 0.01
300
297
# allow the thread to go to sleep
301
298
thread
302
- # => #<Thread:0x000022 @channel.in.md:214 sleep_forever>
299
+ # => #<Thread:0x00001f @channel.in.md:246 sleep_forever>
303
300
# succeeds because there is matching pop operation waiting in the thread
304
301
channel.try_push(:v1 ) # => true
305
302
# remains pending, since there is no matching operation
306
303
push = channel.push_op(:v2 )
307
- # => #<Concurrent::Promises::ResolvableFuture:0x000023 pending>
304
+ # => #<Concurrent::Promises::ResolvableFuture:0x000020 pending>
308
305
thread.value # => :v1
309
306
# the push operation resolves as a pairing pop is called
310
307
channel.pop # => :v2
311
308
push
312
- # => #<Concurrent::Promises::ResolvableFuture:0x000023 fulfilled with #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>>
309
+ # => #<Concurrent::Promises::ResolvableFuture:0x000020 fulfilled with #<Concurrent::Promises::Channel:0x00001e capacity taken 0 of 0>>
313
310
```
0 commit comments