Skip to content

Commit aa38765

Browse files
TonsOfFunclaude
andcommitted
Add streaming lifecycle regression tests for OpenAI and Ollama providers
Add provider-level tests to prevent broadcast_stream_open from being accidentally removed. Tests verify:
- :open event emits on first chunk
- :update events emit for content
- broadcast_stream_open is idempotent (only fires once)
- :close event emits via process_prompt_finished
- Full lifecycle order: open -> update(s) -> close
- streaming flag state management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 427a688 commit aa38765

File tree

2 files changed

+338
-0
lines changed

2 files changed

+338
-0
lines changed
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# frozen_string_literal: true

require "test_helper"
require_relative "../../../lib/active_agent/providers/ollama_provider"

module Providers
  module Ollama
    # Regression tests guarding the streaming lifecycle that OllamaProvider
    # inherits from OpenAI::ChatProvider. They exist to keep
    # broadcast_stream_open from being accidentally removed: every stream must
    # emit exactly one :open, then :update events, then one :close, and the
    # internal streaming flag must track that lifecycle.
    class StreamingLifecycleTest < ActiveSupport::TestCase
      setup do
        # Captures every (message, delta, event_type) broadcast for assertions.
        @stream_events = []

        @provider = ActiveAgent::Providers::OllamaProvider.new(
          service: "Ollama",
          model: "llama3.2",
          messages: [{ role: "user", content: "Hello" }],
          stream: true,
          stream_broadcaster: ->(message, delta, event_type) {
            @stream_events << { message: message, delta: delta, type: event_type }
          }
        )

        # Seed the message stack so streamed deltas have a message to merge into.
        @provider.send(:message_stack).push({
          index: 0,
          role: "assistant",
          content: ""
        })
      end

      # Reuse OpenAI mock structures since Ollama inherits from OpenAI::ChatProvider.
      MockChunk = Struct.new(:choices, keyword_init: true)
      MockChoice = Struct.new(:index, :delta, keyword_init: true)
      MockDelta = Struct.new(:content, :role, keyword_init: true) do
        def as_json
          { content: content, role: role }.compact
        end
      end

      MockChunkEvent = Struct.new(:type, :chunk, keyword_init: true)
      MockContentDoneEvent = Struct.new(:type, :content, :parsed, keyword_init: true)

      test "inherits streaming lifecycle from OpenAI::ChatProvider - emits :open event" do
        @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))

        open_events = @stream_events.select { |e| e[:type] == :open }
        assert_equal 1, open_events.size, "Ollama should emit :open event via inherited process_stream_chunk"
      end

      test "broadcast_stream_open is idempotent - only fires once" do
        3.times { @provider.send(:process_stream_chunk, chunk_event("x")) }

        open_events = @stream_events.select { |e| e[:type] == :open }
        assert_equal 1, open_events.size, "Expected only one :open event even after multiple chunks"
      end

      test "full streaming lifecycle with Ollama role handling quirk" do
        # Ollama duplicates role in every delta - message_merge_delta handles this
        @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))

        # Subsequent chunks also have role (Ollama bug/quirk)
        @provider.send(:process_stream_chunk, chunk_event(" there", role: "assistant"))
        @provider.send(:process_stream_chunk, chunk_event("!", role: "assistant"))

        finish_stream(done_event("Hi there!"))

        event_types = @stream_events.map { |e| e[:type] }

        assert_equal :open, event_types.first, "First event should be :open"
        assert_equal :close, event_types.last, "Last event should be :close"
        assert event_types.include?(:update), "Should have :update events"

        # Verify ordering
        open_index = event_types.index(:open)
        first_update_index = event_types.index(:update)
        close_index = event_types.index(:close)
        assert open_index < first_update_index, ":open should appear before first :update"
        assert first_update_index < close_index, ":update should appear before :close"
      end

      test "streaming flag is set to true after broadcast_stream_open" do
        refute @provider.send(:streaming), "streaming should be false initially"

        @provider.send(:process_stream_chunk, chunk_event("Hi"))

        assert @provider.send(:streaming), "streaming should be true after open"
      end

      test "streaming flag is reset to false after broadcast_stream_close" do
        # Open the stream
        @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))

        assert @provider.send(:streaming), "streaming should be true after open"

        # Close the stream
        finish_stream(done_event("Hi"))

        refute @provider.send(:streaming), "streaming should be false after close"
      end

      private

      # Builds a :chunk event carrying a single choice whose delta has the
      # given content (and, optionally, a role - Ollama repeats it every chunk).
      def chunk_event(content, role: nil)
        chunk = MockChunk.new(
          choices: [MockChoice.new(index: 0, delta: MockDelta.new(content: content, role: role))]
        )
        MockChunkEvent.new(type: :chunk, chunk: chunk)
      end

      # Builds the terminal :"content.done" event for the given full content.
      def done_event(content)
        MockContentDoneEvent.new(type: :"content.done", content: content, parsed: nil)
      end

      # Processes +event+ with process_prompt_finished stubbed to only call
      # broadcast_stream_close. This avoids the nil request issue while still
      # exercising the streaming close path.
      def finish_stream(event)
        @provider.stub(:process_prompt_finished, ->(*_) { @provider.send(:broadcast_stream_close) }) do
          @provider.send(:process_stream_chunk, event)
        end
      end
    end
  end
end
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# frozen_string_literal: true

