Skip to content

Commit f1135e7

Browse files
committed
Thread safe Async::Condition and Async::Notification.
1 parent 17b4e78 commit f1135e7

File tree

6 files changed

+41
-58
lines changed

6 files changed

+41
-58
lines changed

fixtures/async/a_condition.rb

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Async
1212
it "can signal waiting task" do
1313
state = nil
1414

15-
reactor.async do
15+
task = reactor.async do
1616
state = :waiting
1717
condition.wait
1818
state = :resumed
@@ -21,8 +21,7 @@ module Async
2121
expect(state).to be == :waiting
2222

2323
condition.signal
24-
25-
reactor.yield
24+
task.wait
2625

2726
expect(state).to be == :resumed
2827
end
@@ -46,16 +45,15 @@ module Async
4645
it "resumes tasks in order" do
4746
order = []
4847

49-
5.times do |i|
50-
task = reactor.async do
48+
tasks = 5.times.map do |i|
49+
reactor.async do
5150
condition.wait
5251
order << i
5352
end
5453
end
5554

5655
condition.signal
57-
58-
reactor.yield
56+
tasks.each(&:wait)
5957

6058
expect(order).to be == [0, 1, 2, 3, 4]
6159
end

lib/async/condition.rb

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,65 +13,47 @@ module Async
1313
class Condition
1414
# Create a new condition.
1515
def initialize
16-
@waiting = List.new
16+
@ready = ::Thread::Queue.new
1717
end
1818

19-
class FiberNode < List::Node
20-
def initialize(fiber)
21-
@fiber = fiber
22-
end
23-
24-
def transfer(*arguments)
25-
@fiber.transfer(*arguments)
26-
end
27-
28-
def alive?
29-
@fiber.alive?
30-
end
31-
end
32-
33-
private_constant :FiberNode
34-
3519
# Queue up the current fiber and wait on yielding the task.
3620
# @returns [Object]
3721
def wait
38-
@waiting.stack(FiberNode.new(Fiber.current)) do
39-
Fiber.scheduler.transfer
40-
end
22+
@ready.pop
4123
end
4224

43-
# @deprecated Replaced by {#waiting?}
25+
# @returns [Boolean] If there are no fibers waiting on this condition.
4426
def empty?
45-
warn("`Async::Condition#empty?` is deprecated, use `Async::Condition#waiting?` instead.", uplevel: 1, category: :deprecated) if $VERBOSE
46-
47-
@waiting.empty?
27+
@ready.num_waiting.zero?
4828
end
4929

5030
# @returns [Boolean] Is any fiber waiting on this notification?
5131
def waiting?
52-
@waiting.size > 0
32+
!self.empty?
5333
end
5434

5535
# Signal to a given task that it should resume operations.
5636
# @parameter value [Object | Nil] The value to return to the waiting fibers.
5737
def signal(value = nil)
58-
return if @waiting.empty?
38+
return if empty?
5939

60-
waiting = self.exchange
40+
ready = self.exchange
6141

62-
waiting.each do |fiber|
63-
Fiber.scheduler.resume(fiber, value) if fiber.alive?
42+
ready.num_waiting.times do
43+
ready.push(value)
6444
end
6545

46+
ready.close
47+
6648
return nil
6749
end
6850

6951
protected
7052

7153
def exchange
72-
waiting = @waiting
73-
@waiting = List.new
74-
return waiting
54+
ready = @ready
55+
@ready = ::Thread::Queue.new
56+
return ready
7557
end
7658
end
7759
end

lib/async/notification.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,25 @@ class Notification < Condition
1212
# Signal to a given task that it should resume operations.
1313
#
1414
# @returns [Boolean] if a task was signalled.
15-
def signal(value = nil, task: Task.current)
16-
return false if @waiting.empty?
15+
def signal(value = nil)
16+
return false if empty?
1717

1818
Fiber.scheduler.push Signal.new(self.exchange, value)
1919

2020
return true
2121
end
2222

23-
Signal = Struct.new(:waiting, :value) do
23+
Signal = Struct.new(:ready, :value) do
2424
def alive?
2525
true
2626
end
2727

2828
def transfer
29-
waiting.each do |fiber|
30-
fiber.transfer(value) if fiber.alive?
29+
ready.num_waiting.times do
30+
ready.push(value)
3131
end
32+
33+
ready.close
3234
end
3335
end
3436

test/async/condition.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
expect(task).to be(:running?)
2323

24-
# This will cause the task to exit:
2524
condition.signal
2625

26+
task.wait
27+
2728
expect(task).to be(:completed?)
2829
end
2930

test/async/notification.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
sequence << :yielding
3535
reactor.yield
36+
task.wait
3637
sequence << :finished
3738

3839
expect(task.status).to be == :completed

test/async/task.rb

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@
527527
producer.stop # (5) [producer is resumed already] producer.stop
528528
end
529529

530-
expect(items).to be == [1, 2]
530+
expect(items).not.to be(:empty?)
531531
expect(value).to be == 3
532532
end
533533

@@ -846,15 +846,13 @@ def sleep_forever
846846

847847
with "stopped task" do
848848
it "is stopped?" do
849-
reactor.async do |task|
850-
child = task.async do |task|
851-
sleep(1)
852-
end
853-
854-
child.stop
855-
856-
expect(child).to be(:stopped?)
857-
end.wait
849+
child = reactor.async do |task|
850+
sleep(1)
851+
end
852+
853+
child.stop
854+
855+
expect(child).to be(:stopped?)
858856
end
859857
end
860858
end
@@ -930,11 +928,12 @@ def sleep_forever
930928
child_task.stop(true)
931929
expect(child_task).to be(:running?)
932930

933-
reactor.async do
934-
condition.signal
931+
condition.signal
932+
933+
while child_task.running?
934+
reactor.run_once(0)
935935
end
936936

937-
reactor.run_once(0)
938937
expect(child_task).to be(:stopped?)
939938
end
940939

0 commit comments

Comments
 (0)