Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions js/private/watch/src/aspect_watch_protocol.mts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,36 @@ export enum MessageType {
CYCLE_COMPLETED = 'CYCLE_COMPLETED',
NEGOTIATE = 'NEGOTIATE',
NEGOTIATE_RESPONSE = 'NEGOTIATE_RESPONSE',
SUBSCRIBE = 'SUBSCRIBE',
SUBSCRIBE_RESPONSE = 'SUBSCRIBE_RESPONSE',
EXIT = 'EXIT',
}

export interface Message {
readonly kind: MessageType
}

export interface NegotiateMessage extends Message {
readonly kind: MessageType.NEGOTIATE
readonly versions: number[]
}

export interface NegotiateResponseMessage extends Message {
readonly kind: MessageType.NEGOTIATE_RESPONSE
readonly version: number
}

export type WatchType = 'sources' | 'runfiles'

export interface SubscribeMessage extends Message {
readonly kind: MessageType.SUBSCRIBE
readonly watch_type: WatchType
}

export interface SubscribeResponseMessage extends Message {
readonly kind: MessageType.SUBSCRIBE_RESPONSE
}

export interface SourceInfo {
readonly is_symlink?: boolean
readonly is_source?: boolean
Expand All @@ -40,12 +63,29 @@ export interface CycleSourcesMessage extends CycleMessage {
readonly sources: CycleMessageSources
}

export interface ExitMessage extends Message {
readonly kind: MessageType.EXIT
}

// Environment constants
const { JS_BINARY__LOG_DEBUG } = process.env

function selectVersion(versions: number[]): number {
if (versions.includes(1)) {
return 1
}
if (versions.includes(0)) {
return 0
}

throw new Error(`No supported protocol versions: ${versions.join(', ')}`)
}

export class AspectWatchProtocol {
private readonly socketFile: string
private readonly connection: net.Socket

private _version: number = -1
private _error: (err: Error) => void
private _cycle: (msg: CycleMessage) => Promise<void>

Expand Down Expand Up @@ -74,9 +114,18 @@ export class AspectWatchProtocol {
}
})

await this._receive(MessageType.NEGOTIATE)
// TODO: throw if unsupported version
await this._send(MessageType.NEGOTIATE_RESPONSE, { version: 0 })
const { versions } = await this._receive<NegotiateMessage>(
MessageType.NEGOTIATE
)

const version = selectVersion(versions)

await this._send<NegotiateResponseMessage>(
MessageType.NEGOTIATE_RESPONSE,
{ version }
)

this._version = version

return this
}
Expand All @@ -87,7 +136,7 @@ export class AspectWatchProtocol {
async disconnect() {
if (this.connection.writable) {
try {
await this._send(MessageType.EXIT)
await this._send<ExitMessage>(MessageType.EXIT, {})
} catch (e) {
if (JS_BINARY__LOG_DEBUG) {
console.log(
Expand Down Expand Up @@ -124,6 +173,15 @@ export class AspectWatchProtocol {
* build has completed before proceeding.
*/
async awaitFirstCycle() {
if (this._version > 0) {
await this._send<SubscribeMessage>(MessageType.SUBSCRIBE, {
watch_type: 'runfiles',
})
await this._receive<SubscribeResponseMessage>(
MessageType.SUBSCRIBE_RESPONSE
)
}

await this.cycle(true)
}

Expand Down Expand Up @@ -163,7 +221,7 @@ export class AspectWatchProtocol {
}

async _receive<M extends Message>(
type: MessageType | null = null
type: M['kind'] | null = null
): Promise<M> {
return await new Promise((resolve, reject) => {
const dataBufs: Buffer[] = []
Expand Down Expand Up @@ -217,7 +275,7 @@ export class AspectWatchProtocol {
})
}

async _send(type: MessageType, data: Omit<Message, 'kind'> = {}) {
async _send<M extends Message>(type: M['kind'], data: Omit<M, 'kind'>) {
await new Promise<void>((resolve, reject) => {
try {
this.connection.write(
Expand Down
Loading