Skip to content

Commit 8c1a22f

Browse files
committed
IPC tweaks
1 parent 8b36711 commit 8c1a22f

File tree

12 files changed

+892
-354
lines changed

12 files changed

+892
-354
lines changed
Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
import { IpcClient } from "../src/ipcClient"
1+
import { IpcClientMessageType, IpcClient } from "../src/index.js"
22

3-
async function main(socketPath: string) {
3+
async function main(socketPath: string, prompt: string) {
44
try {
55
const startTime = Date.now()
66
const client = new IpcClient(socketPath)
77

8+
client.on("message", (data) => console.log(data))
9+
810
while (!client.isConnected) {
911
if (Date.now() - startTime > 5000) {
1012
throw new Error("Failed to connect to server.")
@@ -13,9 +15,10 @@ async function main(socketPath: string) {
1315
await new Promise((resolve) => setTimeout(resolve, 1000))
1416
}
1517

18+
client.sendMessage({ type: IpcClientMessageType.StartNewTask, data: { text: prompt } })
19+
1620
while (client.isConnected) {
17-
client.ping()
18-
await new Promise((resolve) => setTimeout(resolve, 5000))
21+
await new Promise((resolve) => setTimeout(resolve, 1000))
1922
}
2023

2124
process.exit(0)
@@ -26,8 +29,8 @@ async function main(socketPath: string) {
2629
}
2730

2831
if (!process.argv[2]) {
29-
console.error("Usage: npx tsx scripts/client.ts <socketPath>")
32+
console.error("Usage: npx tsx scripts/client.ts <socketPath> <prompt>")
3033
process.exit(1)
3134
}
3235

33-
main(process.argv[2])
36+
main(process.argv[2], process.argv[3])

benchmark/packages/ipc/scripts/server.ts

Lines changed: 0 additions & 27 deletions
This file was deleted.
Lines changed: 261 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,262 @@
1-
export { IpcClient } from "./ipcClient.js"
2-
export { IpcServer } from "./ipcServer.js"
1+
import EventEmitter from "node:events"
2+
import { Socket } from "node:net"
3+
import * as crypto from "node:crypto"
34

4-
export * from "./types.js"
5+
import ipc from "node-ipc"
6+
import { z } from "zod"
7+
8+
/**
9+
* IpcClient
10+
*/
11+
12+
export type IpcClientId = string
13+
14+
export enum IpcClientMessageType {
15+
Message = "Message",
16+
StartNewTask = "StartNewTask",
17+
}
18+
19+
export const ipcClientMessageSchema = z.discriminatedUnion("type", [
20+
z.object({
21+
type: z.literal(IpcClientMessageType.StartNewTask),
22+
data: z.object({
23+
text: z.string(),
24+
images: z.array(z.string()).optional(),
25+
}),
26+
}),
27+
])
28+
29+
export type IpcClientMessage = z.infer<typeof ipcClientMessageSchema>
30+
31+
export interface IpcClientEvents {
32+
connect: []
33+
disconnect: []
34+
message: [data: IpcServerMessage]
35+
}
36+
37+
export class IpcClient extends EventEmitter<IpcClientEvents> {
38+
private readonly _socketPath: string
39+
private readonly _log: (...args: unknown[]) => void
40+
41+
private _isConnected = false
42+
private _clientId?: IpcClientId
43+
44+
constructor(socketPath: string, log = console.log) {
45+
super()
46+
47+
this._socketPath = socketPath
48+
this._log = log
49+
50+
ipc.config.silent = true
51+
52+
ipc.connectTo("benchmarkServer", this.socketPath, () => {
53+
ipc.of.benchmarkServer?.on("connect", (args) => this.onConnect(args))
54+
ipc.of.benchmarkServer?.on("disconnect", (args) => this.onDisconnect(args))
55+
ipc.of.benchmarkServer?.on("message", (data) => this.onMessage(data))
56+
})
57+
}
58+
59+
private onConnect(args: unknown) {
60+
if (this._isConnected) {
61+
return
62+
}
63+
64+
this.log("[client#onConnect]", args)
65+
this._isConnected = true
66+
this.emit("connect")
67+
}
68+
69+
private onDisconnect(args: unknown) {
70+
if (!this._isConnected) {
71+
return
72+
}
73+
74+
this.log("[client#onDisconnect]", args)
75+
this._isConnected = false
76+
this.emit("disconnect")
77+
}
78+
79+
private onMessage(data: unknown) {
80+
if (typeof data !== "object") {
81+
this._log("[client#onMessage] invalid data", data)
82+
return
83+
}
84+
85+
const result = ipcServerMessageSchema.safeParse(data)
86+
87+
if (!result.success) {
88+
this.log("[client#onMessage] invalid payload", result.error)
89+
return
90+
}
91+
92+
this.emit("message", result.data)
93+
}
94+
95+
private log(...args: unknown[]) {
96+
this._log(...args)
97+
}
98+
99+
public sendMessage(message: IpcClientMessage) {
100+
ipc.of.benchmarkServer?.emit("message", message)
101+
}
102+
103+
public disconnect() {
104+
try {
105+
ipc.disconnect("benchmarkServer")
106+
// @TODO: Should we set _disconnect here?
107+
} catch (error) {
108+
this.log("[client#disconnect] error disconnecting", error)
109+
}
110+
}
111+
112+
public get socketPath() {
113+
return this._socketPath
114+
}
115+
116+
public get clientId() {
117+
return this._clientId
118+
}
119+
120+
public get isConnected() {
121+
return this._isConnected
122+
}
123+
124+
public get isReady() {
125+
return this._isConnected && this._clientId !== undefined
126+
}
127+
}
128+
129+
/**
130+
* IpcServer
131+
*/
132+
133+
export enum IpcServerMessageType {
134+
Ack = "Ack",
135+
TaskEvent = "TaskEvent",
136+
}
137+
138+
export const ipcServerMessageSchema = z.discriminatedUnion("type", [
139+
z.object({
140+
type: z.literal(IpcServerMessageType.Ack),
141+
data: z.object({ clientId: z.string() }),
142+
}),
143+
z.object({
144+
type: z.literal(IpcServerMessageType.TaskEvent),
145+
data: z.object({
146+
eventName: z.string(),
147+
data: z.unknown(),
148+
}),
149+
}),
150+
])
151+
152+
export type IpcServerMessage = z.infer<typeof ipcServerMessageSchema>
153+
154+
type IpcServerEvents = {
155+
connect: [id: IpcClientId]
156+
disconnect: [id: IpcClientId]
157+
message: [data: IpcClientMessage]
158+
}
159+
160+
export class IpcServer extends EventEmitter<IpcServerEvents> {
161+
private readonly _socketPath: string
162+
private readonly _log: (...args: unknown[]) => void
163+
private readonly _clients: Map<string, Socket>
164+
165+
private _isListening = false
166+
167+
constructor(socketPath: string, log = console.log) {
168+
super()
169+
170+
this._socketPath = socketPath
171+
this._log = log
172+
this._clients = new Map()
173+
}
174+
175+
public listen() {
176+
this._isListening = true
177+
178+
ipc.config.silent = true
179+
180+
ipc.serve(this.socketPath, () => {
181+
ipc.server.on("connect", (socket) => this.onConnect(socket))
182+
ipc.server.on("socket.disconnected", (socket) => this.onDisconnect(socket))
183+
ipc.server.on("message", (data) => this.onMessage(data))
184+
})
185+
186+
ipc.server.start()
187+
}
188+
189+
private onConnect(socket: Socket) {
190+
const clientId = crypto.randomBytes(6).toString("hex")
191+
this._clients.set(clientId, socket)
192+
this.log(`[server#onConnect] clientId = ${clientId}, # clients = ${this._clients.size}`)
193+
this.send(socket, { type: IpcServerMessageType.Ack, data: { clientId } })
194+
this.emit("connect", clientId)
195+
}
196+
197+
private onDisconnect(destroyedSocket: Socket) {
198+
let disconnectedClientId: IpcClientId | undefined
199+
200+
for (const [clientId, socket] of this._clients.entries()) {
201+
if (socket === destroyedSocket) {
202+
disconnectedClientId = clientId
203+
this._clients.delete(clientId)
204+
break
205+
}
206+
}
207+
208+
this.log(`[server#socket.disconnected] clientId = ${disconnectedClientId}, # clients = ${this._clients.size}`)
209+
210+
if (disconnectedClientId) {
211+
this.emit("disconnect", disconnectedClientId)
212+
}
213+
}
214+
215+
private onMessage(data: unknown) {
216+
if (typeof data !== "object") {
217+
this.log("[server#onMessage] invalid data", data)
218+
return
219+
}
220+
221+
const result = ipcClientMessageSchema.safeParse(data)
222+
223+
if (!result.success) {
224+
this.log("[server#onMessage] invalid payload", result.error)
225+
return
226+
}
227+
228+
const payload = result.data
229+
this.emit("message", payload)
230+
}
231+
232+
private log(...args: unknown[]) {
233+
this._log(...args)
234+
}
235+
236+
public broadcast(message: IpcServerMessage) {
237+
this.log("[server#broadcast] message =", message)
238+
ipc.server.broadcast("message", message)
239+
}
240+
241+
public send(client: IpcClientId | Socket, message: IpcServerMessage) {
242+
this.log("[server#send] message =", message)
243+
244+
if (typeof client === "string") {
245+
const socket = this._clients.get(client)
246+
247+
if (socket) {
248+
ipc.server.emit(socket, "message", message)
249+
}
250+
} else {
251+
ipc.server.emit(client, "message", message)
252+
}
253+
}
254+
255+
public get socketPath() {
256+
return this._socketPath
257+
}
258+
259+
public get isListening() {
260+
return this._isListening
261+
}
262+
}

0 commit comments

Comments
 (0)