Skip to content

Add WebSocket handler for Browser and Node #9191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: dl/live
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"es2020",
"esnext.WeakRef",
],
"module": "ES2020",
"module": "ES2015",
"moduleResolution": "node",
"resolveJsonModule": true,
"esModuleInterop": true,
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"typescript": "5.5.4",
"watch": "1.0.2",
"webpack": "5.98.0",
"ws": "8.18.3",
Copy link
Contributor

@hsubox76 hsubox76 Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're getting rid of this, right?

Edit: Oh, you need it for the server for the test.

Edit 2: Should probably leave it in the package.json for the product package if it's not used anywhere else.

"yargs": "17.7.2"
}
}
5 changes: 2 additions & 3 deletions packages/ai/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"test:ci": "yarn testsetup && node ../../scripts/run_tests_in_ci.js -s test",
"test:skip-clone": "karma start",
"test:browser": "yarn testsetup && karma start",
"test:node": "TS_NODE_COMPILER_OPTIONS='{\"module\":\"commonjs\"}' mocha --require ts-node/register --require src/index.node.ts src/**/*.test.ts --config ../../config/mocharc.node.js",
"test:node": "TS_NODE_COMPILER_OPTIONS='{\"module\":\"commonjs\"}' mocha --require ts-node/register --require src/index.node.ts 'src/**/*.test.ts' --config ../../config/mocharc.node.js",
"test:integration": "karma start --integration",
"api-report": "api-extractor run --local --verbose",
"typings:public": "node ../../scripts/build/use_typings.js ./dist/ai-public.d.ts",
Expand All @@ -63,8 +63,7 @@
"rollup": "2.79.2",
"rollup-plugin-replace": "2.2.0",
"rollup-plugin-typescript2": "0.36.0",
"typescript": "5.5.4",
"ws": "8.18.3"
"typescript": "5.5.4"
},
"repository": {
"directory": "packages/ai",
Expand Down
11 changes: 6 additions & 5 deletions packages/ai/rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
* limitations under the License.
*/

import alias from '@rollup/plugin-alias';
import json from '@rollup/plugin-json';
import typescriptPlugin from 'rollup-plugin-typescript2';
import replace from 'rollup-plugin-replace';
import typescript from 'typescript';
import pkg from './package.json';
import tsconfig from './tsconfig.json';
import { generateBuildTargetReplaceConfig } from '../../scripts/build/rollup_replace_build_target';
import { getEnvironmentReplacements } from '../../scripts/build/rollup_get_environment_replacements';
import { emitModulePackageFile } from '../../scripts/build/rollup_emit_module_package_file';
import { generateAliasConfig } from '../../scripts/build/rollup_generate_alias_config';

const deps = Object.keys(
Object.assign({}, pkg.peerDependencies, pkg.dependencies)
Expand Down Expand Up @@ -56,9 +57,9 @@ const browserBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('browser')),
...buildPlugins,
replace({
...getEnvironmentReplacements('browser'),
...generateBuildTargetReplaceConfig('esm', 2020),
'__PACKAGE_VERSION__': pkg.version
}),
Expand All @@ -75,9 +76,9 @@ const browserBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('browser')),
...buildPlugins,
replace({
...getEnvironmentReplacements('browser'),
...generateBuildTargetReplaceConfig('cjs', 2020),
'__PACKAGE_VERSION__': pkg.version
})
Expand All @@ -96,9 +97,9 @@ const nodeBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('node')),
...buildPlugins,
replace({
...getEnvironmentReplacements('node'),
...generateBuildTargetReplaceConfig('esm', 2020)
})
],
Expand All @@ -113,9 +114,9 @@ const nodeBuilds = [
sourcemap: true
},
plugins: [
alias(generateAliasConfig('node')),
...buildPlugins,
replace({
...getEnvironmentReplacements('node'),
...generateBuildTargetReplaceConfig('cjs', 2020)
})
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import { expect, use } from 'chai';
import sinon, { SinonFakeTimers, SinonStub } from 'sinon';
import sinonChai from 'sinon-chai';
import chaiAsPromised from 'chai-as-promised';
import { BrowserWebSocketHandler } from './browser-websocket-handler';
import { AIError } from '../errors';
import { isBrowser } from '@firebase/util';
import { BrowserWebSocketHandler } from './websocket';
import { AIError } from '../../errors';

use(sinonChai);
use(chaiAsPromised);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,35 @@
* limitations under the License.
*/

import { AIError } from '../errors';
import { AIErrorCode } from '../types';
import { WebSocketHandler } from './websocket-handler';
import { AIError } from '../../errors';
import { AIErrorCode } from '../../types';
import { WebSocketHandler } from '../websocket';

export function createWebSocketHandler(): WebSocketHandler {
if (typeof WebSocket === 'undefined') {
throw new AIError(
AIErrorCode.UNSUPPORTED,
'The WebSocket API is not available in this browser-like environment. ' +
'The "Live" feature is not supported here. It is supported in ' +
'modern browser windows, Web Workers with WebSocket support, and Node >= 22.'
);
}

return new BrowserWebSocketHandler();
}

/**
* A WebSocketHandler implementation for the browser environment.
* It uses the native `WebSocket`.
*
* @internal
*/
export class BrowserWebSocketHandler implements WebSocketHandler {
private ws?: WebSocket;

connect(url: string): Promise<void> {
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(url);
} catch (e) {
return reject(
new AIError(
AIErrorCode.ERROR,
`Internal Error: Invalid WebSocket URL: ${url}`
)
);
}

this.ws = new WebSocket(url);
this.ws.addEventListener('open', () => resolve(), { once: true });
this.ws.addEventListener(
'error',
Expand All @@ -63,7 +67,6 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
}

async *listen(): AsyncGenerator<unknown> {
console.log('listener started');
if (!this.ws) {
throw new AIError(
AIErrorCode.REQUEST_ERROR,
Expand All @@ -72,23 +75,57 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
}

const messageQueue: unknown[] = [];
const errorQueue: Error[] = [];
let resolvePromise: (() => void) | null = null;
let isClosed = false;

const messageListener = async (event: MessageEvent): Promise<void> => {
let data: string;
if (event.data instanceof Blob) {
const obj = JSON.parse(await event.data.text()) as unknown;
messageQueue.push(obj);
data = await event.data.text();
} else if (typeof event.data === 'string') {
data = event.data;
} else {
errorQueue.push(
new AIError(
AIErrorCode.PARSE_FAILED,
`Failed to parse WebSocket response. Expected data to be a Blob or string, but was ${typeof event.data}.`
)
);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
} else {
throw new AIError(
AIErrorCode.PARSE_FAILED,
'Failed to parse WebSocket response to JSON, response was not a Blob'
return;
}

try {
const obj = JSON.parse(data) as unknown;
messageQueue.push(obj);
} catch (e) {
const err = e as Error;
errorQueue.push(
new AIError(
AIErrorCode.PARSE_FAILED,
`Error parsing WebSocket message to JSON: ${err.message}`
)
);
}

if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};

const errorListener = (): void => {
errorQueue.push(
new AIError(AIErrorCode.FETCH_ERROR, 'WebSocket connection error.')
);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};

const closeListener = (): void => {
Expand All @@ -100,12 +137,18 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
// Clean up listeners to prevent memory leaks
this.ws?.removeEventListener('message', messageListener);
this.ws?.removeEventListener('close', closeListener);
this.ws?.removeEventListener('error', errorListener);
};

this.ws.addEventListener('message', messageListener);
this.ws.addEventListener('close', closeListener);
this.ws.addEventListener('error', errorListener);

while (!isClosed) {
if (errorQueue.length > 0) {
const error = errorQueue.shift()!;
throw error;
}
if (messageQueue.length > 0) {
yield messageQueue.shift()!;
} else {
Expand All @@ -114,19 +157,32 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
});
}
}

// If the loop terminated because isClosed is true, check for any final errors
if (errorQueue.length > 0) {
const error = errorQueue.shift()!;
throw error;
}
}

close(code?: number, reason?: string): Promise<void> {
return new Promise(resolve => {
if (!this.ws) {
return resolve();
}

this.ws.addEventListener('close', () => resolve(), { once: true });
// Calling 'close' during these states results in an error.
if (
!this.ws ||
this.ws.readyState === WebSocket.CLOSED ||
this.ws.readyState === WebSocket.CLOSING
this.ws.readyState === WebSocket.CONNECTING
) {
return resolve();
}
this.ws.addEventListener('close', () => resolve(), { once: true });
this.ws.close(code, reason);

if (this.ws.readyState !== WebSocket.CLOSING) {
this.ws.close(code, reason);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ import { expect, use } from 'chai';
import sinonChai from 'sinon-chai';
import chaiAsPromised from 'chai-as-promised';
import { isNode } from '@firebase/util';
import { NodeWebSocketHandler } from './node-websocket-handler';
import { WebSocketHandler } from './websocket-handler';
import { MockWebSocketServer } from '../../test-utils/mock-websocket-server';
import { AIError } from '../errors';
import { TextEncoder } from 'util';
import { MockWebSocketServer } from '../../../test-utils/mock-websocket-server';
import { WebSocketHandler } from '../websocket';
import { NodeWebSocketHandler } from './websocket';

use(sinonChai);
use(chaiAsPromised);

const TEST_PORT = 9003;
const TEST_URL = `ws://localhost:${TEST_PORT}`;

describe('NodeWebSocketHandler (Integration Tests)', () => {
describe('NodeWebSocketHandler', () => {
let server: MockWebSocketServer;
let handler: WebSocketHandler;

Expand Down Expand Up @@ -66,12 +65,9 @@ describe('NodeWebSocketHandler (Integration Tests)', () => {
expect(server.clients.size).to.equal(1);
});

it('should reject if the connection fails (e.g., wrong port)', async () => {
const wrongPortUrl = `ws://localhost:${TEST_PORT + 1}`;
await expect(handler.connect(wrongPortUrl)).to.be.rejectedWith(
AIError,
/Failed to establish WebSocket connection/
);
it('should reject if the connection fails', async () => {
const wrongPortUrl = `ws://wrongUrl:9000`;
await expect(handler.connect(wrongPortUrl)).to.be.rejected;
});
});

Expand Down
Loading
Loading