@@ -16,14 +16,14 @@ threads = Array.new(3) { |i| Thread.new { ch.push message: i } }
16
16
sleep 0.01 # let the threads run
17
17
threads
18
18
# => [#<Thread:[email protected] :14 dead>,
19
- # #<Thread:[email protected] :14 sleep_forever >,
20
- # #<Thread:[email protected] :14 dead >]
19
+ # #<Thread:[email protected] :14 dead >,
20
+ # #<Thread:[email protected] :14 sleep_forever >]
21
21
```
22
22
23
23
When message is popped the last thread continues and finishes as well.
24
24
25
25
``` ruby
26
- ch.pop # => {:message=>2 }
26
+ ch.pop # => {:message=>0 }
27
27
threads.map(& :join )
28
28
# => [#<Thread:[email protected] :14 dead>,
29
29
# #<Thread:[email protected] :14 dead>,
@@ -38,11 +38,14 @@ one will be blocked until new messages is pushed.
38
38
``` ruby
39
39
threads = Array .new (3 ) { |i | Thread .new { ch.pop } }
40
40
sleep 0.01 # let the threads run
41
- threads.map(& :status ) # => [false, false, "sleep"]
41
+ threads
42
+ # => [#<Thread:[email protected] :32 dead>,
43
+ # #<Thread:[email protected] :32 dead>,
44
+ # #<Thread:[email protected] :32 sleep_forever>]
42
45
ch.push message: 3
43
46
# => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2>
44
47
threads.map(& :value )
45
- # => [{:message=>0 }, {:message=>1 }, {:message=>3}]
48
+ # => [{:message=>1 }, {:message=>2 }, {:message=>3}]
46
49
```
47
50
48
51
### Promises integration
@@ -52,11 +55,11 @@ therefore all operations can be represented as futures.
52
55
53
56
``` ruby
54
57
ch = Concurrent ::Promises ::Channel .new 2
55
- # => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2>
58
+ # => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
56
59
push_operations = Array .new (3 ) { |i | ch.push_op message: i }
57
- # => [#<Concurrent::Promises::Future:0x000007 fulfilled with #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>>,
58
- # #<Concurrent::Promises::Future:0x000008 fulfilled with #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>>,
59
- # #<Concurrent::Promises::ResolvableFuture:0x000009 pending>]
60
+ # => [#<Concurrent::Promises::Future:0x00000a fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
61
+ # #<Concurrent::Promises::Future:0x00000b fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
62
+ # #<Concurrent::Promises::ResolvableFuture:0x00000c pending>]
60
63
```
61
64
62
65
> We do not have to sleep here letting the futures execute as Threads.
@@ -70,14 +73,14 @@ making a space for a new message.
70
73
``` ruby
71
74
ch.pop_op.value! # => {:message=>0}
72
75
push_operations.map(& :value! )
73
- # => [#<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>,
74
- # #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>,
75
- # #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>]
76
+ # => [#<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
77
+ # #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
78
+ # #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>]
76
79
77
80
pop_operations = Array .new (3 ) { |i | ch.pop_op }
78
- # => [#<Concurrent::Promises::ResolvableFuture:0x00000a fulfilled with {:message=>1}>,
79
- # #<Concurrent::Promises::ResolvableFuture:0x00000b fulfilled with {:message=>2}>,
80
- # #<Concurrent::Promises::ResolvableFuture:0x00000c pending>]
81
+ # => [#<Concurrent::Promises::ResolvableFuture:0x00000d fulfilled with {:message=>1}>,
82
+ # #<Concurrent::Promises::ResolvableFuture:0x00000e fulfilled with {:message=>2}>,
83
+ # #<Concurrent::Promises::ResolvableFuture:0x00000f pending>]
81
84
ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op)
82
85
pop_operations.map(& :value )
83
86
# => [{:message=>1}, {:message=>2}, {:message=>3}]
@@ -91,21 +94,21 @@ returns a pair to be able to find out which channel had the message available.
91
94
92
95
``` ruby
93
96
ch1 = Concurrent ::Promises ::Channel .new 2
94
- # => #<Concurrent::Promises::Channel:0x00000d capacity taken 0 of 2>
97
+ # => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>
95
98
ch2 = Concurrent ::Promises ::Channel .new 2
96
- # => #<Concurrent::Promises::Channel:0x00000e capacity taken 0 of 2>
99
+ # => #<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>
97
100
ch1.push 1
98
- # => #<Concurrent::Promises::Channel:0x00000d capacity taken 1 of 2>
101
+ # => #<Concurrent::Promises::Channel:0x000010 capacity taken 1 of 2>
99
102
ch2.push 2
100
- # => #<Concurrent::Promises::Channel:0x00000e capacity taken 1 of 2>
103
+ # => #<Concurrent::Promises::Channel:0x000011 capacity taken 1 of 2>
101
104
102
105
Concurrent ::Promises ::Channel .select ([ch1, ch2])
103
- # => [#<Concurrent::Promises::Channel:0x00000d capacity taken 0 of 2>, 1]
106
+ # => [#<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>, 1]
104
107
ch1.select (ch2)
105
- # => [#<Concurrent::Promises::Channel:0x00000e capacity taken 0 of 2>, 2]
108
+ # => [#<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>, 2]
106
109
107
110
Concurrent ::Promises .future { 3 + 4 }.then_channel_push(ch1)
108
- # => #<Concurrent::Promises::Future:0x00000f pending>
111
+ # => #<Concurrent::Promises::Future:0x000012 pending>
109
112
Concurrent ::Promises ::Channel .
110
113
# or `ch1.select_op(ch2)` would be equivalent
111
114
select_op([ch1, ch2]).
@@ -122,7 +125,7 @@ They always return immediately and indicate either success or failure.
122
125
123
126
``` ruby
124
127
ch
125
- # => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2>
128
+ # => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
126
129
ch.try_push 1 # => true
127
130
ch.try_push 2 # => true
128
131
ch.try_push 3 # => false
@@ -139,7 +142,7 @@ when the timeout option is used.
139
142
140
143
``` ruby
141
144
ch
142
- # => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2>
145
+ # => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
143
146
ch.push 1 , 0.01 # => true
144
147
ch.push 2 , 0.01 # => true
145
148
ch.push 3 , 0.01 # => false
@@ -156,7 +159,7 @@ if the consumers are not keeping up.
156
159
157
160
``` ruby
158
161
channel = Concurrent ::Promises ::Channel .new 2
159
- # => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>
162
+ # => #<Concurrent::Promises::Channel:0x000013 capacity taken 0 of 2>
160
163
log = Concurrent ::Array .new # => []
161
164
162
165
producers = Array .new 2 do |i |
@@ -167,8 +170,8 @@ producers = Array.new 2 do |i|
167
170
end
168
171
end
169
172
end
170
- # => [#<Thread:0x000011 @channel.in.md:133 run>,
171
- # #<Thread:0x000012 @channel.in.md:133 run>]
173
+ # => [#<Thread:0x000014 @channel.in.md:133 run>,
174
+ # #<Thread:0x000015 @channel.in.md:133 run>]
172
175
173
176
consumers = Array .new 4 do |i |
174
177
Thread .new (i) do |consumer |
@@ -180,38 +183,38 @@ consumers = Array.new 4 do |i|
180
183
end
181
184
end
182
185
end
183
- # => [#<Thread:0x000013 @channel.in.md:142 run>,
184
- # #<Thread:0x000014 @channel.in.md:142 run>,
185
- # #<Thread:0x000015 @channel.in.md:142 run>,
186
- # #<Thread:0x000016 @channel.in.md:142 run>]
186
+ # => [#<Thread:0x000016 @channel.in.md:142 run>,
187
+ # #<Thread:0x000017 @channel.in.md:142 run>,
188
+ # #<Thread:0x000018 @channel.in.md:142 run>,
189
+ # #<Thread:0x000019 @channel.in.md:142 run>]
187
190
188
191
# wait for all to finish
189
192
producers.map(& :join )
190
- # => [#<Thread:0x000011 @channel.in.md:133 dead>,
191
- # #<Thread:0x000012 @channel.in.md:133 dead>]
193
+ # => [#<Thread:0x000014 @channel.in.md:133 dead>,
194
+ # #<Thread:0x000015 @channel.in.md:133 dead>]
192
195
consumers.map(& :join )
193
- # => [#<Thread:0x000013 @channel.in.md:142 dead>,
194
- # #<Thread:0x000014 @channel.in.md:142 dead>,
195
- # #<Thread:0x000015 @channel.in.md:142 dead>,
196
- # #<Thread:0x000016 @channel.in.md:142 dead>]
196
+ # => [#<Thread:0x000016 @channel.in.md:142 dead>,
197
+ # #<Thread:0x000017 @channel.in.md:142 dead>,
198
+ # #<Thread:0x000018 @channel.in.md:142 dead>,
199
+ # #<Thread:0x000019 @channel.in.md:142 dead>]
197
200
# investigate log
198
- log
201
+ log
199
202
# => ["producer 0 pushing 0",
200
203
# "producer 0 pushing 1",
201
204
# "producer 0 pushing 2",
202
205
# "producer 1 pushing 0",
203
206
# "consumer 0 got 0. payload 0 from producer 0",
204
- # "producer 0 pushing 3",
205
- # "consumer 2 got 0. payload 1 from producer 0",
207
+ # "consumer 1 got 0. payload 1 from producer 0",
206
208
# "producer 1 pushing 1",
207
- # "consumer 1 got 0. payload 2 from producer 0",
208
- # "consumer 3 got 0. payload 0 from producer 1",
209
+ # "consumer 3 got 0. payload 2 from producer 0",
209
210
# "producer 1 pushing 2",
210
- # "consumer 2 got 1. payload 3 from producer 0",
211
- # "consumer 0 got 1. payload 1 from producer 1",
212
- # "consumer 3 got 1. payload 2 from producer 1",
211
+ # "consumer 2 got 0. payload 0 from producer 1",
213
212
# "producer 1 pushing 3",
214
- # "consumer 1 got 1. payload 3 from producer 1"]
213
+ # "producer 0 pushing 3",
214
+ # "consumer 0 got 1. payload 1 from producer 1",
215
+ # "consumer 1 got 1. payload 2 from producer 1",
216
+ # "consumer 3 got 1. payload 3 from producer 1",
217
+ # "consumer 2 got 1. payload 3 from producer 0"]
215
218
```
216
219
217
220
The producers are much faster than consumers
@@ -226,7 +229,7 @@ that run a thread pool.
226
229
227
230
``` ruby
228
231
channel = Concurrent ::Promises ::Channel .new 2
229
- # => #<Concurrent::Promises::Channel:0x000017 capacity taken 0 of 2>
232
+ # => #<Concurrent::Promises::Channel:0x00001a capacity taken 0 of 2>
230
233
log = Concurrent ::Array .new # => []
231
234
232
235
def produce (channel , log , producer , i )
@@ -248,38 +251,38 @@ end # => :consume
248
251
producers = Array .new 2 do |i |
249
252
Concurrent ::Promises .future(channel, log, i) { |* args | produce * args, 0 }.run
250
253
end
251
- # => [#<Concurrent::Promises::Future:0x000018 pending>,
252
- # #<Concurrent::Promises::Future:0x000019 pending>]
254
+ # => [#<Concurrent::Promises::Future:0x00001b pending>,
255
+ # #<Concurrent::Promises::Future:0x00001c pending>]
253
256
254
257
consumers = Array .new 4 do |i |
255
258
Concurrent ::Promises .future(channel, log, i) { |* args | consume * args, 0 }.run
256
259
end
257
- # => [#<Concurrent::Promises::Future:0x00001a pending>,
258
- # #<Concurrent::Promises::Future:0x00001b pending>,
259
- # #<Concurrent::Promises::Future:0x00001c pending>,
260
- # #<Concurrent::Promises::Future:0x00001d pending>]
260
+ # => [#<Concurrent::Promises::Future:0x00001d pending>,
261
+ # #<Concurrent::Promises::Future:0x00001e pending>,
262
+ # #<Concurrent::Promises::Future:0x00001f pending>,
263
+ # #<Concurrent::Promises::Future:0x000020 pending>]
261
264
262
265
# wait for all to finish
263
266
producers.map(& :value! ) # => [:done, :done]
264
267
consumers.map(& :value! ) # => [:done, :done, :done, :done]
265
268
# investigate log
266
- log
269
+ log
267
270
# => ["producer 0 pushing 0",
268
271
# "producer 1 pushing 0",
269
- # "consumer 1 got 0. payload 0 from producer 1",
270
272
# "producer 0 pushing 1",
271
273
# "producer 1 pushing 1",
272
274
# "consumer 0 got 0. payload 0 from producer 0",
273
- # "consumer 3 got 0. payload 1 from producer 0",
275
+ # "consumer 1 got 0. payload 0 from producer 1",
276
+ # "consumer 2 got 0. payload 1 from producer 0",
274
277
# "producer 0 pushing 2",
278
+ # "consumer 3 got 0. payload 1 from producer 1",
275
279
# "producer 1 pushing 2",
276
- # "producer 1 pushing 3",
277
280
# "producer 0 pushing 3",
278
- # "consumer 2 got 0. payload 1 from producer 1 ",
279
- # "consumer 0 got 1. payload 2 from producer 0 ",
280
- # "consumer 2 got 1. payload 3 from producer 1 ",
281
- # "consumer 1 got 1. payload 3 from producer 0",
282
- # "consumer 3 got 1. payload 2 from producer 1"]
281
+ # "producer 1 pushing 3 ",
282
+ # "consumer 0 got 1. payload 2 from producer 1 ",
283
+ # "consumer 2 got 1. payload 3 from producer 0 ",
284
+ # "consumer 1 got 1. payload 2 from producer 0",
285
+ # "consumer 3 got 1. payload 3 from producer 1"]
283
286
```
284
287
285
288
### Synchronization of workers by passing a value
@@ -292,19 +295,19 @@ The operations have to be paired to succeed.
292
295
293
296
``` ruby
294
297
channel = Concurrent ::Promises ::Channel .new 0
295
- # => #<Concurrent::Promises::Channel:0x00001e capacity taken 0 of 0>
298
+ # => #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>
296
299
thread = Thread .new { channel.pop }; sleep 0.01
297
300
# allow the thread to go to sleep
298
301
thread
299
- # => #<Thread:0x00001f @channel.in.md:246 sleep_forever>
302
+ # => #<Thread:0x000022 @channel.in.md:214 sleep_forever>
300
303
# succeeds because there is matching pop operation waiting in the thread
301
304
channel.try_push(:v1 ) # => true
302
305
# remains pending, since there is no matching operation
303
306
push = channel.push_op(:v2 )
304
- # => #<Concurrent::Promises::ResolvableFuture:0x000020 pending>
307
+ # => #<Concurrent::Promises::ResolvableFuture:0x000023 pending>
305
308
thread.value # => :v1
306
309
# the push operation resolves as a pairing pop is called
307
310
channel.pop # => :v2
308
311
push
309
- # => #<Concurrent::Promises::ResolvableFuture:0x000020 fulfilled with #<Concurrent::Promises::Channel:0x00001e capacity taken 0 of 0>>
312
+ # => #<Concurrent::Promises::ResolvableFuture:0x000023 fulfilled with #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>>
310
313
```
0 commit comments