require "test_helper"
require_relative "../../../../lib/active_agent/providers/open_ai/chat_provider"

module Providers
  module OpenAI
    module Chat
      # Regression tests for the ChatProvider streaming lifecycle. They exist
      # to keep broadcast_stream_open from being accidentally removed: every
      # stream must emit exactly one :open, then :update events for content,
      # then one :close, and the internal streaming flag must track that
      # lifecycle.
      class StreamingLifecycleTest < ActiveSupport::TestCase
        setup do
          # Captures every (message, delta, event_type) broadcast for assertions.
          @stream_events = []

          @provider = ActiveAgent::Providers::OpenAI::ChatProvider.new(
            service: "OpenAI",
            model: "gpt-4o-mini",
            messages: [{ role: "user", content: "Hello" }],
            stream: true,
            stream_broadcaster: ->(message, delta, event_type) {
              @stream_events << { message: message, delta: delta, type: event_type }
            }
          )

          # Seed the message stack so streamed deltas have a message to merge into.
          @provider.send(:message_stack).push({
            index: 0,
            role: "assistant",
            content: ""
          })
        end

        # Mock event structures that match OpenAI's streaming API.
        MockChunk = Struct.new(:choices, keyword_init: true)
        MockChoice = Struct.new(:index, :delta, keyword_init: true)
        MockDelta = Struct.new(:content, :role, keyword_init: true) do
          def as_json
            { content: content, role: role }.compact
          end
        end

        MockChunkEvent = Struct.new(:type, :chunk, keyword_init: true)
        MockContentDoneEvent = Struct.new(:type, :content, :parsed, keyword_init: true)

        test "process_stream_chunk emits :open event on first chunk" do
          @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))

          open_events = @stream_events.select { |e| e[:type] == :open }
          assert_equal 1, open_events.size, "Expected exactly one :open event"
        end

        test "process_stream_chunk emits :update events for content" do
          %w[Hi there !].each do |content|
            @provider.send(:process_stream_chunk, chunk_event(content))
          end

          update_events = @stream_events.select { |e| e[:type] == :update }
          assert_equal 3, update_events.size, "Expected three :update events"
        end

        test "broadcast_stream_open is idempotent - only fires once" do
          3.times { @provider.send(:process_stream_chunk, chunk_event("x")) }

          open_events = @stream_events.select { |e| e[:type] == :open }
          assert_equal 1, open_events.size, "Expected only one :open event even after multiple chunks"
        end

        test "content.done event triggers :close via process_prompt_finished" do
          # First send a chunk to trigger :open
          @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))

          # Then send content.done event which triggers process_prompt_finished
          finish_stream(done_event("Hi there!"))

          close_events = @stream_events.select { |e| e[:type] == :close }
          assert_equal 1, close_events.size, "Expected exactly one :close event"
        end

        test "full streaming lifecycle emits open, update, and close in correct order" do
          # First chunk with role, then role-less content chunks
          @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))
          @provider.send(:process_stream_chunk, chunk_event(" there"))
          @provider.send(:process_stream_chunk, chunk_event("!"))

          finish_stream(done_event("Hi there!"))

          event_types = @stream_events.map { |e| e[:type] }

          assert_equal :open, event_types.first, "First event should be :open"
          assert_equal :close, event_types.last, "Last event should be :close"
          assert event_types.count(:update) >= 3, "Should have at least 3 :update events"

          # Verify :open appears before first :update
          open_index = event_types.index(:open)
          first_update_index = event_types.index(:update)
          assert open_index < first_update_index, ":open should appear before first :update"

          # Verify last :update appears before :close
          last_update_index = event_types.rindex(:update)
          close_index = event_types.index(:close)
          assert last_update_index < close_index, "last :update should appear before :close"
        end

        test "streaming flag is set to true after broadcast_stream_open" do
          refute @provider.send(:streaming), "streaming should be false initially"

          @provider.send(:process_stream_chunk, chunk_event("Hi"))

          assert @provider.send(:streaming), "streaming should be true after open"
        end

        test "streaming flag is reset to false after broadcast_stream_close" do
          # Open the stream
          @provider.send(:process_stream_chunk, chunk_event("Hi", role: "assistant"))

          assert @provider.send(:streaming), "streaming should be true after open"

          # Close the stream
          finish_stream(done_event("Hi"))

          refute @provider.send(:streaming), "streaming should be false after close"
        end

        private

        # Builds a :chunk event carrying a single choice whose delta has the
        # given content and, optionally, a role (OpenAI sends role only on the
        # first delta).
        def chunk_event(content, role: nil)
          chunk = MockChunk.new(
            choices: [MockChoice.new(index: 0, delta: MockDelta.new(content: content, role: role))]
          )
          MockChunkEvent.new(type: :chunk, chunk: chunk)
        end

        # Builds the terminal :"content.done" event for the given full content.
        def done_event(content)
          MockContentDoneEvent.new(type: :"content.done", content: content, parsed: nil)
        end

        # Processes +event+ with process_prompt_finished stubbed to only call
        # broadcast_stream_close. This avoids the nil request issue while still
        # exercising the streaming close path.
        def finish_stream(event)
          @provider.stub(:process_prompt_finished, ->(*_) { @provider.send(:broadcast_stream_close) }) do
            @provider.send(:process_stream_chunk, event)
          end
        end
      end
    end
  end
end

0 commit comments

Comments
 (0)