Skip to content

Commit 237c252

Browse files
Fix memory leak in PriorityQueue waiter management
Add waiter invalidation to prevent abandoned waiters from accumulating in the @waiting heap when exceptions occur during dequeue operations. Changes: - Add Waiter#invalidate! method that nullifies fiber and condition - Add Waiter#valid? method to check if waiter is still usable - Add ensure block in dequeue to invalidate waiters on exceptions - Update all waiter processing to skip invalid waiters (close, enqueue, enqueue_all) - Add comprehensive tests for waiter invalidation scenarios This fixes a memory leak where waiters would remain in @waiting indefinitely if exceptions occurred during wait_for_value, causing both memory growth and incorrect waiting counts. The fix uses lazy cleanup - invalid waiters are skipped during normal operation rather than expensive O(n) deletion, making it safe for concurrent access patterns.
1 parent f75b823 commit 237c252

File tree

2 files changed

+115
-12
lines changed

2 files changed

+115
-12
lines changed

lib/async/priority_queue.rb

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,17 @@ def wait_for_value(mutex)
4343
condition.wait(mutex)
4444
return self.value
4545
end
46+
47+
# Invalidate this waiter, making it unusable and detectable as abandoned.
48+
def invalidate!
49+
self.fiber = nil
50+
self.condition = nil
51+
end
52+
53+
# Check if this waiter has been invalidated.
54+
def valid?
55+
!fiber.nil? && !condition.nil?
56+
end
4657
end
4758

4859
# Create a new priority queue.
@@ -64,12 +75,12 @@ def close
6475
@mutex.synchronize do
6576
@closed = true
6677

67-
# Signal all waiting fibers with nil, skipping dead ones:
78+
# Signal all waiting fibers with nil, skipping dead/invalid ones:
6879
while waiter = @waiting.pop
69-
if waiter.fiber.alive?
80+
if waiter.valid? && waiter.fiber.alive?
7081
waiter.signal(nil)
7182
end
72-
# Dead waiter discarded, continue to next one.
83+
# Dead/invalid waiter discarded, continue to next one.
7384
end
7485
end
7586
end
@@ -105,14 +116,14 @@ def push(item)
105116

106117
@items << item
107118

108-
# Wake up the highest priority waiter if any, skipping dead waiters:
119+
# Wake up the highest priority waiter if any, skipping dead/invalid waiters:
109120
while waiter = @waiting.pop
110-
if waiter.fiber.alive?
121+
if waiter.valid? && waiter.fiber.alive?
111122
value = @items.shift
112123
waiter.signal(value)
113124
break
114125
end
115-
# Dead waiter discarded, try next one.
126+
# Dead/invalid waiter discarded, try next one.
116127
end
117128
end
118129
end
@@ -133,13 +144,13 @@ def enqueue(*items)
133144

134145
@items.concat(items)
135146

136-
# Wake up waiting fibers in priority order, skipping dead waiters:
147+
# Wake up waiting fibers in priority order, skipping dead/invalid waiters:
137148
while !@items.empty? && (waiter = @waiting.pop)
138-
if waiter.fiber.alive?
149+
if waiter.valid? && waiter.fiber.alive?
139150
value = @items.shift
140151
waiter.signal(value)
141152
end
142-
# Dead waiter discarded, continue to next one.
153+
# Dead/invalid waiter discarded, continue to next one.
143154
end
144155
end
145156
end
@@ -175,9 +186,14 @@ def dequeue(priority: 0)
175186
waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
176187
@waiting.push(waiter)
177188

178-
# Wait for our specific condition variable to be signaled:
179-
# The mutex is released during wait, reacquired after:
180-
return waiter.wait_for_value(@mutex)
189+
begin
190+
# Wait for our specific condition variable to be signaled:
191+
# The mutex is released during wait, reacquired after:
192+
return waiter.wait_for_value(@mutex)
193+
ensure
194+
# Invalidate waiter if exception occurs - prevents memory leaks
195+
waiter.invalidate!
196+
end
181197
end
182198
end
183199

test/async/priority_queue.rb

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,4 +547,91 @@
547547
]
548548
end
549549
end
550+
551+
with "waiter invalidation" do
552+
it "should invalidate waiters when tasks are stopped to prevent memory leaks" do
553+
# Start a task that will wait and then be stopped
554+
task = reactor.async do
555+
queue.dequeue(priority: 1)
556+
end
557+
558+
# Give the task time to start waiting
559+
sleep(0.01)
560+
expect(queue.waiting).to be == 1
561+
562+
# Stop the task (simulates exception)
563+
task.stop
564+
565+
# Give time for cleanup
566+
sleep(0.01)
567+
568+
# Now enqueue an item - should not try to wake the invalid waiter
569+
queue.enqueue("test_item")
570+
571+
# The item should still be available for a new waiter
572+
result = nil
573+
new_task = reactor.async do
574+
result = queue.dequeue
575+
end
576+
577+
new_task.wait
578+
expect(result).to be == "test_item"
579+
end
580+
581+
it "should skip invalid waiters during enqueue" do
582+
received_items = []
583+
584+
# Start multiple waiters
585+
tasks = []
586+
3.times do |i|
587+
tasks << reactor.async do
588+
item = queue.dequeue(priority: i)
589+
received_items << [i, item]
590+
end
591+
end
592+
593+
# Give tasks time to start waiting
594+
sleep(0.01)
595+
expect(queue.waiting).to be == 3
596+
597+
# Stop the middle priority task (priority 1)
598+
tasks[1].stop
599+
600+
# Give time for invalidation
601+
sleep(0.01)
602+
603+
# Add items to the queue
604+
queue.enqueue("item1", "item2")
605+
606+
# Give time for processing
607+
sleep(0.01)
608+
609+
# Should have received items in the valid waiters only
610+
# Invalid waiter (priority 1) should be skipped
611+
expect(received_items.size).to be == 2
612+
613+
# Items should go to highest priority waiters (2, then 0)
614+
priorities_served = received_items.map(&:first).sort.reverse
615+
expect(priorities_served).to be == [2, 0]
616+
end
617+
end
618+
619+
describe "Waiter" do
620+
it "should invalidate correctly" do
621+
condition = ConditionVariable.new
622+
fiber = Fiber.current
623+
waiter = Async::PriorityQueue::Waiter.new(fiber, 1, 1, condition, nil)
624+
625+
expect(waiter.valid?).to be == true
626+
expect(waiter.fiber).to be == fiber
627+
expect(waiter.condition).to be == condition
628+
629+
waiter.invalidate!
630+
631+
expect(waiter.valid?).to be == false
632+
expect(waiter.fiber).to be_nil
633+
expect(waiter.condition).to be_nil
634+
# value can be whatever - we don't care
635+
end
636+
end
550637
end

0 commit comments

Comments
 (0)