Skip to content

Commit 96aede5

Browse files
feat!: support `for item in stream` style iteration on Streams (#44)
1 parent: 6140dae · commit: 96aede5

File tree

22 files changed

+135
-85
lines changed

22 files changed

+135
-85
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -81,7 +81,7 @@ stream = openai.chat.completions.create_streaming(
8181
model: "gpt-4o"
8282
)
8383

84-
stream.for_each do |completion|
84+
stream.each do |completion|
8585
puts(completion)
8686
end
8787
```

lib/openai/base_client.rb

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -437,7 +437,7 @@ def request(req)
437437
decoded = OpenAI::Util.decode_content(response, stream: stream)
438438
case req
439439
in { stream: Class => st }
440-
st.new(model: model, url: url, status: status, response: response, messages: decoded)
440+
st.new(model: model, url: url, status: status, response: response, stream: decoded)
441441
in { page: Class => page }
442442
page.new(client: self, req: req, headers: response, page_data: decoded)
443443
else

lib/openai/base_stream.rb

Lines changed: 8 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -3,7 +3,7 @@
33
module OpenAI
44
# @example
55
# ```ruby
6-
# stream.for_each do |chunk|
6+
# stream.each do |chunk|
77
# puts(chunk)
88
# end
99
# ```
@@ -12,7 +12,6 @@ module OpenAI
1212
# ```ruby
1313
# chunks =
1414
# stream
15-
# .to_enum
1615
# .lazy
1716
# .select { _1.object_id.even? }
1817
# .map(&:itself)
@@ -22,6 +21,8 @@ module OpenAI
2221
# chunks => Array
2322
# ```
2423
module BaseStream
24+
include Enumerable
25+
2526
# @return [void]
2627
def close = OpenAI::Util.close_fused!(@iterator)
2728

@@ -33,14 +34,14 @@ def close = OpenAI::Util.close_fused!(@iterator)
3334
# @param blk [Proc]
3435
#
3536
# @return [void]
36-
def for_each(&)
37+
def each(&)
3738
unless block_given?
3839
raise ArgumentError.new("A block must be given to ##{__method__}")
3940
end
4041
@iterator.each(&)
4142
end
4243

43-
# @return [Enumerable]
44+
# @return [Enumerator]
4445
def to_enum = @iterator
4546

4647
alias_method :enum_for, :to_enum
@@ -51,13 +52,13 @@ def to_enum = @iterator
5152
# @param url [URI::Generic]
5253
# @param status [Integer]
5354
# @param response [Net::HTTPResponse]
54-
# @param messages [Enumerable]
55-
def initialize(model:, url:, status:, response:, messages:)
55+
# @param stream [Enumerable]
56+
def initialize(model:, url:, status:, response:, stream:)
5657
@model = model
5758
@url = url
5859
@status = status
5960
@response = response
60-
@messages = messages
61+
@stream = stream
6162
@iterator = iterator
6263
end
6364
end

lib/openai/resources/beta/threads.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,10 @@ def delete(thread_id, params = {})
208208
# @return [OpenAI::Models::Beta::Threads::Run]
209209
def create_and_run(params)
210210
parsed, options = OpenAI::Models::Beta::ThreadCreateAndRunParams.dump_request(params)
211-
parsed.delete(:stream)
211+
if parsed[:stream]
212+
message = "Please use `#create_and_run_streaming` for the streaming use case."
213+
raise ArgumentError.new(message)
214+
end
212215
@client.request(
213216
method: :post,
214217
path: "threads/runs",
@@ -315,6 +318,10 @@ def create_and_run(params)
315318
# @return [OpenAI::Stream<OpenAI::Models::Beta::AssistantStreamEvent::ThreadCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunQueued, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunRequiresAction, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelling, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ErrorEvent>]
316319
def create_and_run_streaming(params)
317320
parsed, options = OpenAI::Models::Beta::ThreadCreateAndRunParams.dump_request(params)
321+
unless parsed.fetch(:stream, true)
322+
message = "Please use `#create_and_run` for the non-streaming use case."
323+
raise ArgumentError.new(message)
324+
end
318325
parsed.store(:stream, true)
319326
@client.request(
320327
method: :post,

lib/openai/resources/beta/threads/runs.rb

Lines changed: 16 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -125,7 +125,10 @@ class Runs
125125
# @return [OpenAI::Models::Beta::Threads::Run]
126126
def create(thread_id, params)
127127
parsed, options = OpenAI::Models::Beta::Threads::RunCreateParams.dump_request(params)
128-
parsed.delete(:stream)
128+
if parsed[:stream]
129+
message = "Please use `#create_streaming` for the streaming use case."
130+
raise ArgumentError.new(message)
131+
end
129132
query_params = [:include]
130133
@client.request(
131134
method: :post,
@@ -254,6 +257,10 @@ def create(thread_id, params)
254257
# @return [OpenAI::Stream<OpenAI::Models::Beta::AssistantStreamEvent::ThreadCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunQueued, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunRequiresAction, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelling, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ErrorEvent>]
255258
def create_streaming(thread_id, params)
256259
parsed, options = OpenAI::Models::Beta::Threads::RunCreateParams.dump_request(params)
260+
unless parsed.fetch(:stream, true)
261+
message = "Please use `#create` for the non-streaming use case."
262+
raise ArgumentError.new(message)
263+
end
257264
parsed.store(:stream, true)
258265
query_params = [:include]
259266
@client.request(
@@ -410,7 +417,10 @@ def cancel(run_id, params)
410417
# @return [OpenAI::Models::Beta::Threads::Run]
411418
def submit_tool_outputs(run_id, params)
412419
parsed, options = OpenAI::Models::Beta::Threads::RunSubmitToolOutputsParams.dump_request(params)
413-
parsed.delete(:stream)
420+
if parsed[:stream]
421+
message = "Please use `#submit_tool_outputs_streaming` for the streaming use case."
422+
raise ArgumentError.new(message)
423+
end
414424
thread_id =
415425
parsed.delete(:thread_id) do
416426
raise ArgumentError.new("missing required path argument #{_1}")
@@ -444,6 +454,10 @@ def submit_tool_outputs(run_id, params)
444454
# @return [OpenAI::Stream<OpenAI::Models::Beta::AssistantStreamEvent::ThreadCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunQueued, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunRequiresAction, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelling, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ErrorEvent>]
445455
def submit_tool_outputs_streaming(run_id, params)
446456
parsed, options = OpenAI::Models::Beta::Threads::RunSubmitToolOutputsParams.dump_request(params)
457+
unless parsed.fetch(:stream, true)
458+
message = "Please use `#submit_tool_outputs` for the non-streaming use case."
459+
raise ArgumentError.new(message)
460+
end
447461
parsed.store(:stream, true)
448462
thread_id =
449463
parsed.delete(:thread_id) do

lib/openai/resources/chat/completions.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,10 @@ class Completions
215215
# @return [OpenAI::Models::Chat::ChatCompletion]
216216
def create(params)
217217
parsed, options = OpenAI::Models::Chat::CompletionCreateParams.dump_request(params)
218-
parsed.delete(:stream)
218+
if parsed[:stream]
219+
message = "Please use `#create_streaming` for the streaming use case."
220+
raise ArgumentError.new(message)
221+
end
219222
@client.request(
220223
method: :post,
221224
path: "chat/completions",
@@ -433,6 +436,10 @@ def create(params)
433436
# @return [OpenAI::Stream<OpenAI::Models::Chat::ChatCompletionChunk>]
434437
def create_streaming(params)
435438
parsed, options = OpenAI::Models::Chat::CompletionCreateParams.dump_request(params)
439+
unless parsed.fetch(:stream, true)
440+
message = "Please use `#create` for the non-streaming use case."
441+
raise ArgumentError.new(message)
442+
end
436443
parsed.store(:stream, true)
437444
@client.request(
438445
method: :post,

lib/openai/resources/completions.rb

Lines changed: 8 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -115,7 +115,10 @@ class Completions
115115
# @return [OpenAI::Models::Completion]
116116
def create(params)
117117
parsed, options = OpenAI::Models::CompletionCreateParams.dump_request(params)
118-
parsed.delete(:stream)
118+
if parsed[:stream]
119+
message = "Please use `#create_streaming` for the streaming use case."
120+
raise ArgumentError.new(message)
121+
end
119122
@client.request(
120123
method: :post,
121124
path: "completions",
@@ -237,6 +240,10 @@ def create(params)
237240
# @return [OpenAI::Stream<OpenAI::Models::Completion>]
238241
def create_streaming(params)
239242
parsed, options = OpenAI::Models::CompletionCreateParams.dump_request(params)
243+
unless parsed.fetch(:stream, true)
244+
message = "Please use `#create` for the non-streaming use case."
245+
raise ArgumentError.new(message)
246+
end
240247
parsed.store(:stream, true)
241248
@client.request(
242249
method: :post,

lib/openai/resources/responses.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ class Responses
129129
# @return [OpenAI::Models::Responses::Response]
130130
def create(params)
131131
parsed, options = OpenAI::Models::Responses::ResponseCreateParams.dump_request(params)
132-
parsed.delete(:stream)
132+
if parsed[:stream]
133+
message = "Please use `#create_streaming` for the streaming use case."
134+
raise ArgumentError.new(message)
135+
end
133136
@client.request(
134137
method: :post,
135138
path: "responses",
@@ -262,6 +265,10 @@ def create(params)
262265
# @return [OpenAI::Stream<OpenAI::Models::Responses::ResponseAudioDeltaEvent, OpenAI::Models::Responses::ResponseAudioDoneEvent, OpenAI::Models::Responses::ResponseAudioTranscriptDeltaEvent, OpenAI::Models::Responses::ResponseAudioTranscriptDoneEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallCodeDeltaEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallCodeDoneEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallCompletedEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallInProgressEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallInterpretingEvent, OpenAI::Models::Responses::ResponseCompletedEvent, OpenAI::Models::Responses::ResponseContentPartAddedEvent, OpenAI::Models::Responses::ResponseContentPartDoneEvent, OpenAI::Models::Responses::ResponseCreatedEvent, OpenAI::Models::Responses::ResponseErrorEvent, OpenAI::Models::Responses::ResponseFileSearchCallCompletedEvent, OpenAI::Models::Responses::ResponseFileSearchCallInProgressEvent, OpenAI::Models::Responses::ResponseFileSearchCallSearchingEvent, OpenAI::Models::Responses::ResponseFunctionCallArgumentsDeltaEvent, OpenAI::Models::Responses::ResponseFunctionCallArgumentsDoneEvent, OpenAI::Models::Responses::ResponseInProgressEvent, OpenAI::Models::Responses::ResponseFailedEvent, OpenAI::Models::Responses::ResponseIncompleteEvent, OpenAI::Models::Responses::ResponseOutputItemAddedEvent, OpenAI::Models::Responses::ResponseOutputItemDoneEvent, OpenAI::Models::Responses::ResponseRefusalDeltaEvent, OpenAI::Models::Responses::ResponseRefusalDoneEvent, OpenAI::Models::Responses::ResponseTextAnnotationDeltaEvent, OpenAI::Models::Responses::ResponseTextDeltaEvent, OpenAI::Models::Responses::ResponseTextDoneEvent, OpenAI::Models::Responses::ResponseWebSearchCallCompletedEvent, OpenAI::Models::Responses::ResponseWebSearchCallInProgressEvent, OpenAI::Models::Responses::ResponseWebSearchCallSearchingEvent>]
263266
def create_streaming(params)
264267
parsed, options = OpenAI::Models::Responses::ResponseCreateParams.dump_request(params)
268+
unless parsed.fetch(:stream, true)
269+
message = "Please use `#create` for the non-streaming use case."
270+
raise ArgumentError.new(message)
271+
end
265272
parsed.store(:stream, true)
266273
@client.request(
267274
method: :post,

lib/openai/stream.rb

Lines changed: 3 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -3,7 +3,7 @@
33
module OpenAI
44
# @example
55
# ```ruby
6-
# stream.for_each do |event|
6+
# stream.each do |event|
77
# puts(event)
88
# end
99
# ```
@@ -12,7 +12,6 @@ module OpenAI
1212
# ```ruby
1313
# events =
1414
# stream
15-
# .to_enum
1615
# .lazy
1716
# .select { _1.object_id.even? }
1817
# .map(&:itself)
@@ -29,10 +28,10 @@ class Stream
2928
# @return [Enumerable]
3029
private def iterator
3130
# rubocop:disable Metrics/BlockLength
32-
@iterator ||= OpenAI::Util.chain_fused(@messages) do |y|
31+
@iterator ||= OpenAI::Util.chain_fused(@stream) do |y|
3332
consume = false
3433

35-
@messages.each do |msg|
34+
@stream.each do |msg|
3635
next if consume
3736

3837
case msg

rbi/lib/openai/base_stream.rbi

Lines changed: 6 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -2,6 +2,8 @@
22

33
module OpenAI
44
module BaseStream
5+
include Enumerable
6+
57
Message = type_member(:in)
68
Elem = type_member(:out)
79

@@ -15,10 +17,10 @@ module OpenAI
1517
end
1618

1719
sig { params(blk: T.proc.params(arg0: Elem).void).void }
18-
def for_each(&blk)
20+
def each(&blk)
1921
end
2022

21-
sig { returns(T::Enumerable[Elem]) }
23+
sig { returns(T::Enumerator[Elem]) }
2224
def to_enum
2325
end
2426

@@ -31,11 +33,11 @@ module OpenAI
3133
url: URI::Generic,
3234
status: Integer,
3335
response: Net::HTTPResponse,
34-
messages: T::Enumerable[Message]
36+
stream: T::Enumerable[Message]
3537
)
3638
.void
3739
end
38-
def initialize(model:, url:, status:, response:, messages:)
40+
def initialize(model:, url:, status:, response:, stream:)
3941
end
4042
end
4143
end

0 commit comments

Comments (0)