Skip to content

Add WebSocket handler for Browser and Node #9191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: dl/live
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"@types/sinon-chai": "3.2.12",
"@types/tmp": "0.2.6",
"@types/trusted-types": "2.0.7",
"@types/ws": "8.18.1",
"@types/yargs": "17.0.33",
"@typescript-eslint/eslint-plugin": "7.18.0",
"@typescript-eslint/eslint-plugin-tslint": "7.0.2",
Expand Down Expand Up @@ -158,6 +159,7 @@
"typescript": "5.5.4",
"watch": "1.0.2",
"webpack": "5.98.0",
"ws": "8.18.3",
"yargs": "17.7.2"
}
}
1 change: 1 addition & 0 deletions packages/ai/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"test:ci": "yarn testsetup && node ../../scripts/run_tests_in_ci.js -s test",
"test:skip-clone": "karma start",
"test:browser": "yarn testsetup && karma start",
"test:node": "TS_NODE_COMPILER_OPTIONS='{\"module\":\"commonjs\"}' mocha --require ts-node/register --require src/index.node.ts 'src/**/*.test.ts' --config ../../config/mocharc.node.js",
"test:integration": "karma start --integration",
"api-report": "api-extractor run --local --verbose",
"typings:public": "node ../../scripts/build/use_typings.js ./dist/ai-public.d.ts",
Expand Down
22 changes: 16 additions & 6 deletions packages/ai/rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

import alias from '@rollup/plugin-alias';
import json from '@rollup/plugin-json';
import typescriptPlugin from 'rollup-plugin-typescript2';
import replace from 'rollup-plugin-replace';
Expand All @@ -23,6 +24,7 @@ import pkg from './package.json';
import tsconfig from './tsconfig.json';
import { generateBuildTargetReplaceConfig } from '../../scripts/build/rollup_replace_build_target';
import { emitModulePackageFile } from '../../scripts/build/rollup_emit_module_package_file';
import { generateAliasConfig } from '../../scripts/build/rollup_generate_alias_config';

const deps = Object.keys(
Object.assign({}, pkg.peerDependencies, pkg.dependencies)
Expand Down Expand Up @@ -55,14 +57,16 @@ const browserBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('browser')),
...buildPlugins,
replace({
...generateBuildTargetReplaceConfig('esm', 2020),
__PACKAGE_VERSION__: pkg.version
'__PACKAGE_VERSION__': pkg.version
}),
emitModulePackageFile()
],
external: id => deps.some(dep => id === dep || id.startsWith(`${dep}/`))
external: id =>
id === 'ws' || deps.some(dep => id === dep || id.startsWith(`${dep}/`))
},
{
input: 'src/index.ts',
Expand All @@ -72,13 +76,15 @@ const browserBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('browser')),
...buildPlugins,
replace({
...generateBuildTargetReplaceConfig('cjs', 2020),
__PACKAGE_VERSION__: pkg.version
'__PACKAGE_VERSION__': pkg.version
})
],
external: id => deps.some(dep => id === dep || id.startsWith(`${dep}/`))
external: id =>
id === 'ws' || deps.some(dep => id === dep || id.startsWith(`${dep}/`))
}
];

Expand All @@ -91,12 +97,14 @@ const nodeBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('node')),
...buildPlugins,
replace({
...generateBuildTargetReplaceConfig('esm', 2020)
})
],
external: id => deps.some(dep => id === dep || id.startsWith(`${dep}/`))
external: id =>
id === 'ws' || deps.some(dep => id === dep || id.startsWith(`${dep}/`))
},
{
input: 'src/index.node.ts',
Expand All @@ -106,12 +114,14 @@ const nodeBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('node')),
...buildPlugins,
replace({
...generateBuildTargetReplaceConfig('cjs', 2020)
})
],
external: id => deps.some(dep => id === dep || id.startsWith(`${dep}/`))
external: id =>
id === 'ws' || deps.some(dep => id === dep || id.startsWith(`${dep}/`))
}
];

