Skip to content

Commit fa5417e

Browse files
committed
Polishing throttling
1 parent 7e17458 commit fa5417e

File tree

4 files changed

+209
-188
lines changed

4 files changed

+209
-188
lines changed

doc/promises.in.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -757,12 +757,12 @@ pool of size 3. We create throttle with the same size
757757
```ruby
758758
DB_INTERNAL_POOL = Concurrent::Array.new data
759759

760-
max_tree = Promises.throttle 3
760+
max_tree = Promises::Throttle.new 3
761761

762762
futures = 11.times.map do |i|
763763
max_tree.
764764
# throttled tasks, at most 3 simultaneous calls of [] on the database
765-
then_throttle { DB_INTERNAL_POOL[i] }.
765+
then_throttled { DB_INTERNAL_POOL[i] }.
766766
# un-throttled tasks, unlimited concurrency
767767
then { |starts| starts.size }.
768768
rescue { |reason| reason.message }

doc/promises.out.md

Lines changed: 55 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ composition.
88

99
```ruby
1010
Concurrent::Promises::FactoryMethods.instance_methods false
11-
# => [:select,
12-
# :zip,
11+
# => [:zip,
1312
# :create,
1413
# :delay,
1514
# :future,
@@ -35,8 +34,7 @@ Concurrent::Promises::FactoryMethods.instance_methods false
3534
# :any_fulfilled_future,
3635
# :any_fulfilled_future_on,
3736
# :any_event,
38-
# :any_event_on,
39-
# :throttle]
37+
# :any_event_on]
4038
```
4139

4240
The module can be included or extended where needed.
@@ -49,17 +47,17 @@ Class.new do
4947
resolvable_event
5048
end
5149
end.new.a_method
52-
# => <#Concurrent::Promises::ResolvableEvent:0x7fbfd2d2a890 pending>
50+
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca52488 pending>
5351

5452
Module.new { extend Concurrent::Promises::FactoryMethods }.resolvable_event
55-
# => <#Concurrent::Promises::ResolvableEvent:0x7fbfd2d28978 pending>
53+
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca4a9e0 pending>
5654
```
5755

5856
The module is already extended into {Concurrent::Promises} for convenience.
5957

6058
```ruby
6159
Concurrent::Promises.resolvable_event
62-
# => <#Concurrent::Promises::ResolvableEvent:0x7fbfd2d197c0 pending>
60+
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca48708 pending>
6361
```
6462

6563
For this guide we introduce a shortcut in `main` so we can call the factory
@@ -68,7 +66,7 @@ methods in following examples by using `Promisses` directly.
6866
```ruby
6967
Promises = Concurrent::Promises
7068
Promises.resolvable_event
71-
# => <#Concurrent::Promises::ResolvableEvent:0x7fbfd2d12628 pending>
69+
# => <#Concurrent::Promises::ResolvableEvent:0x7fa95ca41d18 pending>
7270
```
7371

7472
## Asynchronous task
@@ -84,7 +82,7 @@ future = Promises.future(0.1) do |duration|
8482
sleep duration
8583
:result
8684
end
87-
# => <#Concurrent::Promises::Future:0x7fbfd2d027c8 pending>
85+
# => <#Concurrent::Promises::Future:0x7fa95b3eb410 pending>
8886
```
8987

9088
Asks if the future is resolved, here it will be still in the middle of the
@@ -105,7 +103,7 @@ If the task fails we talk about the future being rejected.
105103

106104
```ruby
107105
future = Promises.future { raise 'Boom' }
108-
# => <#Concurrent::Promises::Future:0x7fbfd2cf1c70 pending>
106+
# => <#Concurrent::Promises::Future:0x7fa95b3dae58 pending>
109107
```
110108

111109
There is no result, the future was rejected with a reason.
@@ -200,20 +198,20 @@ through evaluation as follows.
200198

201199
```ruby
202200
Promises.future { :value }
203-
# => <#Concurrent::Promises::Future:0x7fbfd2c69ca8 pending>
201+
# => <#Concurrent::Promises::Future:0x7fa95b89f510 pending>
204202
```
205203

206204
Instead it can be created directly.
207205

208206
```ruby
209207
Promises.fulfilled_future(:value)
210-
# => <#Concurrent::Promises::Future:0x7fbfd2c61a30 fulfilled>
208+
# => <#Concurrent::Promises::Future:0x7fa95b390bc8 fulfilled>
211209
Promises.rejected_future(StandardError.new('Ups'))
212-
# => <#Concurrent::Promises::Future:0x7fbfd2c606f8 rejected>
210+
# => <#Concurrent::Promises::Future:0x7fa95b38b628 rejected>
213211
Promises.resolved_future(true, :value, nil)
214-
# => <#Concurrent::Promises::Future:0x7fbfd2c5b1a8 fulfilled>
212+
# => <#Concurrent::Promises::Future:0x7fa95b38a688 fulfilled>
215213
Promises.resolved_future(false, nil, StandardError.new('Ups'))
216-
# => <#Concurrent::Promises::Future:0x7fbfd2c591f0 rejected>
214+
# => <#Concurrent::Promises::Future:0x7fa95b3892b0 rejected>
217215
```
218216

219217
## Chaining
@@ -254,9 +252,9 @@ do_stuff arg }`) is **required**, both following examples may break.
254252
```ruby
255253
arg = 1 # => 1
256254
Thread.new { do_stuff arg }
257-
# => #<Thread:0x007fbfd2bf3238@promises.in.md:193 run>
255+
# => #<Thread:0x007fa95b322fd8@promises.in.md:193 run>
258256
Promises.future { do_stuff arg }
259-
# => <#Concurrent::Promises::Future:0x7fbfd2bebf10 pending>
257+
# => <#Concurrent::Promises::Future:0x7fa95b321020 pending>
260258
```
261259

