From f96c80e8ce0d70033f0c0c2321fa02fdfc4426d0 Mon Sep 17 00:00:00 2001 From: Jason Bedard Date: Sat, 1 Nov 2025 15:30:02 -0700 Subject: [PATCH] feat: support watch protocol v1 --- .../watch/src/aspect_watch_protocol.mts | 70 +++++++++++++++++-- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/js/private/watch/src/aspect_watch_protocol.mts b/js/private/watch/src/aspect_watch_protocol.mts index 61c3001ba..83ea5ba2c 100644 --- a/js/private/watch/src/aspect_watch_protocol.mts +++ b/js/private/watch/src/aspect_watch_protocol.mts @@ -11,6 +11,8 @@ export enum MessageType { CYCLE_COMPLETED = 'CYCLE_COMPLETED', NEGOTIATE = 'NEGOTIATE', NEGOTIATE_RESPONSE = 'NEGOTIATE_RESPONSE', + SUBSCRIBE = 'SUBSCRIBE', + SUBSCRIBE_RESPONSE = 'SUBSCRIBE_RESPONSE', EXIT = 'EXIT', } @@ -18,6 +20,27 @@ export interface Message { readonly kind: MessageType } +export interface NegotiateMessage extends Message { + readonly kind: MessageType.NEGOTIATE + readonly versions: number[] +} + +export interface NegotiateResponseMessage extends Message { + readonly kind: MessageType.NEGOTIATE_RESPONSE + readonly version: number +} + +export type WatchType = 'sources' | 'runfiles' + +export interface SubscribeMessage extends Message { + readonly kind: MessageType.SUBSCRIBE + readonly watch_type: WatchType +} + +export interface SubscribeResponseMessage extends Message { + readonly kind: MessageType.SUBSCRIBE_RESPONSE +} + export interface SourceInfo { readonly is_symlink?: boolean readonly is_source?: boolean @@ -40,12 +63,29 @@ export interface CycleSourcesMessage extends CycleMessage { readonly sources: CycleMessageSources } +export interface ExitMessage extends Message { + readonly kind: MessageType.EXIT +} + // Environment constants const { JS_BINARY__LOG_DEBUG } = process.env +function selectVersion(versions: number[]): number { + if (versions.includes(1)) { + return 1 + } + if (versions.includes(0)) { + return 0 + } + + throw new Error(`No supported protocol versions: ${versions.join(', ')}`) +} + export class AspectWatchProtocol { private readonly socketFile: string private readonly connection: net.Socket + + private _version: number = -1 private _error: (err: Error) => void private _cycle: (msg: CycleMessage) => Promise @@ -74,9 +114,18 @@ export class AspectWatchProtocol { } }) - await this._receive(MessageType.NEGOTIATE) - // TODO: throw if unsupported version - await this._send(MessageType.NEGOTIATE_RESPONSE, { version: 0 }) + const { versions } = await this._receive( + MessageType.NEGOTIATE + ) + + const version = selectVersion(versions) + + await this._send( + MessageType.NEGOTIATE_RESPONSE, + { version } + ) + + this._version = version return this } @@ -87,7 +136,7 @@ export class AspectWatchProtocol { async disconnect() { if (this.connection.writable) { try { - await this._send(MessageType.EXIT) + await this._send(MessageType.EXIT, {}) } catch (e) { if (JS_BINARY__LOG_DEBUG) { console.log( @@ -124,6 +173,15 @@ export class AspectWatchProtocol { * build has completed before proceeding. */ async awaitFirstCycle() { + if (this._version > 0) { + await this._send(MessageType.SUBSCRIBE, { + watch_type: 'runfiles', + }) + await this._receive( + MessageType.SUBSCRIBE_RESPONSE + ) + } + await this.cycle(true) } @@ -163,7 +221,7 @@ export class AspectWatchProtocol { } async _receive( - type: MessageType | null = null + type: M['kind'] | null = null ): Promise { return await new Promise((resolve, reject) => { const dataBufs: Buffer[] = [] @@ -217,7 +275,7 @@ export class AspectWatchProtocol { }) } - async _send(type: MessageType, data: Omit = {}) { + async _send(type: M['kind'], data: Omit) { await new Promise((resolve, reject) => { try { this.connection.write(