Expand Down
273 changes: 273 additions & 0 deletions packages/ai/src/platform/browser/websocket.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/**
* @license
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { expect, use } from 'chai';
import sinon, { SinonFakeTimers, SinonStub } from 'sinon';
import sinonChai from 'sinon-chai';
import chaiAsPromised from 'chai-as-promised';
import { isBrowser } from '@firebase/util';
import { BrowserWebSocketHandler } from './websocket';
import { AIError } from '../../errors';

use(sinonChai);
use(chaiAsPromised);

class MockBrowserWebSocket {
static CONNECTING = 0;
static OPEN = 1;
static CLOSING = 2;
static CLOSED = 3;

readyState: number = MockBrowserWebSocket.CONNECTING;
sentMessages: Array<string | ArrayBuffer> = [];
url: string;
private listeners: Map<string, Set<EventListener>> = new Map();

constructor(url: string) {
this.url = url;
}

send(data: string | ArrayBuffer): void {
if (this.readyState !== MockBrowserWebSocket.OPEN) {
throw new Error('WebSocket is not in OPEN state');
}
this.sentMessages.push(data);
}

close(): void {
if (
this.readyState === MockBrowserWebSocket.CLOSED ||
this.readyState === MockBrowserWebSocket.CLOSING
) {
return;
}
this.readyState = MockBrowserWebSocket.CLOSING;
setTimeout(() => {
this.readyState = MockBrowserWebSocket.CLOSED;
this.dispatchEvent(new Event('close'));
}, 10);
}

addEventListener(type: string, listener: EventListener): void {
if (!this.listeners.has(type)) {
this.listeners.set(type, new Set());
}
this.listeners.get(type)!.add(listener);
}

removeEventListener(type: string, listener: EventListener): void {
this.listeners.get(type)?.delete(listener);
}

dispatchEvent(event: Event): void {
this.listeners.get(event.type)?.forEach(listener => listener(event));
}

triggerOpen(): void {
this.readyState = MockBrowserWebSocket.OPEN;
this.dispatchEvent(new Event('open'));
}

triggerMessage(data: any): void {
this.dispatchEvent(new MessageEvent('message', { data }));
}

triggerError(): void {
this.dispatchEvent(new Event('error'));
}
}

describe('BrowserWebSocketHandler', () => {
let handler: BrowserWebSocketHandler;
let mockWebSocket: MockBrowserWebSocket;
let clock: SinonFakeTimers;
let webSocketStub: SinonStub;

// Only run these tests in a browser environment
if (!isBrowser()) {
return;
}

beforeEach(() => {
webSocketStub = sinon.stub(window, 'WebSocket').callsFake((url: string) => {
mockWebSocket = new MockBrowserWebSocket(url);
return mockWebSocket as any;
});
clock = sinon.useFakeTimers();
handler = new BrowserWebSocketHandler();
});

afterEach(() => {
sinon.restore();
clock.restore();
});

describe('connect()', () => {
it('should resolve on open event', async () => {
const connectPromise = handler.connect('ws://test-url');
expect(webSocketStub).to.have.been.calledWith('ws://test-url');

await clock.tickAsync(1);
mockWebSocket.triggerOpen();

await expect(connectPromise).to.be.fulfilled;
});

it('should reject on error event', async () => {
const connectPromise = handler.connect('ws://test-url');
await clock.tickAsync(1);
mockWebSocket.triggerError();

await expect(connectPromise).to.be.rejectedWith(
AIError,
/Failed to establish WebSocket connection/
);
});
});

describe('listen()', () => {
beforeEach(async () => {
const connectPromise = handler.connect('ws://test');
mockWebSocket.triggerOpen();
await connectPromise;
});

it('should yield multiple messages as they arrive', async () => {
const generator = handler.listen();

const received: unknown[] = [];
const listenPromise = (async () => {
for await (const msg of generator) {
received.push(msg);
}
})();

// Use tickAsync to allow the consumer to start listening
await clock.tickAsync(1);
mockWebSocket.triggerMessage(new Blob([JSON.stringify({ foo: 1 })]));

await clock.tickAsync(10);
mockWebSocket.triggerMessage(new Blob([JSON.stringify({ foo: 2 })]));

await clock.tickAsync(5);
mockWebSocket.close();
await clock.runAllAsync(); // Let timers finish

await listenPromise; // Wait for the consumer to finish

expect(received).to.deep.equal([
{
foo: 1
},
{
foo: 2
}
]);
});

it('should buffer messages that arrive before the consumer calls .next()', async () => {
const generator = handler.listen();

// Create a promise that will consume the generator in a separate async context
const received: unknown[] = [];
const consumptionPromise = (async () => {
for await (const message of generator) {
received.push(message);
}
})();

await clock.tickAsync(1);

mockWebSocket.triggerMessage(new Blob([JSON.stringify({ foo: 1 })]));
mockWebSocket.triggerMessage(new Blob([JSON.stringify({ foo: 2 })]));

await clock.tickAsync(1);
mockWebSocket.close();
await clock.runAllAsync();

await consumptionPromise;

expect(received).to.deep.equal([
{
foo: 1
},
{
foo: 2
}
]);
});
});

describe('close()', () => {
it('should be idempotent and not throw if called multiple times', async () => {
const connectPromise = handler.connect('ws://test');
mockWebSocket.triggerOpen();
await connectPromise;

const closePromise1 = handler.close();
await clock.runAllAsync();
await closePromise1;

await expect(handler.close()).to.be.fulfilled;
});

it('should wait for the onclose event before resolving', async () => {
const connectPromise = handler.connect('ws://test');
mockWebSocket.triggerOpen();
await connectPromise;

let closed = false;
const closePromise = handler.close().then(() => {
closed = true;
});

// The promise should not have resolved yet
await clock.tickAsync(5);
expect(closed).to.be.false;

// Now, let the mock's setTimeout for closing run, which triggers onclose
await clock.tickAsync(10);

await expect(closePromise).to.be.fulfilled;
expect(closed).to.be.true;
});
});

describe('Interaction between listen() and close()', () => {
it('should allow close() to take precedence and resolve correctly, while also terminating the listener', async () => {
const connectPromise = handler.connect('ws://test');
mockWebSocket.triggerOpen();
await connectPromise;

const generator = handler.listen();
const listenPromise = (async () => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of generator) {
}
})();

const closePromise = handler.close();

await clock.runAllAsync();

await expect(closePromise).to.be.fulfilled;
await expect(listenPromise).to.be.fulfilled;

expect(mockWebSocket.readyState).to.equal(MockBrowserWebSocket.CLOSED);
});
});
});
Loading
Loading