262260
## Branching, and zipping
@@ -318,7 +316,7 @@ Promises.
318316
result
319317
# => [false,
320318
# nil,
321-
# #<NoMethodError: undefined method `succ' for #<Object:0x007fbfd2b932c0>>]
319+
# #<NoMethodError: undefined method `succ' for #<Object:0x007fa95ca28368>>]
322320
```
323321

324322
As `then` chained tasks execute only on fulfilled futures, there is a `rescue`
@@ -366,7 +364,7 @@ Zip is rejected if any of the zipped futures is.
366364
rejected_zip = Promises.zip(
367365
Promises.fulfilled_future(1),
368366
Promises.rejected_future(StandardError.new('Ups')))
369-
# => <#Concurrent::Promises::Future:0x7fbfd2b12dc8 rejected>
367+
# => <#Concurrent::Promises::Future:0x7fa95b2428e8 rejected>
370368
rejected_zip.result
371369
# => [false, [1, nil], [nil, #<StandardError: Ups>]]
372370
rejected_zip.
@@ -381,11 +379,11 @@ requiring resolution.
381379

382380
```ruby
383381
future = Promises.delay { sleep 0.1; 'lazy' }
384-
# => <#Concurrent::Promises::Future:0x7fbfd2af8f68 pending>
382+
# => <#Concurrent::Promises::Future:0x7fa95b229eb0 pending>
385383
sleep 0.1
386384
future.resolved? # => false
387385
future.touch
388-
# => <#Concurrent::Promises::Future:0x7fbfd2af8f68 pending>
386+
# => <#Concurrent::Promises::Future:0x7fa95b229eb0 pending>
389387
sleep 0.2
390388
future.resolved? # => true
391389
```
@@ -462,7 +460,7 @@ Schedule task to be executed in 0.1 seconds.
462460

463461
```ruby
464462
scheduled = Promises.schedule(0.1) { 1 }
465-
# => <#Concurrent::Promises::Future:0x7fbfd2a72490 pending>
463+
# => <#Concurrent::Promises::Future:0x7fa95c9b1ce0 pending>
466464
scheduled.resolved? # => false
467465
```
468466

@@ -487,7 +485,7 @@ Time can be used as well.
487485

488486
```ruby
489487
Promises.schedule(Time.now + 10) { :val }
490-
# => <#Concurrent::Promises::Future:0x7fbfd41b0698 pending>
488+
# => <#Concurrent::Promises::Future:0x7fa95c972ae0 pending>
491489
```
492490

493491
## Resolvable Future and Event:
@@ -499,15 +497,15 @@ Sometimes it is required to resolve a future externally, in these cases
499497

500498
```ruby
501499
future = Promises.resolvable_future
502-
# => <#Concurrent::Promises::ResolvableFuture:0x7fbfd2a5a480 pending>
500+
# => <#Concurrent::Promises::ResolvableFuture:0x7fa95c970e98 pending>
503501
```
504502

505503
The thread will be blocked until the future is resolved
506504

507505
```ruby
508506
thread = Thread.new { future.value }
509507
future.fulfill 1
510-
# => <#Concurrent::Promises::ResolvableFuture:0x7fbfd2a5a480 fulfilled>
508+
# => <#Concurrent::Promises::ResolvableFuture:0x7fa95c970e98 fulfilled>
511509
thread.value # => 1
512510
```
513511

@@ -524,9 +522,9 @@ future.fulfill 2, false # => false
524522
## Callbacks
525523

526524
```ruby
527-
queue = Queue.new # => #<Thread::Queue:0x007fbfd2a32ac0>
525+
queue = Queue.new # => #<Thread::Queue:0x007fa95e0565b8>
528526
future = Promises.delay { 1 + 1 }
529-
# => <#Concurrent::Promises::Future:0x7fbfd2a30f90 pending>
527+
# => <#Concurrent::Promises::Future:0x7fa95e0547b8 pending>
530528

