Skip to content

Commit 9e47658

Browse files
committed
Add event-based asynchronous communication
1 parent 26a9f84 commit 9e47658

File tree

8 files changed

+331
-3
lines changed

8 files changed

+331
-3
lines changed

agents/build.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import dts from 'bun-plugin-dts';
77
await Bun.build({
88
entrypoints: ['./src/index.ts', './src/tts/index.ts', './src/stt/index.ts'],
99
outdir: './dist',
10-
target: 'node',
10+
target: 'bun', // https://github.com/oven-sh/bun/blob/main/src/bundler/bundle_v2.zig#L2667
1111
sourcemap: 'external',
1212
root: './src',
1313
plugins: [dts()],

agents/src/http_server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
15
import { createServer, ServerResponse, Server, IncomingMessage } from 'http';
26

37
const healthCheck = async (res: ServerResponse) => {

agents/src/ipc/job_main.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
import {
6+
JobMainArgs,
7+
Message,
8+
Ping,
9+
Pong,
10+
ShutdownRequest,
11+
ShutdownResponse,
12+
StartJobRequest,
13+
StartJobResponse,
14+
UserExit,
15+
} from './protocol';
16+
import { Room } from '@livekit/rtc-node';
17+
import { EventEmitter } from 'events';
18+
import { JobContext } from '../job_context';
19+
import { log } from '../log';
20+
21+
export const runJob = (event: EventEmitter, args: JobMainArgs) => {
22+
const room = new Room();
23+
const conn = room.connect(args.url, args.token);
24+
let request: StartJobRequest | undefined = undefined;
25+
let shuttingDown = false;
26+
let closed = false;
27+
let task: ((arg: JobContext) => void) | undefined = undefined;
28+
let context: JobContext | undefined = undefined;
29+
30+
const start = () => {
31+
if (request && room.isConnected && !closed) {
32+
event.emit('msg', new StartJobResponse());
33+
34+
task = args.target;
35+
context = new JobContext(event, request.job, room);
36+
}
37+
};
38+
39+
new Promise(() => {
40+
conn
41+
.then(() => {
42+
if (!closed) start();
43+
})
44+
.catch(() => {
45+
if (!closed) event.emit('msg', new StartJobResponse());
46+
});
47+
});
48+
49+
while (!closed) {
50+
event.once('close', () => {
51+
event.emit('msg', new UserExit());
52+
closed = true;
53+
});
54+
55+
event.on('msg', (msg: Message) => {
56+
if (msg instanceof ShutdownRequest) {
57+
shuttingDown = true;
58+
closed = true;
59+
} else if (msg instanceof StartJobRequest) {
60+
request = msg;
61+
start();
62+
} else if (msg instanceof Ping) {
63+
event.emit('msg', new Pong(msg.timestamp, Date.now()));
64+
}
65+
});
66+
}
67+
68+
log.debug('disconnecting from room');
69+
room.disconnect().then(() => {
70+
if (task !== undefined && context) task(context);
71+
72+
if (shuttingDown) {
73+
event.emit('msg', new ShutdownResponse());
74+
}
75+
76+
event.removeAllListeners();
77+
});
78+
};

agents/src/ipc/job_process.ts

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
import { Job } from '@livekit/protocol';
6+
import {
7+
JobMainArgs,
8+
Log,
9+
Message,
10+
Ping,
11+
Pong,
12+
ShutdownRequest,
13+
ShutdownResponse,
14+
StartJobRequest,
15+
StartJobResponse,
16+
UserExit,
17+
} from './protocol';
18+
import { JobContext } from '../job_context';
19+
import { runJob } from './job_main';
20+
import { EventEmitter } from 'events';
21+
import { log } from '../log';
22+
23+
const START_TIMEOUT = 90;
24+
const PING_INTERVAL = 5;
25+
const PING_TIMEOUT = 90;
26+
const HIGH_PING_THRESHOLD = 10; // milliseconds
27+
28+
export class JobProcess {
29+
#job: Job;
30+
args: JobMainArgs;
31+
logger = log.child({ job_id: this.job.id });
32+
event: EventEmitter;
33+
closed = false;
34+
35+
constructor(job: Job, url: string, token: string, target: (arg: JobContext) => void) {
36+
this.#job = job;
37+
this.args = { jobID: job.id, url, token, target };
38+
this.event = new EventEmitter();
39+
}
40+
41+
get job(): Job {
42+
return this.#job;
43+
}
44+
45+
async close() {
46+
this.logger.info('closing job process');
47+
this.event.emit('msg', new ShutdownRequest());
48+
this.event.removeAllListeners();
49+
}
50+
51+
async run() {
52+
let resp: StartJobResponse | undefined = undefined;
53+
54+
runJob(this.event, this.args);
55+
this.event.emit('msg', new StartJobRequest(this.job));
56+
57+
setTimeout(() => {
58+
if (resp === undefined) {
59+
this.logger.error('process start timed out, killing job');
60+
this.closed = true;
61+
}
62+
}, START_TIMEOUT);
63+
64+
const pingInterval = setInterval(() => {
65+
if (this.closed) clearInterval(pingInterval);
66+
else {
67+
this.event.emit('msg', new Ping(Date.now()));
68+
}
69+
}, PING_INTERVAL);
70+
71+
const pongTimeout = setTimeout(() => {
72+
this.logger.error('job ping timed out, killing job');
73+
this.closed = true;
74+
}, PING_TIMEOUT);
75+
76+
while (!this.closed) {
77+
this.event.on('msg', (msg: Message) => {
78+
if (msg instanceof StartJobResponse) {
79+
resp = msg;
80+
} else if (msg instanceof Log) {
81+
switch (msg.level) {
82+
// pino uses 10, 20, ..., 60 as representations for log levels
83+
case 10:
84+
this.logger.trace(msg.message);
85+
break;
86+
case 20:
87+
this.logger.debug(msg.message);
88+
break;
89+
case 30:
90+
this.logger.info(msg.message);
91+
break;
92+
case 40:
93+
this.logger.warn(msg.message);
94+
break;
95+
case 50:
96+
this.logger.error(msg.message);
97+
break;
98+
case 60:
99+
this.logger.fatal(msg.message);
100+
break;
101+
}
102+
} else if (msg instanceof Pong) {
103+
const delay = Date.now() - msg.timestamp;
104+
if (delay > HIGH_PING_THRESHOLD) {
105+
this.logger.warn(`job is unresponsive (${delay}ms delay)`);
106+
pongTimeout.refresh();
107+
}
108+
} else if (msg instanceof UserExit || msg instanceof ShutdownResponse) {
109+
this.logger.info('job exiting');
110+
this.closed = true;
111+
}
112+
});
113+
}
114+
115+
this.logger.info('job process closed');
116+
}
117+
}

agents/src/ipc/protocol.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
import { Job } from '@livekit/protocol';
6+
import { JobContext } from '../job_context';
7+
8+
export type JobMainArgs = {
9+
jobID: string;
10+
url: string;
11+
token: string;
12+
target: (arg: JobContext) => void;
13+
};
14+
15+
export interface Message {
16+
MSG_ID: number; // TypeScript is weird with statics; this requires a getter hack
17+
}
18+
19+
export class StartJobRequest implements Message {
20+
static MSG_ID = 0;
21+
job: Job;
22+
23+
get MSG_ID(): number {
24+
return StartJobRequest.MSG_ID;
25+
}
26+
27+
constructor(job = new Job()) {
28+
this.job = job;
29+
}
30+
}
31+
32+
export class StartJobResponse implements Message {
33+
static MSG_ID = 1;
34+
err: Error | undefined;
35+
36+
get MSG_ID(): number {
37+
return StartJobResponse.MSG_ID;
38+
}
39+
40+
constructor(err: Error | undefined = undefined) {
41+
this.err = err;
42+
}
43+
}
44+
45+
export class Log implements Message {
46+
static MSG_ID = 2;
47+
level: number;
48+
message: string;
49+
50+
get MSG_ID(): number {
51+
return Log.MSG_ID;
52+
}
53+
54+
constructor(level = 10, message = '') {
55+
this.level = level;
56+
this.message = message;
57+
}
58+
}
59+
60+
export class Ping implements Message {
61+
static MSG_ID = 3;
62+
timestamp: number;
63+
64+
get MSG_ID(): number {
65+
return Ping.MSG_ID;
66+
}
67+
68+
constructor(timestamp = 0) {
69+
this.timestamp = timestamp;
70+
}
71+
}
72+
73+
export class Pong implements Message {
74+
static MSG_ID = 4;
75+
lastTimestamp: number;
76+
timestamp: number;
77+
78+
get MSG_ID(): number {
79+
return Pong.MSG_ID;
80+
}
81+
82+
constructor(lastTimestamp = 0, timestamp = 0) {
83+
this.lastTimestamp = lastTimestamp;
84+
this.timestamp = timestamp;
85+
}
86+
}
87+
88+
export class ShutdownRequest implements Message {
89+
static MSG_ID = 5;
90+
91+
get MSG_ID(): number {
92+
return ShutdownRequest.MSG_ID;
93+
}
94+
}
95+
96+
export class ShutdownResponse implements Message {
97+
static MSG_ID = 6;
98+
99+
get MSG_ID(): number {
100+
return ShutdownResponse.MSG_ID;
101+
}
102+
}
103+
104+
export class UserExit implements Message {
105+
static MSG_ID = 7;
106+
107+
get MSG_ID(): number {
108+
return UserExit.MSG_ID;
109+
}
110+
}
111+
112+
export const IPC_MESSAGES: { [x: number]: Message } = {
113+
[StartJobRequest.MSG_ID]: new StartJobRequest(),
114+
[StartJobResponse.MSG_ID]: new StartJobResponse(),
115+
[Log.MSG_ID]: new Log(),
116+
[Ping.MSG_ID]: new Ping(),
117+
[Pong.MSG_ID]: new Pong(),
118+
[ShutdownRequest.MSG_ID]: new ShutdownRequest(),
119+
[ShutdownResponse.MSG_ID]: new ShutdownResponse(),
120+
[UserExit.MSG_ID]: new UserExit(),
121+
};

agents/src/job_context.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,6 @@ export class JobContext {
4545
}
4646

4747
async shutdown() {
48-
this.tx.removeAllListeners();
48+
this.tx.emit('close');
4949
}
5050
}

agents/src/job_request.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
15
import { JobContext } from './job_context';
26
import { VideoGrant } from 'livekit-server-sdk';
37
import { Job, ParticipantInfo, Room } from '@livekit/protocol';
@@ -27,7 +31,7 @@ enum AutoSubscribe {
2731

2832
type AgentEntry = (ctx: JobContext) => Promise<void>;
2933

30-
type AcceptData = {
34+
export type AcceptData = {
3135
entry: AgentEntry;
3236
autoSubscribe: AutoSubscribe;
3337
autoDisconnect: AutoDisconnect;

agents/src/log.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
15
import pino from 'pino';
26

37
export const log = pino({

0 commit comments

Comments
 (0)