Skip to content

Commit 98c4235

Browse files
Better handling of waiter invalidation in PriorityQueue.
Add waiter invalidation to prevent abandoned waiters from accumulating in the @waiting heap when exceptions occur during dequeue operations. - Add Waiter#invalidate! method that nullifies fiber and condition. - Add Waiter#valid? method to check if waiter is still usable. - Add ensure block in dequeue to invalidate waiters on exceptions.
1 parent f75b823 commit 98c4235

File tree

2 files changed

+112
-16
lines changed

2 files changed

+112
-16
lines changed

lib/async/priority_queue.rb

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ def wait_for_value(mutex)
4343
condition.wait(mutex)
4444
return self.value
4545
end
46+
47+
# Invalidate this waiter, making it unusable and detectable as abandoned.
48+
def invalidate!
49+
self.fiber = nil
50+
end
51+
52+
# Check if this waiter has been invalidated.
53+
def valid?
54+
self.fiber&.alive?
55+
end
4656
end
4757

4858
# Create a new priority queue.
@@ -64,12 +74,9 @@ def close
6474
@mutex.synchronize do
6575
@closed = true
6676

67-
# Signal all waiting fibers with nil, skipping dead ones:
77+
# Signal all waiting fibers with nil, skipping dead/invalid ones:
6878
while waiter = @waiting.pop
69-
if waiter.fiber.alive?
70-
waiter.signal(nil)
71-
end
72-
# Dead waiter discarded, continue to next one.
79+
waiter.signal(nil)
7380
end
7481
end
7582
end
@@ -105,14 +112,14 @@ def push(item)
105112

106113
@items << item
107114

108-
# Wake up the highest priority waiter if any, skipping dead waiters:
115+
# Wake up the highest priority waiter if any, skipping dead/invalid waiters:
109116
while waiter = @waiting.pop
110-
if waiter.fiber.alive?
117+
if waiter.valid?
111118
value = @items.shift
112119
waiter.signal(value)
113120
break
114121
end
115-
# Dead waiter discarded, try next one.
122+
# Dead/invalid waiter discarded, try next one.
116123
end
117124
end
118125
end
@@ -133,13 +140,13 @@ def enqueue(*items)
133140

134141
@items.concat(items)
135142

136-
# Wake up waiting fibers in priority order, skipping dead waiters:
143+
# Wake up waiting fibers in priority order, skipping dead/invalid waiters:
137144
while !@items.empty? && (waiter = @waiting.pop)
138-
if waiter.fiber.alive?
145+
if waiter.valid?
139146
value = @items.shift
140147
waiter.signal(value)
141148
end
142-
# Dead waiter discarded, continue to next one.
149+
# Dead/invalid waiter discarded, continue to next one.
143150
end
144151
end
145152
end
@@ -172,12 +179,16 @@ def dequeue(priority: 0)
172179
@sequence += 1
173180

174181
condition = ConditionVariable.new
175-
waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
176-
@waiting.push(waiter)
177182

178-
# Wait for our specific condition variable to be signaled:
179-
# The mutex is released during wait, reacquired after:
180-
return waiter.wait_for_value(@mutex)
183+
begin
184+
waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
185+
@waiting.push(waiter)
186+
187+
# Wait for our specific condition variable to be signaled:
188+
return waiter.wait_for_value(@mutex)
189+
ensure
190+
waiter&.invalidate!
191+
end
181192
end
182193
end
183194

test/async/priority_queue.rb

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,4 +547,89 @@
547547
]
548548
end
549549
end
550+
551+
with "waiter invalidation" do
552+
it "should invalidate waiters when tasks are stopped to prevent memory leaks" do
553+
# Start a task that will wait and then be stopped
554+
task = reactor.async do
555+
queue.dequeue(priority: 1)
556+
end
557+
558+
# Give the task time to start waiting
559+
sleep(0.01)
560+
expect(queue.waiting).to be == 1
561+
562+
# Stop the task (simulates exception)
563+
task.stop
564+
565+
# Give time for cleanup
566+
sleep(0.01)
567+
568+
# Now enqueue an item - should not try to wake the invalid waiter
569+
queue.enqueue("test_item")
570+
571+
# The item should still be available for a new waiter
572+
result = nil
573+
new_task = reactor.async do
574+
result = queue.dequeue
575+
end
576+
577+
new_task.wait
578+
expect(result).to be == "test_item"
579+
end
580+
581+
it "should skip invalid waiters during enqueue" do
582+
received_items = []
583+
584+
# Start multiple waiters
585+
tasks = []
586+
3.times do |i|
587+
tasks << reactor.async do
588+
item = queue.dequeue(priority: i)
589+
received_items << [i, item]
590+
end
591+
end
592+
593+
# Give tasks time to start waiting
594+
sleep(0.01)
595+
expect(queue.waiting).to be == 3
596+
597+
# Stop the middle priority task (priority 1)
598+
tasks[1].stop
599+
600+
# Give time for invalidation
601+
sleep(0.01)
602+
603+
# Add items to the queue
604+
queue.enqueue("item1", "item2")
605+
606+
# Give time for processing
607+
sleep(0.01)
608+
609+
# Should have received items in the valid waiters only
610+
# Invalid waiter (priority 1) should be skipped
611+
expect(received_items.size).to be == 2
612+
613+
# Items should go to highest priority waiters (2, then 0)
614+
priorities_served = received_items.map(&:first).sort.reverse
615+
expect(priorities_served).to be == [2, 0]
616+
end
617+
end
618+
619+
describe Async::PriorityQueue::Waiter do
620+
it "should invalidate correctly" do
621+
condition = ConditionVariable.new
622+
fiber = Fiber.current
623+
waiter = Async::PriorityQueue::Waiter.new(fiber, 1, 1, condition, nil)
624+
625+
expect(waiter).to be(:valid?)
626+
expect(waiter.fiber).to be == fiber
627+
expect(waiter.condition).to be == condition
628+
629+
waiter.invalidate!
630+
631+
expect(waiter).not.to be(:valid?)
632+
expect(waiter.fiber).to be_nil
633+
end
634+
end
550635
end

0 commit comments

Comments
 (0)