Skip to content

Commit 686f5a9

Browse files
Support listening to top-level composio events (#3125)
## Summary - Allow `composio listen` to subscribe directly to top-level `composio.*` project event types without creating a temporary trigger. - Keep the existing temporary-trigger flow for standard trigger slugs. - Update help text and root command descriptions to document the new behavior. - Add analytics/test layer coverage for the new listen path and the existing trigger path. ## Testing - Added CLI test coverage for listening to `composio.connected_account.expired` and verifying no temporary trigger is created. - Added CLI test coverage for the existing trigger-slug flow to confirm it still creates and disables a temporary trigger. - Added analytics test coverage to ensure `cli_version` is present on tracked events. - Not run: full repository test suite.
1 parent efcb8c2 commit 686f5a9

File tree

6 files changed

+320
-77
lines changed

6 files changed

+320
-77
lines changed

ts/packages/cli/src/analytics/dispatch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,12 @@ export const getCurrentCwdSessionId = (): string | undefined => {
212212
const withCliSessionId = (event: TrackEvent): TrackEvent => {
213213
if (!event) return event;
214214
const cliSessionId = getCurrentCwdSessionId();
215-
if (!cliSessionId) return event;
216215
return {
217216
...event,
218217
properties: {
219218
...(event.properties ?? {}),
220-
cli_session_id: cliSessionId,
219+
cli_version: event.properties?.cli_version ?? constants.APP_VERSION,
220+
...(cliSessionId ? { cli_session_id: cliSessionId } : {}),
221221
},
222222
};
223223
};

ts/packages/cli/src/commands/listen.cmd.ts

Lines changed: 161 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Args, Command, Options } from '@effect/cli';
22
import { FileSystem } from '@effect/platform';
3+
import type { Composio as RawComposioClient } from '@composio/client';
34
import { Deferred, Effect, Option, Runtime } from 'effect';
45
import path from 'node:path';
56
import { requireAuth } from 'src/effects/require-auth';
@@ -21,17 +22,21 @@ import { matchesTriggerListenFilters } from './triggers/filter';
2122
import { parseTriggerListenEvent } from './triggers/parse';
2223

2324
const slug = Args.text({ name: 'slug' }).pipe(
24-
Args.withDescription('Trigger slug (e.g. "GMAIL_NEW_GMAIL_MESSAGE")')
25+
Args.withDescription(
26+
'Trigger slug (e.g. "GMAIL_NEW_GMAIL_MESSAGE") or project event type (e.g. "composio.connected_account.expired")'
27+
)
2528
);
2629

2730
const params = Options.text('params').pipe(
2831
Options.withAlias('p'),
29-
Options.withDescription('Trigger create params as JSON/JS object, @file, or - for stdin'),
32+
Options.withDescription(
33+
'Trigger create params as JSON/JS object, @file, or - for stdin. Only valid for trigger slugs.'
34+
),
3035
Options.optional
3136
);
3237

3338
const maxEvents = Options.integer('max-events').pipe(
34-
Options.withDescription('Stop after receiving N events for this temporary trigger'),
39+
Options.withDescription('Stop after receiving N matching events'),
3540
Options.optional
3641
);
3742

@@ -57,6 +62,8 @@ const debug = Options.boolean('debug').pipe(
5762
const sanitizePathPart = (value: string): string =>
5863
value.replace(/[^A-Z0-9._-]+/gi, '-').replace(/^-+|-+$/g, '') || 'unknown';
5964

65+
const isProjectEventType = (value: string): boolean => value.startsWith('composio.');
66+
6067
const resolveParamsInput = (input: Option.Option<string>) =>
6168
resolveOptionalTextInput(input, { missingValue: '{}' });
6269

@@ -79,6 +86,58 @@ const parseCreateParams = (raw: string) =>
7986
return parsed as Record<string, unknown>;
8087
});
8188

