Skip to content

Commit 0785f6e

Browse files
feat(logs-api): expose logs as api + can subscribe to workflow execution using webhook url (#1287)
* feat(logs-api): expose logs as api + can subscribe to workflow exection using webhook url * fix scroll * Update apps/docs/content/docs/execution/api.mdx Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * fix rate limits * address greptile comments * remove unused file * address more greptile comments * minor UI changes * fix atomicity to prevent races * make search param sensible --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent cf4a935 commit 0785f6e

File tree

24 files changed

+9929
-26
lines changed

24 files changed

+9929
-26
lines changed

apps/docs/content/docs/execution/api.mdx

Lines changed: 532 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"title": "Execution",
3-
"pages": ["basics", "advanced"]
3+
"pages": ["basics", "advanced", "api"]
44
}

apps/sim/app/api/v1/auth.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { eq } from 'drizzle-orm'
2+
import type { NextRequest } from 'next/server'
3+
import { createLogger } from '@/lib/logs/console/logger'
4+
import { db } from '@/db'
5+
import { apiKey as apiKeyTable } from '@/db/schema'
6+
7+
const logger = createLogger('V1Auth')
8+
9+
export interface AuthResult {
10+
authenticated: boolean
11+
userId?: string
12+
error?: string
13+
}
14+
15+
export async function authenticateApiKey(request: NextRequest): Promise<AuthResult> {
16+
const apiKey = request.headers.get('x-api-key')
17+
18+
if (!apiKey) {
19+
return {
20+
authenticated: false,
21+
error: 'API key required',
22+
}
23+
}
24+
25+
try {
26+
const [keyRecord] = await db
27+
.select({
28+
userId: apiKeyTable.userId,
29+
expiresAt: apiKeyTable.expiresAt,
30+
})
31+
.from(apiKeyTable)
32+
.where(eq(apiKeyTable.key, apiKey))
33+
.limit(1)
34+
35+
if (!keyRecord) {
36+
logger.warn('Invalid API key attempted', { keyPrefix: apiKey.slice(0, 8) })
37+
return {
38+
authenticated: false,
39+
error: 'Invalid API key',
40+
}
41+
}
42+
43+
if (keyRecord.expiresAt && keyRecord.expiresAt < new Date()) {
44+
logger.warn('Expired API key attempted', { userId: keyRecord.userId })
45+
return {
46+
authenticated: false,
47+
error: 'API key expired',
48+
}
49+
}
50+
51+
await db.update(apiKeyTable).set({ lastUsed: new Date() }).where(eq(apiKeyTable.key, apiKey))
52+
53+
return {
54+
authenticated: true,
55+
userId: keyRecord.userId,
56+
}
57+
} catch (error) {
58+
logger.error('API key authentication error', { error })
59+
return {
60+
authenticated: false,
61+
error: 'Authentication failed',
62+
}
63+
}
64+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { and, eq } from 'drizzle-orm'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { createLogger } from '@/lib/logs/console/logger'
4+
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
5+
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'
6+
import { db } from '@/db'
7+
import { permissions, workflow, workflowExecutionLogs } from '@/db/schema'
8+
9+
const logger = createLogger('V1LogDetailsAPI')
10+
11+
export const revalidate = 0
12+
13+
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
14+
const requestId = crypto.randomUUID().slice(0, 8)
15+
16+
try {
17+
const rateLimit = await checkRateLimit(request, 'logs-detail')
18+
if (!rateLimit.allowed) {
19+
return createRateLimitResponse(rateLimit)
20+
}
21+
22+
const userId = rateLimit.userId!
23+
const { id } = await params
24+
25+
const rows = await db
26+
.select({
27+
id: workflowExecutionLogs.id,
28+
workflowId: workflowExecutionLogs.workflowId,
29+
executionId: workflowExecutionLogs.executionId,
30+
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
31+
level: workflowExecutionLogs.level,
32+
trigger: workflowExecutionLogs.trigger,
33+
startedAt: workflowExecutionLogs.startedAt,
34+
endedAt: workflowExecutionLogs.endedAt,
35+
totalDurationMs: workflowExecutionLogs.totalDurationMs,
36+
executionData: workflowExecutionLogs.executionData,
37+
cost: workflowExecutionLogs.cost,
38+
files: workflowExecutionLogs.files,
39+
createdAt: workflowExecutionLogs.createdAt,
40+
workflowName: workflow.name,
41+
workflowDescription: workflow.description,
42+
workflowColor: workflow.color,
43+
workflowFolderId: workflow.folderId,
44+
workflowUserId: workflow.userId,
45+
workflowWorkspaceId: workflow.workspaceId,
46+
workflowCreatedAt: workflow.createdAt,
47+
workflowUpdatedAt: workflow.updatedAt,
48+
})
49+
.from(workflowExecutionLogs)
50+
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
51+
.innerJoin(
52+
permissions,
53+
and(
54+
eq(permissions.entityType, 'workspace'),
55+
eq(permissions.entityId, workflow.workspaceId),
56+
eq(permissions.userId, userId)
57+
)
58+
)
59+
.where(eq(workflowExecutionLogs.id, id))
60+
.limit(1)
61+
62+
const log = rows[0]
63+
if (!log) {
64+
return NextResponse.json({ error: 'Log not found' }, { status: 404 })
65+
}
66+
67+
const workflowSummary = {
68+
id: log.workflowId,
69+
name: log.workflowName,
70+
description: log.workflowDescription,
71+
color: log.workflowColor,
72+
folderId: log.workflowFolderId,
73+
userId: log.workflowUserId,
74+
workspaceId: log.workflowWorkspaceId,
75+
createdAt: log.workflowCreatedAt,
76+
updatedAt: log.workflowUpdatedAt,
77+
}
78+
79+
const response = {
80+
id: log.id,
81+
workflowId: log.workflowId,
82+
executionId: log.executionId,
83+
level: log.level,
84+
trigger: log.trigger,
85+
startedAt: log.startedAt.toISOString(),
86+
endedAt: log.endedAt?.toISOString() || null,
87+
totalDurationMs: log.totalDurationMs,
88+
files: log.files || undefined,
89+
workflow: workflowSummary,
90+
executionData: log.executionData as any,
91+
cost: log.cost as any,
92+
createdAt: log.createdAt.toISOString(),
93+
}
94+
95+
// Get user's workflow execution limits and usage
96+
const limits = await getUserLimits(userId)
97+
98+
// Create response with limits information
99+
const apiResponse = createApiResponse({ data: response }, limits, rateLimit)
100+
101+
return NextResponse.json(apiResponse.body, { headers: apiResponse.headers })
102+
} catch (error: any) {
103+
logger.error(`[${requestId}] Log details fetch error`, { error: error.message })
104+
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
105+
}
106+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { and, eq } from 'drizzle-orm'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { createLogger } from '@/lib/logs/console/logger'
4+
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
5+
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'
6+
import { db } from '@/db'
7+
import {
8+
permissions,
9+
workflow,
10+
workflowExecutionLogs,
11+
workflowExecutionSnapshots,
12+
} from '@/db/schema'
13+
14+
const logger = createLogger('V1ExecutionAPI')
15+
16+
export async function GET(
17+
request: NextRequest,
18+
{ params }: { params: Promise<{ executionId: string }> }
19+
) {
20+
try {
21+
const rateLimit = await checkRateLimit(request, 'logs-detail')
22+
if (!rateLimit.allowed) {
23+
return createRateLimitResponse(rateLimit)
24+
}
25+
26+
const userId = rateLimit.userId!
27+
const { executionId } = await params
28+
29+
logger.debug(`Fetching execution data for: ${executionId}`)
30+
31+
const rows = await db
32+
.select({
33+
log: workflowExecutionLogs,
34+
workflow: workflow,
35+
})
36+
.from(workflowExecutionLogs)
37+
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
38+
.innerJoin(
39+
permissions,
40+
and(
41+
eq(permissions.entityType, 'workspace'),
42+
eq(permissions.entityId, workflow.workspaceId),
43+
eq(permissions.userId, userId)
44+
)
45+
)
46+
.where(eq(workflowExecutionLogs.executionId, executionId))
47+
.limit(1)
48+
49+
if (rows.length === 0) {
50+
return NextResponse.json({ error: 'Workflow execution not found' }, { status: 404 })
51+
}
52+
53+
const { log: workflowLog } = rows[0]
54+
55+
const [snapshot] = await db
56+
.select()
57+
.from(workflowExecutionSnapshots)
58+
.where(eq(workflowExecutionSnapshots.id, workflowLog.stateSnapshotId))
59+
.limit(1)
60+
61+
if (!snapshot) {
62+
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
63+
}
64+
65+
const response = {
66+
executionId,
67+
workflowId: workflowLog.workflowId,
68+
workflowState: snapshot.stateData,
69+
executionMetadata: {
70+
trigger: workflowLog.trigger,
71+
startedAt: workflowLog.startedAt.toISOString(),
72+
endedAt: workflowLog.endedAt?.toISOString(),
73+
totalDurationMs: workflowLog.totalDurationMs,
74+
cost: workflowLog.cost || null,
75+
},
76+
}
77+
78+
logger.debug(`Successfully fetched execution data for: ${executionId}`)
79+
logger.debug(
80+
`Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks`
81+
)
82+
83+
// Get user's workflow execution limits and usage
84+
const limits = await getUserLimits(userId)
85+
86+
// Create response with limits information
87+
const apiResponse = createApiResponse(
88+
{
89+
...response,
90+
},
91+
limits,
92+
rateLimit
93+
)
94+
95+
return NextResponse.json(apiResponse.body, { headers: apiResponse.headers })
96+
} catch (error) {
97+
logger.error('Error fetching execution data:', error)
98+
return NextResponse.json({ error: 'Failed to fetch execution data' }, { status: 500 })
99+
}
100+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import { and, desc, eq, gte, inArray, lte, type SQL, sql } from 'drizzle-orm'
2+
import { workflow, workflowExecutionLogs } from '@/db/schema'
3+
4+
export interface LogFilters {
5+
workspaceId: string
6+
workflowIds?: string[]
7+
folderIds?: string[]
8+
triggers?: string[]
9+
level?: 'info' | 'error'
10+
startDate?: Date
11+
endDate?: Date
12+
executionId?: string
13+
minDurationMs?: number
14+
maxDurationMs?: number
15+
minCost?: number
16+
maxCost?: number
17+
model?: string
18+
cursor?: {
19+
startedAt: string
20+
id: string
21+
}
22+
order?: 'desc' | 'asc'
23+
}
24+
25+
export function buildLogFilters(filters: LogFilters): SQL<unknown> {
26+
const conditions: SQL<unknown>[] = []
27+
28+
// Required: workspace and permissions check
29+
conditions.push(eq(workflow.workspaceId, filters.workspaceId))
30+
31+
// Cursor-based pagination
32+
if (filters.cursor) {
33+
const cursorDate = new Date(filters.cursor.startedAt)
34+
if (filters.order === 'desc') {
35+
conditions.push(
36+
sql`(${workflowExecutionLogs.startedAt}, ${workflowExecutionLogs.id}) < (${cursorDate}, ${filters.cursor.id})`
37+
)
38+
} else {
39+
conditions.push(
40+
sql`(${workflowExecutionLogs.startedAt}, ${workflowExecutionLogs.id}) > (${cursorDate}, ${filters.cursor.id})`
41+
)
42+
}
43+
}
44+
45+
// Workflow IDs filter
46+
if (filters.workflowIds && filters.workflowIds.length > 0) {
47+
conditions.push(inArray(workflow.id, filters.workflowIds))
48+
}
49+
50+
// Folder IDs filter
51+
if (filters.folderIds && filters.folderIds.length > 0) {
52+
conditions.push(inArray(workflow.folderId, filters.folderIds))
53+
}
54+
55+
// Triggers filter
56+
if (filters.triggers && filters.triggers.length > 0 && !filters.triggers.includes('all')) {
57+
conditions.push(inArray(workflowExecutionLogs.trigger, filters.triggers))
58+
}
59+
60+
// Level filter
61+
if (filters.level) {
62+
conditions.push(eq(workflowExecutionLogs.level, filters.level))
63+
}
64+
65+
// Date range filters
66+
if (filters.startDate) {
67+
conditions.push(gte(workflowExecutionLogs.startedAt, filters.startDate))
68+
}
69+
70+
if (filters.endDate) {
71+
conditions.push(lte(workflowExecutionLogs.startedAt, filters.endDate))
72+
}
73+
74+
// Search filter (execution ID)
75+
if (filters.executionId) {
76+
conditions.push(eq(workflowExecutionLogs.executionId, filters.executionId))
77+
}
78+
79+
// Duration filters
80+
if (filters.minDurationMs !== undefined) {
81+
conditions.push(gte(workflowExecutionLogs.totalDurationMs, filters.minDurationMs))
82+
}
83+
84+
if (filters.maxDurationMs !== undefined) {
85+
conditions.push(lte(workflowExecutionLogs.totalDurationMs, filters.maxDurationMs))
86+
}
87+
88+
// Cost filters
89+
if (filters.minCost !== undefined) {
90+
conditions.push(sql`(${workflowExecutionLogs.cost}->>'total')::numeric >= ${filters.minCost}`)
91+
}
92+
93+
if (filters.maxCost !== undefined) {
94+
conditions.push(sql`(${workflowExecutionLogs.cost}->>'total')::numeric <= ${filters.maxCost}`)
95+
}
96+
97+
// Model filter
98+
if (filters.model) {
99+
conditions.push(sql`${workflowExecutionLogs.cost}->>'models' ? ${filters.model}`)
100+
}
101+
102+
// Combine all conditions with AND
103+
return conditions.length > 0 ? and(...conditions)! : sql`true`
104+
}
105+
106+
export function getOrderBy(order: 'desc' | 'asc' = 'desc') {
107+
return order === 'desc'
108+
? desc(workflowExecutionLogs.startedAt)
109+
: sql`${workflowExecutionLogs.startedAt} ASC`
110+
}

0 commit comments

Comments
 (0)