Skip to content

Commit 790e82f

Browse files
authored
Move job_context.connect inside of AgentSession.start (#689)
1 parent d650e30 commit 790e82f

19 files changed

+59
-51
lines changed

.changeset/eight-parents-drive.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@livekit/agents-plugin-livekit': patch
3+
'@livekit/agents': patch
4+
---
5+
6+
Remove requirement to call ctx.connect in entrypoint function

agents/src/ipc/job_proc_lazy_main.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { EventEmitter, once } from 'node:events';
66
import { pathToFileURL } from 'node:url';
77
import type { Logger } from 'pino';
88
import { type Agent, isAgent } from '../generator.js';
9-
import { CurrentJobContext, JobContext, JobProcess, type RunningJobInfo } from '../job.js';
9+
import { JobContext, JobProcess, type RunningJobInfo, runWithJobContextAsync } from '../job.js';
1010
import { initializeLogger, log } from '../log.js';
1111
import { Future, shortuuid } from '../utils.js';
1212
import { defaultInitializeProcessFunc } from '../worker.js';
@@ -87,7 +87,6 @@ const startJob = (
8787
};
8888

8989
const ctx = new JobContext(proc, info, room, onConnect, onShutdown, new InfClient());
90-
new CurrentJobContext(ctx);
9190

9291
const task = new Promise<void>(async () => {
9392
const unconnectedTimeout = setTimeout(() => {
@@ -98,7 +97,11 @@ const startJob = (
9897
);
9998
}
10099
}, 10000);
101-
func(ctx).finally(() => clearTimeout(unconnectedTimeout));
100+
101+
// Run the job function within the AsyncLocalStorage context
102+
await runWithJobContextAsync(ctx, () => func(ctx)).finally(() => {
103+
clearTimeout(unconnectedTimeout);
104+
});
102105

103106
await once(closeEvent, 'close').then((close) => {
104107
logger.debug('shutting down');

agents/src/job.ts

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,43 @@ import type {
1010
RtcConfiguration,
1111
} from '@livekit/rtc-node';
1212
import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node';
13+
import { AsyncLocalStorage } from 'node:async_hooks';
1314
import type { Logger } from 'pino';
1415
import type { InferenceExecutor } from './ipc/inference_executor.js';
1516
import { log } from './log.js';
1617

17-
export class CurrentJobContext {
18-
static #current: JobContext;
19-
20-
constructor(proc: JobContext) {
21-
CurrentJobContext.#current = proc;
22-
}
23-
24-
static getCurrent(): JobContext {
25-
return CurrentJobContext.#current;
26-
}
27-
}
18+
// AsyncLocalStorage for job context, similar to Python's contextvars
19+
const jobContextStorage = new AsyncLocalStorage<JobContext>();
2820

2921
/**
3022
* Returns the current job context.
3123
*
3224
* @throws {Error} if no job context is found
3325
*/
3426
export function getJobContext(): JobContext {
35-
const ctx = CurrentJobContext.getCurrent();
27+
const ctx = jobContextStorage.getStore();
3628
if (!ctx) {
3729
throw new Error('no job context found, are you running this code inside a job entrypoint?');
3830
}
3931
return ctx;
4032
}
4133

34+
/**
35+
* Runs a function within a job context, similar to Python's contextvars.
36+
* @internal
37+
*/
38+
export function runWithJobContext<T>(context: JobContext, fn: () => T): T {
39+
return jobContextStorage.run(context, fn);
40+
}
41+
42+
/**
43+
* Runs an async function within a job context, similar to Python's contextvars.
44+
* @internal
45+
*/
46+
export function runWithJobContextAsync<T>(context: JobContext, fn: () => Promise<T>): Promise<T> {
47+
return jobContextStorage.run(context, fn);
48+
}
49+
4250
/** Which tracks, if any, should the agent automatically subscribe to? */
4351
export enum AutoSubscribe {
4452
SUBSCRIBE_ALL,
@@ -89,6 +97,8 @@ export class JobContext {
8997
#logger: Logger;
9098
#inferenceExecutor: InferenceExecutor;
9199

100+
private connected: boolean = false;
101+
92102
constructor(
93103
proc: JobProcess,
94104
info: RunningJobInfo,
@@ -191,6 +201,10 @@ export class JobContext {
191201
autoSubscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
192202
rtcConfig?: RtcConfiguration,
193203
) {
204+
if (this.connected) {
205+
return;
206+
}
207+
194208
const opts = {
195209
e2ee,
196210
autoSubscribe: autoSubscribe == AutoSubscribe.SUBSCRIBE_ALL,
@@ -215,6 +229,7 @@ export class JobContext {
215229
});
216230
});
217231
}
232+
this.connected = true;
218233
}
219234

220235
/**

agents/src/voice/agent_session.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { AudioFrame, Room } from '@livekit/rtc-node';
55
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
66
import { EventEmitter } from 'node:events';
77
import type { ReadableStream } from 'node:stream/web';
8+
import { getJobContext } from '../job.js';
89
import { ChatContext, ChatMessage } from '../llm/chat_context.js';
910
import type { LLM, RealtimeModel, RealtimeModelError, ToolChoice } from '../llm/index.js';
1011
import type { LLMError } from '../llm/llm.js';
@@ -184,6 +185,7 @@ export class AgentSession<
184185
this.agent = agent;
185186
this._updateAgentState('initializing');
186187

188+
const tasks: Promise<void>[] = [];
187189
// Check for existing input/output configuration and warn if needed
188190
if (this.input.audio && inputOptions?.audioEnabled !== false) {
189191
this.logger.warn('RoomIO audio input is enabled but input.audio is already set, ignoring..');
@@ -209,7 +211,15 @@ export class AgentSession<
209211
});
210212
this.roomIO.start();
211213

212-
await this.updateActivity(this.agent);
214+
const ctx = getJobContext();
215+
if (ctx && ctx.room === room && !room.isConnected) {
216+
this.logger.debug('Auto-connecting to room via job context');
217+
tasks.push(ctx.connect());
218+
}
219+
// TODO(AJS-265): add shutdown callback to job context
220+
tasks.push(this.updateActivity(this.agent));
221+
222+
await Promise.allSettled(tasks);
213223

214224
// Log used IO configuration
215225
this.logger.debug(
@@ -220,7 +230,6 @@ export class AgentSession<
220230
`using transcript io: \`AgentSession\` -> ${this.output.transcription ? '`' + this.output.transcription.constructor.name + '`' : '(none)'}`,
221231
);
222232

223-
this.logger.debug('AgentSession started');
224233
this.started = true;
225234
this._updateAgentState('listening');
226235
}

examples/src/basic_agent.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ export default defineAgent({
5555
},
5656
});
5757

58-
// join the room when agent is ready
59-
await ctx.connect();
60-
6158
session.say('Hello, how can I help you today?');
6259
},
6360
});

examples/src/basic_eou.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export default defineAgent({
99
entry: async (ctx: JobContext) => {
1010
const logger = log();
1111

12+
// Manual connection required since this example doesn't use AgentSession
1213
await ctx.connect();
1314

1415
// const eouModel = new turnDetector.EnglishModel();

examples/src/basic_tool_call_agent.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ export default defineAgent({
148148
agent: routerAgent,
149149
room: ctx.room,
150150
});
151-
152-
await ctx.connect();
153151
},
154152
});
155153

examples/src/cartersia_tts.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ export default defineAgent({
5555
},
5656
});
5757

58-
// join the room when agent is ready
59-
await ctx.connect();
60-
6158
session.say('Hello, how can I help you today?');
6259
},
6360
});

examples/src/comprehensive_test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,6 @@ export default defineAgent({
238238
proc.userData.vad = await silero.VAD.load();
239239
},
240240
entry: async (ctx: JobContext) => {
241-
await ctx.connect();
242-
243241
const vad = ctx.proc.userData.vad! as silero.VAD;
244242
const session = new voice.AgentSession({
245243
vad,

examples/src/drive-thru/drivethru_agent.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,6 @@ export default defineAgent({
380380
proc.userData.vad = await silero.VAD.load();
381381
},
382382
entry: async (ctx: JobContext) => {
383-
await ctx.connect();
384-
385383
const userdata = await newUserData();
386384

387385
const vad = ctx.proc.userData.vad! as silero.VAD;

0 commit comments

Comments
 (0)