89+
const assertSupportedListenParams = (params: {
90+
listeningToProjectEvent: boolean;
91+
slug: string;
92+
createParamsInput: Record<string, unknown>;
93+
}) =>
94+
params.listeningToProjectEvent && Object.keys(params.createParamsInput).length > 0
95+
? Effect.fail(
96+
new Error(
97+
`--params is only supported for trigger slugs. "${params.slug}" is a project-level composio.* event type and does not create a temporary trigger.`
98+
)
99+
)
100+
: Effect.void;
101+
102+
const resolveConnectedAccountIdForTrigger = (params: {
103+
client: RawComposioClient;
104+
slug: string;
105+
consumerUserId: string;
106+
}) =>
107+
Effect.gen(function* () {
108+
const toolkitSlug = toolkitFromToolSlug(params.slug);
109+
if (!toolkitSlug) {
110+
return yield* Effect.fail(
111+
new Error(
112+
`Could not infer a toolkit from trigger slug "${params.slug}". Use a standard trigger slug such as "GMAIL_NEW_GMAIL_MESSAGE", or a project event type such as "composio.connected_account.expired".`
113+
)
114+
);
115+
}
116+
117+
const connectedAccounts = yield* Effect.tryPromise({
118+
try: () =>
119+
params.client.connectedAccounts.list({
120+
toolkit_slugs: [toolkitSlug],
121+
user_ids: [params.consumerUserId],
122+
statuses: ['ACTIVE'],
123+
limit: 100,
124+
}),
125+
catch: error =>
126+
new Error(`Failed to list connected accounts for "${toolkitSlug}": ${String(error)}`),
127+
});
128+
const connectedAccountId = selectConnectedAccountId(connectedAccounts.items);
129+
130+
if (!connectedAccountId) {
131+
return yield* Effect.fail(
132+
new Error(
133+
`No active connected account found for toolkit "${toolkitSlug}" and consumer user "${params.consumerUserId}". Run \`composio link ${toolkitSlug}\` first.`
134+
)
135+
);
136+
}
137+
138+
return connectedAccountId;
139+
});
140+
82141
const selectConnectedAccountId = (
83142
items: ReadonlyArray<{
84143
id: string;
@@ -102,6 +161,9 @@ const emitStreamLine = (line: string, ui: TerminalUI) =>
102161
yield* ui.output(line, { force: true });
103162
});
104163

