Skip to content

Commit 1917c2d

Browse files
author
Petr Chalupa
committed
Merge pull request #314 from pitr-ch/futures
Futures: passing args on construction, interoperability with channel, documentation
2 parents f2aaa92 + f008006 commit 1917c2d

File tree

15 files changed

+761
-272
lines changed

15 files changed

+761
-272
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ be obeyed though. Features developed in `concurrent-ruby-edge` are expected to m
106106
Communicating Sequential Processes (CSP).
107107
* [Exchanger](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Exchanger.html)
108108
* [LazyRegister](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/LazyRegister.html)
109+
* [New Future Promise Framework](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Edge.html) - new
110+
unified implementation of Futures and Promises which combines Features of previous `Future`,
111+
`Promise`, `IVar`, `Probe`, `dataflow`, `Delay`, `TimerTask` into single framework. It uses extensively
112+
new synchronization layer to make all the paths lock-free with exception of blocking threads on `#wait`.
113+
It offers better performance and does not block threads (exception being `#wait` and similar methods where it's
114+
intended).
115+
109116

110117
## Usage
111118

@@ -149,6 +156,8 @@ require 'concurrent/actor' # Concurrent::Actor and supporting code
149156
require 'concurrent/channel ' # Concurrent::Channel and supporting code
150157
```
151158

159+
If the library does not behave as expected, `Concurrent.use_stdlib_logger(Logger::DEBUG)` could help to revel the problem.
160+
152161
## Installation
153162

154163
```shell

examples/edge_futures.in.rb

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222
head = Concurrent.future { 1 } #
2323
branch1 = head.then(&:succ) #
2424
branch2 = head.then(&:succ).then(&:succ) #
25-
branch1.zip(branch2).value
26-
(branch1 & branch2).then { |(a, b)| a + b }.value
25+
branch1.zip(branch2).value!
26+
(branch1 & branch2).then { |a, b| a + b }.value!
27+
(branch1 & branch2).then(&:+).value!
28+
Concurrent.zip(branch1, branch2, branch1).then { |*values| values.reduce &:+ }.value!
2729
# pick only first completed
28-
(branch1 | branch2).value
30+
(branch1 | branch2).value!
31+
32+
# auto splat arrays for blocks
33+
Concurrent.future { [1, 2] }.then(&:+).value!
2934

3035

3136
### Error handling
@@ -38,7 +43,7 @@
3843
### Delay
3944

4045
# will not evaluate until asked by #value or other method requiring completion
41-
scheduledfuture = Concurrent.delay { 'lazy' }
46+
future = Concurrent.delay { 'lazy' }
4247
sleep 0.1 #
4348
future.completed?
4449
future.value
@@ -136,6 +141,25 @@
136141
actor.ask(2).then(&:succ).value
137142

138143

144+
### Interoperability with channels
145+
146+
ch1 = Concurrent::Edge::Channel.new
147+
ch2 = Concurrent::Edge::Channel.new
148+
149+
result = Concurrent.select(ch1, ch2)
150+
ch1.push 1
151+
result.value!
152+
153+
Concurrent.
154+
future { 1+1 }.
155+
then_push(ch1)
156+
result = Concurrent.
157+
future { '%02d' }.
158+
then_select(ch1, ch2).
159+
then { |format, (value, channel)| format format, value }
160+
result.value!
161+
162+
139163
### Common use-cases Examples
140164

141165
# simple background processing
@@ -147,12 +171,62 @@
147171

148172

149173
# periodic task
174+
@end = false
175+
150176
def schedule_job
151177
Concurrent.schedule(1) { do_stuff }.
152-
rescue { |e| report_error e }.
153-
then { schedule_job }
178+
rescue { |e| StandardError === e ? report_error(e) : raise(e) }.
179+
then { schedule_job unless @end }
154180
end
155181

156182
schedule_job
183+
@end = true
184+
185+
186+
# How to limit processing where there are limited resources?
187+
# By creating an actor managing the resource
188+
DB = Concurrent::Actor::Utils::AdHoc.spawn :db do
189+
data = Array.new(10) { |i| '*' * i }
190+
lambda do |message|
191+
# pretending that this queries a DB
192+
data[message]
193+
end
194+
end
195+
196+
concurrent_jobs = 11.times.map do |v|
197+
Concurrent.
198+
future { v }.
199+
# ask the DB with the `v`, only one at the time, rest is parallel
200+
then_ask(DB).
201+
# get size of the string, fails for 11
202+
then(&:size).
203+
rescue { |reason| reason.message } # translate error to value (exception, message)
204+
end #
205+
206+
Concurrent.zip(*concurrent_jobs).value!
207+
208+
209+
# In reality there is often a pool though:
210+
data = Array.new(10) { |i| '*' * i }
211+
pool_size = 5
212+
213+
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |index|
214+
# DB connection constructor
215+
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do |data|
216+
lambda do |message|
217+
# pretending that this queries a DB
218+
data[message]
219+
end
220+
end
221+
end
157222

223+
concurrent_jobs = 11.times.map do |v|
224+
Concurrent.
225+
future { v }.
226+
# ask the DB_POOL with the `v`, only 5 at the time, rest is parallel
227+
then_ask(DB_POOL).
228+
then(&:size).
229+
rescue { |reason| reason.message }
230+
end #
158231

232+
Concurrent.zip(*concurrent_jobs).value!

0 commit comments

Comments
 (0)