531529
future.on_fulfillment { queue << 1 } # evaluated asynchronously
532530
future.on_fulfillment! { queue << 2 } # evaluated on resolving thread
@@ -549,7 +547,7 @@ and `:io` for blocking and long tasks.
549547
```ruby
550548
Promises.future_on(:fast) { 2 }.
551549
then_on(:io) { File.read __FILE__ }.
552-
value.size # => 18754
550+
value.size # => 18760
553551
```
554552

555553
# Interoperability
@@ -562,7 +560,7 @@ Create an actor which takes received numbers and returns the number squared.
562560
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
563561
-> v { v ** 2 }
564562
end
565-
# => #<Concurrent::Actor::Reference:0x7fbfd29cbe88 /square (Concurrent::Actor::Utils::AdHoc)>
563+
# => #<Concurrent::Actor::Reference:0x7fa95c919e90 /square (Concurrent::Actor::Utils::AdHoc)>
566564
```
567565

568566
Send result of `1+1` to the actor, and add 2 to the result send back from the
@@ -594,17 +592,17 @@ actor.ask(2).then(&:succ).value! # => 5
594592

595593
```ruby
596594
Promises.future { do_stuff }
597-
# => <#Concurrent::Promises::Future:0x7fbfd298a758 pending>
595+
# => <#Concurrent::Promises::Future:0x7fa95b1eb8e0 pending>
598596
```
599597

600598
## Parallel background processing
601599

602600
```ruby
603601
tasks = 4.times.map { |i| Promises.future(i) { |i| i*2 } }
604-
# => [<#Concurrent::Promises::Future:0x7fbfd2982738 pending>,
605-
# <#Concurrent::Promises::Future:0x7fbfd29813b0 pending>,
606-
# <#Concurrent::Promises::Future:0x7fbfd297bf78 pending>,
607-
# <#Concurrent::Promises::Future:0x7fbfd297b0f0 pending>]
602+
# => [<#Concurrent::Promises::Future:0x7fa95b1e2e48 pending>,
603+
# <#Concurrent::Promises::Future:0x7fa95b1e1f70 pending>,
604+
# <#Concurrent::Promises::Future:0x7fa95b1e1188 pending>,
605+
# <#Concurrent::Promises::Future:0x7fa95b1e0198 pending>]
608606
Promises.zip(*tasks).value! # => [0, 2, 4, 6]
609607
```
610608

@@ -657,11 +655,11 @@ Create the computer actor and send it 3 jobs.
657655

658656
```ruby
659657
computer = Concurrent::Actor.spawn Computer, :computer
660-
# => #<Concurrent::Actor::Reference:0x7fbfd3141050 /computer (Computer)>
658+
# => #<Concurrent::Actor::Reference:0x7fa95a3f2c48 /computer (Computer)>
661659
results = 3.times.map { computer.ask [:run, -> { sleep 0.1; :result }] }
662-
# => [<#Concurrent::Promises::Future:0x7fbfd3130d68 pending>,
663-
# <#Concurrent::Promises::Future:0x7fbfd312b2a0 pending>,
664-
# <#Concurrent::Promises::Future:0x7fbfd3129d60 pending>]
660+
# => [<#Concurrent::Promises::Future:0x7fa95a3d30c8 pending>,
661+
# <#Concurrent::Promises::Future:0x7fa95a3d0e40 pending>,
662+
# <#Concurrent::Promises::Future:0x7fa95a3cb760 pending>]
665663
computer.ask(:status).value! # => {:running_jobs=>3}
666664
results.map(&:value!) # => [:result, :result, :result]
667665
```
@@ -708,8 +706,8 @@ Lets have two processes which will count until cancelled.
708706

709707
```ruby
710708
source, token = Concurrent::Cancellation.create
711-
# => [<#Concurrent::Cancellation:0x7fbfd3b596c8 canceled:false>,
712-
# <#Concurrent::Cancellation::Token:0x7fbfd3b58a70 canceled:false>]
709+
# => [<#Concurrent::Cancellation:0x7fa95a223840 canceled:false>,
710+
# <#Concurrent::Cancellation::Token:0x7fa95a2222b0 canceled:false>]
713711