164+
const eventTypeOf = (eventData: Record<string, unknown>): string | undefined =>
165+
typeof eventData.type === 'string' && eventData.type.length > 0 ? eventData.type : undefined;
166+
105167
const extractEventFileId = (eventData: Record<string, unknown>): string => {
106168
const candidates = [
107169
eventData.id,
@@ -180,6 +242,23 @@ const formatStreamValue = (value: unknown): string => {
180242
return JSON.stringify(value);
181243
};
182244

245+
const formatStopMessage = (params: {
246+
matchingEvents: number;
247+
timedOut: boolean;
248+
temporaryTriggerDisabled: boolean;
249+
}): string => {
250+
const eventLabel = `event${params.matchingEvents === 1 ? '' : 's'}`;
251+
if (params.timedOut) {
252+
return params.temporaryTriggerDisabled
253+
? `Stopped after timeout with ${params.matchingEvents} matching ${eventLabel}. Temporary trigger disabled.`
254+
: `Stopped after timeout with ${params.matchingEvents} matching ${eventLabel}.`;
255+
}
256+
257+
return params.temporaryTriggerDisabled
258+
? `Stopped after receiving ${params.matchingEvents} ${eventLabel}. Temporary trigger disabled.`
259+
: `Stopped after receiving ${params.matchingEvents} ${eventLabel}.`;
260+
};
261+
183262
const TIMEOUT_UNITS_MS: Record<string, number> = {
184263
ms: 1,
185264
millisecond: 1,
@@ -252,44 +331,27 @@ export const listenCmd = Command.make(
252331
);
253332
}
254333

334+
const listeningToProjectEvent = isProjectEventType(slug);
255335
const rawParams = Option.isSome(params)
256336
? (yield* resolveParamsInput(params))?.trim() || '{}'
257337
: '{}';
258338
const createParamsInput = yield* parseCreateParams(rawParams);
259-
const toolkitSlug = toolkitFromToolSlug(slug);
260-
if (!toolkitSlug) {
261-
return yield* Effect.fail(
262-
new Error(
263-
`Could not infer a toolkit from trigger slug "${slug}". Use a standard trigger slug such as "GMAIL_NEW_GMAIL_MESSAGE".`
264-
)
265-
);
266-
}
267-
268-
const connectedAccounts = yield* Effect.tryPromise({
269-
try: () =>
270-
client.connectedAccounts.list({
271-
toolkit_slugs: [toolkitSlug],
272-
user_ids: resolvedProject.consumerUserId ? [resolvedProject.consumerUserId] : undefined,
273-
statuses: ['ACTIVE'],
274-
limit: 100,
275-
}),
276-
catch: error =>
277-
new Error(`Failed to list connected accounts for "${toolkitSlug}": ${String(error)}`),
278-
});
279-
const resolvedConnectedAccountId = selectConnectedAccountId(connectedAccounts.items);
280-
281-
if (!resolvedConnectedAccountId) {
282-
return yield* Effect.fail(
283-
new Error(
284-
`No active connected account found for toolkit "${toolkitSlug}" and consumer user "${resolvedProject.consumerUserId}". Run \`composio link ${toolkitSlug}\` first.`
285-
)
286-
);
287-
}
288-
289-
const createParams = {
290-
...createParamsInput,
291-
connected_account_id: resolvedConnectedAccountId,
292-
} as Parameters<typeof client.triggerInstances.upsert>[1];
339+
yield* assertSupportedListenParams({ listeningToProjectEvent, slug, createParamsInput });
340+
341+
const resolvedConnectedAccountId = listeningToProjectEvent
342+
? undefined
343+
: yield* resolveConnectedAccountIdForTrigger({
344+
client,
345+
slug,
346+
consumerUserId: resolvedProject.consumerUserId,
347+
});
348+
349+
const createParams = listeningToProjectEvent
350+
? undefined
351+
: ({
352+
...createParamsInput,
353+
connected_account_id: resolvedConnectedAccountId,
354+
} as Parameters<typeof client.triggerInstances.upsert>[1]);
293355
const timeoutMs = Option.match(timeout, {
294356
onNone: () => undefined,
295357
onSome: value => parseTimeoutMs(value),
@@ -311,7 +373,11 @@ export const listenCmd = Command.make(
311373
onNone: () => resolveFallbackArtifactsDir(),
312374
onSome: value => value.directoryPath,
313375
});
314-
const triggerDir = path.join(artifactsRoot, 'triggers', sanitizePathPart(slug));
376+
const triggerDir = path.join(
377+
artifactsRoot,
378+
listeningToProjectEvent ? 'events' : 'triggers',
379+
sanitizePathPart(slug)
380+
);
315381
const streamFilePath = path.join(triggerDir, 'events.jsonl');
316382

317383
yield* fs.makeDirectory(triggerDir, { recursive: true });
@@ -323,21 +389,28 @@ export const listenCmd = Command.make(
323389
const seenEventIds = new Set<string>();
324390

325391
yield* Effect.acquireUseRelease(
326-
Effect.tryPromise({
327-
try: () => client.triggerInstances.upsert(slug, createParams),
328-
catch: error =>
329-
new Error(`Failed to create temporary trigger "${slug}": ${String(error)}`),
330-
}),
392+
listeningToProjectEvent
393+
? Effect.succeed<null | { trigger_id: string }>(null)
394+
: Effect.tryPromise({
395+
try: () => client.triggerInstances.upsert(slug, createParams),
396+
catch: error =>
397+
new Error(`Failed to create temporary trigger "${slug}": ${String(error)}`),
398+
}),
331399
createdTrigger =>
332400
Effect.gen(function* () {
333401
yield* emitStreamLine(`listening for events ${slug} (tail at ${streamFilePath})`, ui);
334402
if (debug) {
335-
const debugMsg = `[debug] trigger_id=${createdTrigger.trigger_id} project=${resolvedProject.projectId} org=${resolvedProject.orgId} createParams=${JSON.stringify(createParams)}`;
403+
const debugMsg =
404+
createdTrigger === null
405+
? `[debug] event_type=${slug} project=${resolvedProject.projectId} org=${resolvedProject.orgId}`
406+
: `[debug] trigger_id=${createdTrigger.trigger_id} project=${resolvedProject.projectId} org=${resolvedProject.orgId} createParams=${JSON.stringify(createParams)}`;
336407
yield* emitStreamLine(debugMsg, ui);
337-
yield* emitStreamLine(
338-
`[debug] upsert response: ${JSON.stringify(createdTrigger)}`,
339-
ui
340-
);
408+
if (createdTrigger !== null) {
409+
yield* emitStreamLine(
410+
`[debug] upsert response: ${JSON.stringify(createdTrigger)}`,
411+
ui
412+
);
413+
}
341414
}
342415

343416
const onEvent = (eventData: Record<string, unknown>) => {
@@ -353,14 +426,20 @@ export const listenCmd = Command.make(
353426
ui
354427
);
355428
}
356-
const parsed = parseTriggerListenEvent(eventData);
357-
const filterResult = matchesTriggerListenFilters(
358-
{ triggerId: createdTrigger.trigger_id },
359-
parsed
360-
);
429+
const parsedTriggerEvent =
430+
createdTrigger === null ? undefined : parseTriggerListenEvent(eventData);
431+
const filterResult =
432+
createdTrigger === null
433+
? eventTypeOf(eventData) === slug
434+
: matchesTriggerListenFilters(
435+
{ triggerId: createdTrigger.trigger_id },
436+
parsedTriggerEvent!
437+
);
361438
if (debug) {
362439
yield* emitStreamLine(
363-
`[debug] parsed.id=${parsed.id} triggerSlug=${parsed.triggerSlug} trigger_id=${createdTrigger.trigger_id} match=${filterResult}`,
440+
createdTrigger === null
441+
? `[debug] event.type=${eventTypeOf(eventData) ?? '<missing>'} match=${filterResult}`
442+
: `[debug] parsed.id=${parsedTriggerEvent!.id} triggerSlug=${parsedTriggerEvent!.triggerSlug} trigger_id=${createdTrigger.trigger_id} match=${filterResult}`,
364443
ui
365444
);
366445
}
@@ -382,8 +461,9 @@ export const listenCmd = Command.make(
382461
const eventJson = JSON.stringify(eventData, null, 2);
383462
const streamEntry = JSON.stringify({
384463
event_id: eventFileId,
385-
trigger_id: createdTrigger.trigger_id,
386-
trigger_slug: slug,
464+
event_type: eventTypeOf(eventData),
465+
trigger_id: createdTrigger?.trigger_id,
466+
trigger_slug: createdTrigger ? slug : undefined,
387467
file_path: eventFilePath,
388468
received_at: new Date().toISOString(),
389469
});
@@ -440,34 +520,44 @@ export const listenCmd = Command.make(
440520
const stopReason = yield* Effect.raceFirst(listenEffect, Deferred.await(stopWhenDone));
441521
if (stopReason === 'max-events') {
442522
yield* ui.outro(
443-
`Stopped after receiving ${matchingEvents} events. Temporary trigger disabled.`
523+
formatStopMessage({
524+
matchingEvents,
525+
timedOut: false,
526+
temporaryTriggerDisabled: createdTrigger !== null,
527+
})
444528
);
445529
return;
446530
}
447531

448532
if (stopReason === 'timeout') {
449533
yield* ui.outro(
450-
`Stopped after timeout with ${matchingEvents} matching event${matchingEvents === 1 ? '' : 's'}. Temporary trigger disabled.`
534+
formatStopMessage({
535+
matchingEvents,
536+
timedOut: true,
537+
temporaryTriggerDisabled: createdTrigger !== null,
538+
})
451539
);
452540
}
453541
}),
454542
created =>
455-
Effect.tryPromise({
456-
try: () =>
457-
client.triggerInstances.manage.update(created.trigger_id, { status: 'disable' }),
458-
catch: error =>
459-
new Error(
460-
`Failed to disable temporary trigger "${created.trigger_id}": ${String(error)}`
461-
),
462-
}).pipe(
463-
Effect.catchAll(error =>
464-
ui.log.warn(error instanceof Error ? error.message : String(error))
465-
)
466-
)
543+
created === null
544+
? Effect.void
545+
: Effect.tryPromise({
546+
try: () =>
547+
client.triggerInstances.manage.update(created.trigger_id, { status: 'disable' }),
548+
catch: error =>
549+
new Error(
550+
`Failed to disable temporary trigger "${created.trigger_id}": ${String(error)}`
551+
),
552+
}).pipe(
553+
Effect.catchAll(error =>
554+
ui.log.warn(error instanceof Error ? error.message : String(error))
555+
)
556+
)
467557
);
468558
})
469559
).pipe(
470560
Command.withDescription(
471-
'Create a temporary subscription for consumer-project events and write each event to artifacts for easy background-agent consumption.'
561+
'Listen to consumer-project realtime events. Trigger slugs create a temporary trigger; top-level composio.* event types subscribe directly.'
472562
)
473563
);

0 commit comments

Comments
 (0)