Skip to content

Commit 98cdde5

Browse files
ninacebanNina Ciocanu
andauthored
Safari fix: refactor Stream Parses to use while(true) instead of for await (#1444)
* fix stream parser * refactoring * adding changeset --------- Co-authored-by: Nina Ciocanu <nciocanu@adobe.com>
1 parent 0abf50a commit 98cdde5

File tree

7 files changed

+257
-112
lines changed

7 files changed

+257
-112
lines changed

.changeset/tiny-houses-boil.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@adobe/alloy": patch
3+
---
4+
5+
Fixed a bug where in Safari the Brand Concierge streams were not parsed

packages/core/src/components/BrandConcierge/createStreamParser.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export default () => {
7373

7474
/**
7575
* Parse SSE stream using callbacks.
76-
* Uses modern async iteration (for await...of) for clean, performant stream processing.
76+
* Uses ReadableStreamDefaultReader for cross-browser compatibility (Safari 11+, Firefox, Chrome).
7777
*
7878
* @param {ReadableStream} stream - The readable stream from fetch response
7979
* @param {Object} callbacks - Callback functions for stream events
@@ -82,12 +82,20 @@ export default () => {
8282
* @param {Function} callbacks.onComplete - Callback function called when stream ends
8383
*/
8484
return async (stream, { onEvent, onPing, onComplete }) => {
85+
const reader = stream.getReader();
8586
const decoder = new TextDecoder(ENCODING);
8687
let buffer = "";
8788

8889
try {
89-
for await (const chunk of stream) {
90-
buffer += decoder.decode(chunk, { stream: true });
90+
while (true) {
91+
// eslint-disable-next-line no-await-in-loop
92+
const { done, value } = await reader.read();
93+
94+
if (done) {
95+
break;
96+
}
97+
98+
buffer += decoder.decode(value, { stream: true });
9199
const events = buffer.split(EVENT_SEPARATOR_REGEX);
92100
buffer = events.pop() || "";
93101

@@ -134,6 +142,8 @@ export default () => {
134142
} catch (error) {
135143
onEvent({ error });
136144
onComplete();
145+
} finally {
146+
reader.releaseLock();
137147
}
138148
};
139149
};

packages/core/src/components/BrandConcierge/createTimeoutWrapper.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export default ({ onStreamResponseCallback, streamTimeout }) => {
3131
timedOut = true;
3232
onStreamResponseCallback({
3333
error: {
34-
message: "Stream timeout: No data received within 10 seconds",
34+
message: `Stream timeout: No data received within ${streamTimeout / 1000} seconds`,
3535
},
3636
});
3737
}, streamTimeout);

packages/core/test/unit/specs/components/BrandConcierge/createSendConversationEvent.spec.js

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ governing permissions and limitations under the License.
1212
import { vi, beforeEach, describe, it, expect } from "vitest";
1313
import createSendConversationEvent from "../../../../../src/components/BrandConcierge/createSendConversationEvent.js";
1414
import flushPromiseChains from "../../../helpers/flushPromiseChains.js";
15+
import createMockReadableStream from "./helpers/createMockReadableStream.js";
1516

1617
describe("createSendConversationEvent", () => {
1718
let mockDependencies;
@@ -77,7 +78,7 @@ describe("createSendConversationEvent", () => {
7778
const mockResponse = {
7879
ok: true,
7980
status: 200,
80-
body: "mock-stream-body"
81+
body: createMockReadableStream([]),
8182
};
8283
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);
8384

@@ -117,7 +118,7 @@ describe("createSendConversationEvent", () => {
117118
const mockResponse = {
118119
ok: true,
119120
status: 200,
120-
body: "mock-stream-body"
121+
body: createMockReadableStream([]),
121122
};
122123
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);
123124

@@ -156,7 +157,7 @@ describe("createSendConversationEvent", () => {
156157
const mockResponse = {
157158
ok: true,
158159
status: 200,
159-
body: "mock-stream-body"
160+
body: createMockReadableStream([]),
160161
};
161162
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);
162163

@@ -196,17 +197,19 @@ describe("createSendConversationEvent", () => {
196197

197198
it("handles stream timeout when no data is received within 10 seconds", async () => {
198199
vi.useFakeTimers();
199-
200+
200201
const mockResponse = {
201202
ok: true,
202203
status: 200,
203204
body: {
204-
// Simulate an async iterator that never yields data
205-
[Symbol.asyncIterator]: async function* () {
206-
// Never yield anything - simulates a hanging stream
207-
await new Promise(() => {}); // Promise that never resolves
208-
}
209-
}
205+
getReader() {
206+
return {
207+
// Simulate a reader that never returns data (hanging stream)
208+
read: () => new Promise(() => {}), // Promise that never resolves
209+
releaseLock: () => {},
210+
};
211+
},
212+
},
210213
};
211214
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);
212215

packages/core/test/unit/specs/components/BrandConcierge/createStreamParser.spec.js

Lines changed: 72 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ governing permissions and limitations under the License.
1111
*/
1212
import { vi, beforeEach, describe, it, expect } from "vitest";
1313
import createStreamParser from "../../../../../src/components/BrandConcierge/createStreamParser.js";
14-
import flushPromiseChains from "../../../helpers/flushPromiseChains.js";
14+
import createMockReadableStream from "./helpers/createMockReadableStream.js";
1515

1616
describe("createStreamParser", () => {
1717
let streamParser;
@@ -31,176 +31,170 @@ describe("createStreamParser", () => {
3131
});
3232

3333
it("parses streaming data chunks", async () => {
34-
// Create a mock stream using async generator
35-
async function* mockStream() {
36-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
37-
yield new TextEncoder().encode('data: {"text": " World"}\n\n');
38-
}
34+
const mockStream = createMockReadableStream([
35+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
36+
new TextEncoder().encode('data: {"text": " World"}\n\n'),
37+
]);
3938

40-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
39+
await streamParser(mockStream, { onEvent, onPing, onComplete });
4140

4241
expect(onEvent).toHaveBeenCalledTimes(2);
4342
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
4443
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": " World"}' });
4544
});
4645

4746
it("handles parsing errors gracefully", async () => {
48-
// Create a mock stream that yields invalid data
49-
async function* mockStream() {
50-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
51-
yield new TextEncoder().encode('data: invalid json\n\n');
52-
}
47+
const mockStream = createMockReadableStream([
48+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
49+
new TextEncoder().encode("data: invalid json\n\n"),
50+
]);
5351

54-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
52+
await streamParser(mockStream, { onEvent, onPing, onComplete });
5553

5654
expect(onEvent).toHaveBeenCalledTimes(2);
5755
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
58-
expect(onEvent).toHaveBeenCalledWith({ data: 'invalid json' });
56+
expect(onEvent).toHaveBeenCalledWith({ data: "invalid json" });
5957
});
6058

6159
it("handles stream reading errors", async () => {
62-
// Create a mock stream that throws an error
63-
async function* mockStream() {
64-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
65-
throw new Error("Stream reading failed");
66-
}
60+
const mockStream = createMockReadableStream([
61+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
62+
new Error("Stream reading failed"),
63+
]);
6764

68-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
65+
await streamParser(mockStream, { onEvent, onPing, onComplete });
6966

7067
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
7168
expect(onEvent).toHaveBeenCalledWith({ error: expect.any(Error) });
7269
});
7370

7471
it("ignores empty lines and non-ping comments", async () => {
75-
async function* mockStream() {
76-
yield new TextEncoder().encode(': this is a comment\n\n');
77-
yield new TextEncoder().encode('\n\n');
78-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
79-
}
72+
const mockStream = createMockReadableStream([
73+
new TextEncoder().encode(": this is a comment\n\n"),
74+
new TextEncoder().encode("\n\n"),
75+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
76+
]);
8077

81-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
78+
await streamParser(mockStream, { onEvent, onPing, onComplete });
8279

8380
expect(onEvent).toHaveBeenCalledTimes(1);
8481
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
8582
});
8683

8784
it("calls onPing callback when ping comment is received", async () => {
85+
const mockStream = createMockReadableStream([
86+
new TextEncoder().encode(": ping\n\n"),
87+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
88+
]);
8889

89-
async function* mockStream() {
90-
yield new TextEncoder().encode(': ping\n\n');
91-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
92-
}
93-
94-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
90+
await streamParser(mockStream, { onEvent, onPing, onComplete });
9591

9692
expect(onPing).toHaveBeenCalledTimes(1);
9793
expect(onEvent).toHaveBeenCalledTimes(1);
9894
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
9995
});
10096

10197
it("handles multiple ping comments", async () => {
102-
async function* mockStream() {
103-
yield new TextEncoder().encode(': ping\n\n');
104-
yield new TextEncoder().encode(': ping\n\n');
105-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
106-
yield new TextEncoder().encode(': ping\n\n');
107-
}
98+
const mockStream = createMockReadableStream([
99+
new TextEncoder().encode(": ping\n\n"),
100+
new TextEncoder().encode(": ping\n\n"),
101+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
102+
new TextEncoder().encode(": ping\n\n"),
103+
]);
108104

109-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
105+
await streamParser(mockStream, { onEvent, onPing, onComplete });
110106

111107
expect(onPing).toHaveBeenCalledTimes(3);
112108
expect(onEvent).toHaveBeenCalledTimes(1);
113109
});
114110

115111
it("only treats ': ping' as ping comment", async () => {
116-
async function* mockStream() {
117-
yield new TextEncoder().encode(': ping\n\n'); // This IS a ping (space after colon)
118-
yield new TextEncoder().encode(':ping\n\n'); // This is NOT (no space after colon)
119-
yield new TextEncoder().encode(': pinging\n\n'); // This IS a ping (startsWith ': ping')
120-
yield new TextEncoder().encode(': PING\n\n'); // This is NOT (uppercase)
121-
}
112+
const mockStream = createMockReadableStream([
113+
new TextEncoder().encode(": ping\n\n"), // This IS a ping (space after colon)
114+
new TextEncoder().encode(":ping\n\n"), // This is NOT (no space after colon)
115+
new TextEncoder().encode(": pinging\n\n"), // This IS a ping (startsWith ': ping')
116+
new TextEncoder().encode(": PING\n\n"), // This is NOT (uppercase)
117+
]);
122118

123-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
119+
await streamParser(mockStream, { onEvent, onPing, onComplete });
124120

125-
expect(onPing).toHaveBeenCalledTimes(2); // ': ping' and ': pinging' count
121+
expect(onPing).toHaveBeenCalledTimes(2); // ': ping' and ': pinging' count
126122
expect(onEvent).toHaveBeenCalledTimes(0); // None have data fields
127123
});
128124

129125
it("handles event types and IDs", async () => {
130-
async function* mockStream() {
131-
yield new TextEncoder().encode('event: message\ndata: {"text": "Hello"}\nid: 123\n\n');
132-
}
126+
const mockStream = createMockReadableStream([
127+
new TextEncoder().encode('event: message\ndata: {"text": "Hello"}\nid: 123\n\n'),
128+
]);
133129

134-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
130+
await streamParser(mockStream, { onEvent, onPing, onComplete });
135131

136132
expect(onEvent).toHaveBeenCalledTimes(1);
137133
expect(onEvent).toHaveBeenCalledWith({
138-
type: 'message',
134+
type: "message",
139135
data: '{"text": "Hello"}',
140-
id: '123'
136+
id: "123",
141137
});
142138
});
143139

144140
it("handles multi-line data", async () => {
145-
async function* mockStream() {
146-
yield new TextEncoder().encode('data: line 1\ndata: line 2\n\n');
147-
}
141+
const mockStream = createMockReadableStream([
142+
new TextEncoder().encode("data: line 1\ndata: line 2\n\n"),
143+
]);
148144

149-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
145+
await streamParser(mockStream, { onEvent, onPing, onComplete });
150146

151147
expect(onEvent).toHaveBeenCalledTimes(1);
152-
expect(onEvent).toHaveBeenCalledWith({ data: 'line 1line 2' });
148+
expect(onEvent).toHaveBeenCalledWith({ data: "line 1line 2" });
153149
});
154150

155151
it("processes buffer remainder at end", async () => {
156-
async function* mockStream() {
157-
yield new TextEncoder().encode('data: {"text": "Hello"}');
158-
}
152+
const mockStream = createMockReadableStream([
153+
new TextEncoder().encode('data: {"text": "Hello"}'),
154+
]);
159155

160-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
156+
await streamParser(mockStream, { onEvent, onPing, onComplete });
161157

162158
expect(onEvent).toHaveBeenCalledTimes(1);
163159
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
164160
});
165161

166162
it("calls onComplete when stream ends with data", async () => {
167-
async function* mockStream() {
168-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
169-
}
163+
const mockStream = createMockReadableStream([
164+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
165+
]);
170166

171-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
167+
await streamParser(mockStream, { onEvent, onPing, onComplete });
172168

173169
expect(onComplete).toHaveBeenCalledTimes(1);
174170
});
175171

176172
it("calls onComplete when stream ends with empty buffer", async () => {
177-
async function* mockStream() {
178-
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
179-
}
173+
const mockStream = createMockReadableStream([
174+
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
175+
]);
180176

181-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
177+
await streamParser(mockStream, { onEvent, onPing, onComplete });
182178

183179
expect(onEvent).toHaveBeenCalledTimes(1);
184180
expect(onComplete).toHaveBeenCalledTimes(1);
185181
});
186182

187183
it("calls onComplete when stream ends with ping", async () => {
188-
async function* mockStream() {
189-
yield new TextEncoder().encode(': ping');
190-
}
184+
const mockStream = createMockReadableStream([
185+
new TextEncoder().encode(": ping"),
186+
]);
191187

192-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
188+
await streamParser(mockStream, { onEvent, onPing, onComplete });
193189

194190
expect(onPing).toHaveBeenCalledTimes(1);
195191
expect(onComplete).toHaveBeenCalledTimes(1);
196192
});
197193

198194
it("calls onComplete after onEvent when stream errors", async () => {
199-
async function* mockStream() {
200-
throw new Error("Stream error");
201-
}
195+
const mockStream = createMockReadableStream([new Error("Stream error")]);
202196

203-
await streamParser(mockStream(), { onEvent, onPing, onComplete });
197+
await streamParser(mockStream, { onEvent, onPing, onComplete });
204198

205199
expect(onEvent).toHaveBeenCalledWith({ error: expect.any(Error) });
206200
expect(onComplete).toHaveBeenCalledTimes(1);

0 commit comments

Comments
 (0)