Skip to content

Commit 0ad319e

Browse files
authored
fix: task execution run in parallel (#6)
Signed-off-by: Aleš Kalfas <kalfas.ales@gmail.com>
1 parent f9042c1 commit 0ad319e

File tree

7 files changed

+394
-242
lines changed

7 files changed

+394
-242
lines changed

src/agent.ts

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,78 @@ import "dotenv/config.js";
44

55
import { createAgent } from "./agents/agent-factory.js";
66
import { AgentKindSchema, AgentRegistry } from "./agents/agent-registry.js";
7+
import * as operator from "./agents/operator.js";
8+
import * as supervisor from "./agents/supervisor.js";
79
import { createConsoleReader } from "./helpers/reader.js";
810
import { TaskManager } from "./tasks/task-manager.js";
9-
import * as supervisor from "./agents/supervisor.js";
10-
import * as operator from "./agents/operator.js";
1111

1212
const registry = new AgentRegistry<BeeAgent>({
1313
async onCreate(
14-
{ kind, tools, type, instructions, description },
14+
config,
1515
poolStats,
1616
toolsFactory,
17-
): Promise<{ id: string; instance: BeeAgent }> {
17+
): Promise<{ agentId: string; instance: BeeAgent }> {
18+
const { kind: agentKind, type: agentType, instructions, description } = config;
1819
const num = poolStats.created + 1;
19-
const id = `${kind}:${type}[${num}]`;
20-
tools = tools == null ? toolsFactory.getAvailableToolsNames() : tools;
20+
const agentId = `${agentKind}:${agentType}[${num}]`;
21+
const tools = config.tools == null ? toolsFactory.getAvailableToolsNames() : config.tools;
2122
const instance = createAgent(
2223
{
23-
agentKind: kind,
24-
agentType: type,
25-
agentId: id,
24+
agentKind,
25+
agentType,
26+
agentId,
2627
description,
2728
instructions,
2829
tools,
2930
},
3031
toolsFactory,
3132
);
3233

33-
return { id, instance };
34+
return { agentId, instance };
3435
},
3536
async onDestroy(instance) {
3637
instance.destroy();
3738
},
3839
});
3940

40-
const taskManager = new TaskManager(async (task) => {
41-
const { instance: agent } = await registry.acquireAgent(task.agentKind, task.agentType);
42-
const prompt = task.input;
43-
const resp = await agent.run(
44-
{ prompt },
45-
{
46-
execution: {
47-
maxIterations: 8,
48-
maxRetriesPerStep: 2,
49-
totalMaxRetries: 10,
50-
},
51-
},
52-
);
53-
return resp.result.text;
54-
});
41+
const taskManager = new TaskManager(
42+
async (task, taskManager, { onAgentCreate, onAgentComplete, onAgentError }) => {
43+
const agent = await registry.acquireAgent(task.agentKind, task.agentType);
44+
onAgentCreate(task.id, agent.agentId, taskManager);
45+
const { instance } = agent;
46+
const prompt = task.input;
47+
instance
48+
.run(
49+
{ prompt },
50+
{
51+
execution: {
52+
maxIterations: 8,
53+
maxRetriesPerStep: 2,
54+
totalMaxRetries: 10,
55+
},
56+
},
57+
)
58+
.observe((emitter) => {
59+
emitter.on("update", (data, meta) => {
60+
reader.write(
61+
`${(meta.creator as any).input.meta.name} 🤖 (${data.update.key}) :`,
62+
data.update.value,
63+
);
64+
});
65+
emitter.on("error", (data, meta) => {
66+
reader.write(
67+
`${(meta.creator as any).input.meta.name} 🤖 (${data.error.name}) :`,
68+
data.error.message,
69+
);
70+
});
71+
})
72+
.then((resp) => onAgentComplete(resp.result.text, task.id, agent.agentId, taskManager))
73+
.catch((err) => onAgentError(err, task.id, agent.agentId, taskManager))
74+
.finally(async () => {
75+
await registry.releaseAgent(agent.agentId);
76+
});
77+
},
78+
);
5579

5680
registry.registerToolsFactories([
5781
["supervisor", new supervisor.ToolsFactory(registry, taskManager)],
@@ -67,7 +91,7 @@ registry.registerAgentType({
6791
maxPoolSize: 1,
6892
});
6993

70-
const { instance: supervisorAgent, agentId: supervisorAgentId } = await registry.acquireAgent(
94+
const { instance: supervisorAgent } = await registry.acquireAgent(
7195
AgentKindSchema.Enum.supervisor,
7296
supervisor.AgentTypes.BOSS,
7397
);
@@ -81,7 +105,7 @@ const { instance: supervisorAgent, agentId: supervisorAgentId } = await registry
81105
// Can you runt these tasks?
82106
// Can you list their results?
83107

84-
// Can you create poems on each of these topics: night, sky, fall, love, hate?
108+
// Can you generate poem for each of these topics: love, day, night?
85109
// Can you get list of articles about each of these topics: deepseek, interstellar engine, agi?
86110

87111
const reader = createConsoleReader({ fallback: "What is the current weather in Las Vegas?" });
@@ -90,11 +114,7 @@ for await (const { prompt } of reader) {
90114
const response = await supervisorAgent
91115
.run(
92116
{
93-
prompt: `# State
94-
- Active agents: ${JSON.stringify(registry.getActiveAgents().map(({ id, kind, type, inUse }) => ({ id, kind, type, inUse })))}
95-
- Active tasks: ${JSON.stringify(taskManager.getAllTaskStatuses(supervisorAgentId).map(({ id, isRunning, isOccupied, isCompleted, nextRunAt, lastRunAt, ownerAgentId, currentAgentId, occupiedSince }) => ({ id, isRunning, isOccupied, isCompleted, nextRunAt, lastRunAt, ownerAgentId, currentAgentId, occupiedSince })))}
96-
97-
${prompt}`,
117+
prompt,
98118
},
99119
{
100120
execution: {

src/agents/agent-factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export function createAgent<TInput extends BaseCreateAgentInput>(
3434
templates: {
3535
system: (template) =>
3636
template.fork((config) => {
37-
config.defaults.instructions = `${supervisor.SUPERVISOR_INSTRUCTIONS}\n\n${generalInstructions}`;
37+
config.defaults.instructions = `${supervisor.SUPERVISOR_INSTRUCTIONS(input.agentKind, input.agentType, input.agentId)}\n\n${generalInstructions}`;
3838
}),
3939
},
4040
});

src/agents/agent-registry.ts

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export const AgentConfigSchema = z.object({
2424
.array(z.string())
2525
.nullish()
2626
.describe(
27-
"List of tool identifiers that this agent type can utilize. Null/undefined means all available.",
27+
"List of tool identifiers that this agent type can utilize. Null/undefined means all available. Empty array means no tools.",
2828
),
2929
maxPoolSize: z
3030
.number()
@@ -45,7 +45,7 @@ export type AgentConfig = z.infer<typeof AgentConfigSchema>;
4545
*/
4646
export const AgentSchema = z.object({
4747
/** Unique identifier for this specific agent instance */
48-
id: z.string(),
48+
agentId: z.string(),
4949
/** The type of agent this instance represents */
5050
type: z.string(),
5151
kind: AgentKindSchema,
@@ -59,6 +59,9 @@ export const AgentSchema = z.object({
5959
instance: z.any(),
6060
});
6161
export type Agent = z.infer<typeof AgentSchema>;
62+
export type AgentWithInstance<TAgentInstance> = Omit<Agent, "instance"> & {
63+
instance: TAgentInstance;
64+
};
6265

6366
/**
6467
* Schema for an available tool.
@@ -99,13 +102,13 @@ export interface AgentLifecycleCallbacks<TAgentInstance> {
99102
* @param config - Configuration for the agent
100103
* @param poolStats - Statistics of the agent pool
101104
* @param toolsFactory - Factory to create tools
102-
* @returns Promise resolving to the new agent's ID
105+
* @returns Promise resolving to the new agent's id and instance
103106
*/
104107
onCreate: (
105108
config: AgentConfig,
106109
poolStats: PoolStats,
107110
toolsFactory: BaseToolsFactory,
108-
) => Promise<{ id: string; instance: TAgentInstance }>;
111+
) => Promise<{ agentId: string; instance: TAgentInstance }>;
109112

110113
/**
111114
* Called when an agent is being destroyed
@@ -135,13 +138,6 @@ export interface AgentInstanceRef<TInstance> {
135138
instance: TInstance;
136139
}
137140

138-
function ref<TAgentInstance>(
139-
agentId: string,
140-
instance: TAgentInstance,
141-
): AgentInstanceRef<TAgentInstance> {
142-
return { agentId, instance };
143-
}
144-
145141
/**
146142
* Registry for managing agent types, instances, and pools.
147143
* Provides functionality for:
@@ -158,12 +154,10 @@ export class AgentRegistry<TAgentInstance> {
158154
private readonly logger: Logger;
159155
/** Map of registered agent kind and their configurations */
160156
private agentConfigs: Map<AgentKind, AgentConfigMap>;
161-
/** Map of all active agent instances */
162-
private activeAgents = new Map<string, Agent>();
157+
/** Map of all agent instances */
158+
private agents = new Map<string, Agent>();
163159
/** Map of agent pools by kind and type, containing sets of available agent IDs */
164160
private agentPools: Map<AgentKind, AgentTypePoolMap>;
165-
/** Map of agent instances available by agent IDs */
166-
private agentInstances = new Map<string, TAgentInstance>();
167161
/** Callbacks for agent lifecycle events */
168162
private callbacks: AgentLifecycleCallbacks<TAgentInstance>;
169163
/** Maps of tools factories for use by agents per agent kinds */
@@ -249,7 +243,7 @@ export class AgentRegistry<TAgentInstance> {
249243

250244
const toolsFactory = this.getToolsFactory(config.kind);
251245
const availableTools = toolsFactory.getAvailableTools();
252-
if (config.tools) {
246+
if (config.tools?.filter((it) => !!it.length).length) {
253247
const undefinedTools = config.tools.filter(
254248
(tool) => !availableTools.some((at) => at.name === tool),
255249
);
@@ -314,7 +308,8 @@ export class AgentRegistry<TAgentInstance> {
314308
});
315309

316310
for (let i = 0; i < needed; i++) {
317-
const { agentId: agentId } = await this.createAgent(kind, type, true);
311+
const agent = await this.createAgent(kind, type, true);
312+
const { agentId } = agent;
318313
pool.add(agentId);
319314
this.logger.trace("Added agent to pool", { kind, type, agentId });
320315
}
@@ -353,7 +348,7 @@ export class AgentRegistry<TAgentInstance> {
353348
* @returns Promise resolving to the agent ID
354349
* @throws Error if no agents are available and pool is at capacity
355350
*/
356-
async acquireAgent(kind: AgentKind, type: string): Promise<AgentInstanceRef<TAgentInstance>> {
351+
async acquireAgent(kind: AgentKind, type: string): Promise<AgentWithInstance<TAgentInstance>> {
357352
this.logger.debug("Attempting to acquire agent", { type });
358353
const config = this.getAgentTypeConfig(kind, type);
359354
const pool = this.getAgentPoolMap(kind, type);
@@ -365,7 +360,7 @@ export class AgentRegistry<TAgentInstance> {
365360

366361
// Try to get an available agent from the pool
367362
for (const agentId of pool) {
368-
const agent = this.activeAgents.get(agentId);
363+
const agent = this.agents.get(agentId);
369364
if (agent && !agent.inUse) {
370365
pool.delete(agentId);
371366
agent.inUse = true;
@@ -377,7 +372,7 @@ export class AgentRegistry<TAgentInstance> {
377372
await this.callbacks.onAcquire(agentId);
378373
}
379374

380-
return ref(agentId, agent.instance);
375+
return agent as AgentWithInstance<TAgentInstance>;
381376
}
382377
}
383378

@@ -406,7 +401,7 @@ export class AgentRegistry<TAgentInstance> {
406401
*/
407402
async releaseAgent(agentId: string): Promise<void> {
408403
this.logger.debug("Attempting to release agent", { agentId });
409-
const agent = this.activeAgents.get(agentId);
404+
const agent = this.agents.get(agentId);
410405
if (!agent) {
411406
this.logger.error("Agent not found for release", { agentId });
412407
throw new Error(`Agent with ID '${agentId}' not found`);
@@ -443,28 +438,25 @@ export class AgentRegistry<TAgentInstance> {
443438
kind: AgentKind,
444439
type: string,
445440
forPool: boolean,
446-
): Promise<AgentInstanceRef<TAgentInstance>> {
441+
): Promise<AgentWithInstance<TAgentInstance>> {
447442
this.logger.debug("Creating new agent", { kind, type, forPool });
448443
const config = this.getAgentTypeConfig(kind, type);
449444
const poolStats = this.getPoolStats(kind, type);
450445
const toolsFactory = this.getToolsFactory(kind);
451-
const { id: agentId, instance } = await this.callbacks.onCreate(
452-
config,
453-
poolStats,
454-
toolsFactory,
455-
);
446+
const { agentId, instance } = await this.callbacks.onCreate(config, poolStats, toolsFactory);
456447

457-
this.activeAgents.set(agentId, {
458-
id: agentId,
448+
const agent = {
449+
agentId,
459450
kind,
460451
type,
461452
config,
462453
inUse: !forPool,
463454
instance,
464-
});
455+
} satisfies Agent;
456+
this.agents.set(agentId, agent);
465457

466458
this.logger.info("Agent created successfully", { agentId, type, forPool });
467-
return ref(agentId, instance);
459+
return agent;
468460
}
469461

470462
/**
@@ -474,7 +466,7 @@ export class AgentRegistry<TAgentInstance> {
474466
*/
475467
async destroyAgent(agentId: string): Promise<void> {
476468
this.logger.debug("Attempting to destroy agent", { agentId });
477-
const agent = this.activeAgents.get(agentId);
469+
const agent = this.agents.get(agentId);
478470
if (!agent) {
479471
this.logger.error("Agent not found for destruction", { agentId });
480472
throw new Error(`Agent with ID '${agentId}' not found`);
@@ -488,7 +480,7 @@ export class AgentRegistry<TAgentInstance> {
488480
}
489481

490482
await this.callbacks.onDestroy(agent.instance);
491-
this.activeAgents.delete(agentId);
483+
this.agents.delete(agentId);
492484
this.logger.info("Agent destroyed successfully", {
493485
agentId,
494486
kind: agent.kind,
@@ -502,7 +494,7 @@ export class AgentRegistry<TAgentInstance> {
502494
*/
503495
getActiveAgents(): Agent[] {
504496
this.logger.trace("Getting active agents");
505-
return Array.from(this.activeAgents.values());
497+
return Array.from(this.agents.values()).filter((a) => a.inUse);
506498
}
507499

508500
/**
@@ -513,7 +505,7 @@ export class AgentRegistry<TAgentInstance> {
513505
*/
514506
getAgent(agentId: string): Agent {
515507
this.logger.trace("Getting agent by ID", { agentId });
516-
const agent = this.activeAgents.get(agentId);
508+
const agent = this.agents.get(agentId);
517509
if (!agent) {
518510
this.logger.error("Agent not found", { agentId });
519511
throw new Error(`Agent with ID '${agentId}' not found`);
@@ -535,9 +527,7 @@ export class AgentRegistry<TAgentInstance> {
535527
return { poolSize: 0, available: 0, inUse: 0, created: 0 };
536528
}
537529

538-
const available = Array.from(pool).filter(
539-
(agentId) => !this.activeAgents.get(agentId)?.inUse,
540-
).length;
530+
const available = Array.from(pool).filter((agentId) => !this.agents.get(agentId)?.inUse).length;
541531

542532
const stats = {
543533
poolSize: config.maxPoolSize,

0 commit comments

Comments
 (0)