Skip to content

Commit 6eaa8dc

Browse files
committed
Add JobRequest and properly implement event tx
1 parent 0cc008d commit 6eaa8dc

File tree

6 files changed

+148
-7
lines changed

6 files changed

+148
-7
lines changed

agents/package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
"typescript": "^5.0.0"
2626
},
2727
"dependencies": {
28+
"@livekit/protocol": "^1.12.0",
2829
"commander": "^12.0.0",
29-
"@livekit/protocol": "^1.12.0"
30+
"livekit-server-sdk": "^2.1.2",
31+
"pino": "^8.19.0",
32+
"pino-pretty": "^11.0.0"
3033
}
3134
}

agents/src/job_context.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,24 @@
44

55
import { Room, LocalParticipant, RemoteParticipant } from '@livekit/rtc-node';
66
import { Job } from '@livekit/protocol';
7+
import { EventEmitter } from 'events';
78

89
export class JobContext {
910
#job: Job;
1011
#room: Room;
1112
#publisher: RemoteParticipant | undefined;
12-
13-
constructor(job: Job, room: Room, publisher: RemoteParticipant | undefined = undefined) {
13+
tx: EventEmitter;
14+
15+
constructor(
16+
tx: EventEmitter,
17+
job: Job,
18+
room: Room,
19+
publisher: RemoteParticipant | undefined = undefined,
20+
) {
1421
this.#job = job;
1522
this.#room = room;
1623
this.#publisher = publisher;
24+
this.tx = tx;
1725
}
1826

1927
get id(): string {
@@ -36,8 +44,7 @@ export class JobContext {
3644
return this.#room.localParticipant;
3745
}
3846

39-
// TODO(nbsp): aio
40-
// async shutdown() {
41-
// await this.closeTx.close();
42-
// }
47+
async shutdown() {
48+
this.tx.removeAllListeners();
49+
}
4350
}

agents/src/job_request.ts

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import { JobContext } from './job_context';
2+
import { VideoGrant } from 'livekit-server-sdk';
3+
import { Job, ParticipantInfo, Room } from '@livekit/protocol';
4+
import { log } from './log';
5+
import { EventEmitter } from 'events';
6+
7+
class AnsweredError extends Error {
8+
constructor() {
9+
super();
10+
this.name = 'AnsweredError';
11+
this.message = 'request already answered';
12+
}
13+
}
14+
15+
enum AutoDisconnect {
16+
ROOM_EMPTY,
17+
PUBLISHER_LEFT,
18+
NONE,
19+
}
20+
21+
enum AutoSubscribe {
22+
SUBSCRIBE_ALL,
23+
SUBSCRIBE_NONE,
24+
VIDEO_ONLY,
25+
AUDIO_ONLY,
26+
}
27+
28+
type AgentEntry = (ctx: JobContext) => Promise<void>;
29+
30+
type AcceptData = {
31+
entry: AgentEntry;
32+
autoSubscribe: AutoSubscribe;
33+
autoDisconnect: AutoDisconnect;
34+
grants: VideoGrant;
35+
name: string;
36+
identity: string;
37+
metadata: string;
38+
assign: EventEmitter;
39+
};
40+
41+
type AvailRes = {
42+
avail: boolean;
43+
data: AcceptData | undefined;
44+
};
45+
46+
export class JobRequest {
47+
#job: Job;
48+
#answered = false;
49+
tx: EventEmitter;
50+
logger = log.child({ job: this.job });
51+
52+
constructor(job: Job, tx: EventEmitter) {
53+
this.#job = job;
54+
this.tx = tx;
55+
}
56+
57+
get id(): string {
58+
return this.#job.id;
59+
}
60+
61+
get job(): Job {
62+
return this.#job;
63+
}
64+
65+
get room(): Room | undefined {
66+
return this.#job.room;
67+
}
68+
69+
get publisher(): ParticipantInfo | undefined {
70+
return this.#job.participant;
71+
}
72+
73+
get answered(): boolean {
74+
return this.#answered;
75+
}
76+
77+
async reject() {
78+
if (this.#answered) {
79+
throw new AnsweredError();
80+
}
81+
this.#answered = true;
82+
this.tx.emit('recv', { avail: false, data: undefined } as AvailRes);
83+
this.logger.info('rejected job', this.id);
84+
}
85+
86+
async accept(
87+
entry: AgentEntry,
88+
autoSubscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
89+
autoDisconnect: AutoDisconnect = AutoDisconnect.ROOM_EMPTY,
90+
grants: VideoGrant,
91+
name: string = '',
92+
identity: string = '',
93+
metadata: string = '',
94+
) {
95+
if (this.#answered) {
96+
throw new AnsweredError();
97+
}
98+
this.#answered = true;
99+
100+
const assign = new EventEmitter();
101+
assign.on('error', (e) => {
102+
throw e;
103+
});
104+
105+
const data: AcceptData = {
106+
entry,
107+
autoSubscribe,
108+
autoDisconnect,
109+
grants,
110+
name,
111+
identity,
112+
metadata,
113+
assign,
114+
};
115+
116+
this.tx.emit('recv', { avail: true, data } as AvailRes);
117+
118+
this.logger.info('accepted job', this.id);
119+
}
120+
}

agents/src/log.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import pino from 'pino';
2+
3+
export const log = pino({
4+
transport: {
5+
target: 'pino-pretty',
6+
options: {
7+
colorize: true,
8+
},
9+
},
10+
});

bun.lockb

15.2 KB
Binary file not shown.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{
2+
"private": true,
23
"workspaces": ["agents"]
34
}

0 commit comments

Comments
 (0)