714712
count_until_cancelled = -> token, count do
715713
if token.canceled?
@@ -722,12 +720,12 @@ end
722720
futures = Array.new(2) do
723721
Promises.future(token, 0, &count_until_cancelled).run
724722
end
725-
# => [<#Concurrent::Promises::Future:0x7fbfd3b38310 pending>,
726-
# <#Concurrent::Promises::Future:0x7fbfd3b31628 pending>]
723+
# => [<#Concurrent::Promises::Future:0x7fa95b1b8a08 pending>,
724+
# <#Concurrent::Promises::Future:0x7fa95b1aa110 pending>]
727725

728726
sleep 0.01
729727
source.cancel # => true
730-
futures.map(&:value!) # => [50, 52]
728+
futures.map(&:value!) # => [65, 66]
731729
```
732730

733731
Cancellation can also be used as event or future to log or plan re-execution.
@@ -746,8 +744,8 @@ tasks share a cancellation, when one of them fails it cancels the others.
746744

747745
```ruby
748746
source, token = Concurrent::Cancellation.create
749-
# => [<#Concurrent::Cancellation:0x7fbfd3862c30 canceled:false>,
750-
# <#Concurrent::Cancellation::Token:0x7fbfd38622d0 canceled:false>]
747+
# => [<#Concurrent::Cancellation:0x7fa95c9c8c60 canceled:false>,
748+
# <#Concurrent::Cancellation::Token:0x7fa95c9c8710 canceled:false>]
751749
tasks = 4.times.map do |i|
752750
Promises.future(source, token, i) do |source, token, i|
753751
count = 0
@@ -763,22 +761,22 @@ tasks = 4.times.map do |i|
763761
end
764762
end
765763
end
766-
# => [<#Concurrent::Promises::Future:0x7fbfd3852358 pending>,
767-
# <#Concurrent::Promises::Future:0x7fbfd384b8c8 pending>,
768-
# <#Concurrent::Promises::Future:0x7fbfd3033ed8 pending>,
769-
# <#Concurrent::Promises::Future:0x7fbfd302bee0 pending>]
764+
# => [<#Concurrent::Promises::Future:0x7fa95c9a1818 pending>,
765+
# <#Concurrent::Promises::Future:0x7fa95c9a0aa8 pending>,
766+
# <#Concurrent::Promises::Future:0x7fa95c98bb30 pending>,
767+
# <#Concurrent::Promises::Future:0x7fa95c98aed8 pending>]
770768
Promises.zip(*tasks).result
771769
# => [false,
772-
# [nil, :cancelled, :cancelled, :cancelled],
773-
# [#<RuntimeError: random error>, nil, nil, nil]]
770+
# [nil, :cancelled, :cancelled, nil],
771+
# [#<RuntimeError: random error>, nil, nil, #<RuntimeError: random error>]]
774772
```
775773

776774
Without the randomly failing part it produces following.
777775

778776
```ruby
779777
source, token = Concurrent::Cancellation.create
780-
# => [<#Concurrent::Cancellation:0x7fbfd29aa990 canceled:false>,
781-
# <#Concurrent::Cancellation::Token:0x7fbfd29aa2b0 canceled:false>]
778+
# => [<#Concurrent::Cancellation:0x7fa95ca897d0 canceled:false>,
779+
# <#Concurrent::Cancellation::Token:0x7fa95ca83c68 canceled:false>]
782780
tasks = 4.times.map do |i|
783781
Promises.future(source, token, i) do |source, token, i|
784782
count = 0
@@ -886,13 +884,13 @@ DB_INTERNAL_POOL = Concurrent::Array.new data
886884
# "********",
887885
# "*********"]
888886

889-
max_tree = Promises.throttle 3
890-
# => <#Concurrent::Promises::Throttle:0x7fbfd294a018 limit:3>
887+
max_tree = Promises::Throttle.new 3
888+
# => <#Concurrent::Promises::Throttle:0x7fa95b2d0cb0 limit:3 can_run:3>
891889

892890
futures = 11.times.map do |i|
893891
max_tree.
894892
# throttled tasks, at most 3 simultaneous calls of [] on the database
895-
then_throttle { DB_INTERNAL_POOL[i] }.
893+
then_throttled { DB_INTERNAL_POOL[i] }.
896894
# un-throttled tasks, unlimited concurrency
897895
then { |starts| starts.size }.
898896
rescue { |reason| reason.message }
@@ -929,7 +927,7 @@ def schedule_job(interval, &job)
929927
end
930928
end
931929

932-
queue = Queue.new # => #<Thread::Queue:0x007fbfd5827f70>
930+
queue = Queue.new # => #<Thread::Queue:0x007fa95e08d838>
933931
count = 0 # => 0
934932
interval = 0.05 # small just not to delay execution of this example
935933

0 commit comments

Comments
 (0)