Skip to content

Commit c78dc17

Browse files
fix: coroutine leaks from connection pool
1 parent 91708ed commit c78dc17

File tree

5 files changed

+32
-46
lines changed

5 files changed

+32
-46
lines changed

lib/openai/internal/transport/pooled_net_requester.rb

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ def execute(request)
134134

135135
# rubocop:disable Metrics/BlockLength
136136
enum = Enumerator.new do |y|
137-
with_pool(url, deadline: deadline) do |conn|
138-
next if finished
137+
next if finished
139138

139+
with_pool(url, deadline: deadline) do |conn|
140140
req, closing = self.class.build_request(request) do
141141
self.class.calibrate_socket_timeout(conn, deadline)
142142
end
@@ -149,7 +149,7 @@ def execute(request)
149149

150150
self.class.calibrate_socket_timeout(conn, deadline)
151151
conn.request(req) do |rsp|
152-
y << [conn, req, rsp]
152+
y << [req, rsp]
153153
break if finished
154154

155155
rsp.read_body do |bytes|
@@ -160,6 +160,8 @@ def execute(request)
160160
end
161161
eof = true
162162
end
163+
ensure
164+
conn.finish if !eof && conn&.started?
163165
end
164166
rescue Timeout::Error
165167
raise OpenAI::Errors::APITimeoutError.new(url: url, request: req)
@@ -168,16 +170,11 @@ def execute(request)
168170
end
169171
# rubocop:enable Metrics/BlockLength
170172

171-
conn, _, response = enum.next
173+
_, response = enum.next
172174
body = OpenAI::Internal::Util.fused_enum(enum, external: true) do
173175
finished = true
174-
tap do
175-
enum.next
176-
rescue StopIteration
177-
nil
178-
end
176+
loop { enum.next }
179177
ensure
180-
conn.finish if !eof && conn&.started?
181178
closing&.call
182179
end
183180
[Integer(response.code), response, body]

lib/openai/internal/type/base_stream.rb

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,6 @@ module Type
1313
module BaseStream
1414
include Enumerable
1515

16-
class << self
17-
# Attempt to close the underlying transport when the stream itself is garbage
18-
# collected.
19-
#
20-
# This should not be relied upon for resource clean up, as the garbage collector
21-
# is not guaranteed to run.
22-
#
23-
# @param stream [Enumerable<Object>]
24-
#
25-
# @return [Proc]
26-
#
27-
# @see https://rubyapi.org/3.2/o/objectspace#method-c-define_finalizer
28-
def defer_closing(stream) = ->(_id) { OpenAI::Internal::Util.close_fused!(stream) }
29-
end
30-
3116
# @return [Integer]
3217
attr_reader :status
3318

@@ -82,8 +67,6 @@ def initialize(model:, url:, status:, headers:, response:, unwrap:, stream:)
8267
@unwrap = unwrap
8368
@stream = stream
8469
@iterator = iterator
85-
86-
ObjectSpace.define_finalizer(self, OpenAI::Internal::Type::BaseStream.defer_closing(@stream))
8770
end
8871

8972
# @api private

rbi/openai/internal/type/base_stream.rbi

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,6 @@ module OpenAI
1212
Message = type_member(:in)
1313
Elem = type_member(:out)
1414

15-
class << self
16-
# Attempt to close the underlying transport when the stream itself is garbage
17-
# collected.
18-
#
19-
# This should not be relied upon for resource clean up, as the garbage collector
20-
# is not guaranteed to run.
21-
sig do
22-
params(stream: T::Enumerable[T.anything]).returns(
23-
T.proc.params(arg0: Integer).void
24-
)
25-
end
26-
def defer_closing(stream)
27-
end
28-
end
29-
3015
sig { returns(Integer) }
3116
attr_reader :status
3217

sig/openai/internal/type/base_stream.rbs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@ module OpenAI
44
module BaseStream[Message, Elem]
55
include Enumerable[Elem]
66

7-
def self.defer_closing: (
8-
Enumerable[top] stream
9-
) -> (^(Integer arg0) -> void)
10-
117
attr_reader status: Integer
128

139
attr_reader headers: ::Hash[String, String]

test/openai/internal/util_test.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,31 @@ def test_copy_write
310310
end
311311

312312
class OpenAI::Test::UtilFusedEnumTest < Minitest::Test
313+
def test_rewind_closing
314+
touched = false
315+
once = 0
316+
steps = 0
317+
enum = Enumerator.new do |y|
318+
next if touched
319+
320+
10.times do
321+
steps = _1
322+
y << _1
323+
end
324+
ensure
325+
once = once.succ
326+
end
327+
328+
fused = OpenAI::Internal::Util.fused_enum(enum, external: true) do
329+
touched = true
330+
loop { enum.next }
331+
end
332+
OpenAI::Internal::Util.close_fused!(fused)
333+
334+
assert_equal(1, once)
335+
assert_equal(0, steps)
336+
end
337+
313338
def test_closing
314339
arr = [1, 2, 3]
315340
once = 0

0 commit comments

Comments
 (0)