Skip to content

Commit b296323

Browse files
feat(attachments): use filesystem for gmail, outlook triggers to save attachments (#1631)
* feat(outlook): add include attachment feature to outlook * add include attachments to gmail trigger * add gmail trigger, outlook block include attachments * fix rendering issue * remove comment * fix architecture * fix redeploy * pass files to logging session to surface in logs * fix gmail block parsing attachments * fix reads
1 parent d325fdd commit b296323

File tree

13 files changed

+557
-169
lines changed

13 files changed

+557
-169
lines changed

apps/sim/app/api/workflows/[id]/deploy/route.ts

Lines changed: 55 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { apiKey, db, workflow, workflowDeploymentVersion } from '@sim/db'
22
import { and, desc, eq, sql } from 'drizzle-orm'
3-
import type { NextRequest } from 'next/server'
3+
import { type NextRequest, NextResponse } from 'next/server'
44
import { v4 as uuidv4 } from 'uuid'
5-
import { generateApiKey } from '@/lib/api-key/service'
65
import { createLogger } from '@/lib/logs/console/logger'
76
import { generateRequestId } from '@/lib/utils'
87
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
@@ -64,26 +63,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
6463
.orderBy(desc(apiKey.lastUsed), desc(apiKey.createdAt))
6564
.limit(1)
6665

67-
if (userApiKey.length === 0) {
68-
try {
69-
const newApiKeyVal = generateApiKey()
70-
const keyName = 'Default API Key'
71-
await db.insert(apiKey).values({
72-
id: uuidv4(),
73-
userId: workflowData.userId,
74-
workspaceId: null,
75-
name: keyName,
76-
key: newApiKeyVal,
77-
type: 'personal',
78-
createdAt: new Date(),
79-
updatedAt: new Date(),
80-
})
81-
keyInfo = { name: keyName, type: 'personal' }
82-
logger.info(`[${requestId}] Generated new API key for user: ${workflowData.userId}`)
83-
} catch (keyError) {
84-
logger.error(`[${requestId}] Failed to generate API key:`, keyError)
85-
}
86-
} else {
66+
if (userApiKey.length > 0) {
8767
keyInfo = { name: userApiKey[0].name, type: userApiKey[0].type as 'personal' | 'workspace' }
8868
}
8969
}
@@ -190,34 +170,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
190170
const deployedAt = new Date()
191171
logger.debug(`[${requestId}] Proceeding with deployment at ${deployedAt.toISOString()}`)
192172

193-
const userApiKey = await db
194-
.select({
195-
key: apiKey.key,
196-
})
197-
.from(apiKey)
198-
.where(and(eq(apiKey.userId, userId), eq(apiKey.type, 'personal')))
199-
.orderBy(desc(apiKey.lastUsed), desc(apiKey.createdAt))
200-
.limit(1)
201-
202-
if (userApiKey.length === 0) {
203-
try {
204-
const newApiKey = generateApiKey()
205-
await db.insert(apiKey).values({
206-
id: uuidv4(),
207-
userId,
208-
workspaceId: null,
209-
name: 'Default API Key',
210-
key: newApiKey,
211-
type: 'personal',
212-
createdAt: new Date(),
213-
updatedAt: new Date(),
214-
})
215-
logger.info(`[${requestId}] Generated new API key for user: ${userId}`)
216-
} catch (keyError) {
217-
logger.error(`[${requestId}] Failed to generate API key:`, keyError)
218-
}
219-
}
220-
221173
let keyInfo: { name: string; type: 'personal' | 'workspace' } | null = null
222174
let matchedKey: {
223175
id: string
@@ -226,13 +178,50 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
226178
type: 'personal' | 'workspace'
227179
} | null = null
228180

229-
if (providedApiKey) {
230-
let isValidKey = false
181+
// Use provided API key, or fall back to existing pinned API key for redeployment
182+
const apiKeyToUse = providedApiKey || workflowData!.pinnedApiKeyId
183+
184+
if (!apiKeyToUse) {
185+
return NextResponse.json(
186+
{ error: 'API key is required. Please create or select an API key before deploying.' },
187+
{ status: 400 }
188+
)
189+
}
190+
191+
let isValidKey = false
192+
193+
const currentUserId = session?.user?.id
194+
195+
if (currentUserId) {
196+
const [personalKey] = await db
197+
.select({
198+
id: apiKey.id,
199+
key: apiKey.key,
200+
name: apiKey.name,
201+
expiresAt: apiKey.expiresAt,
202+
})
203+
.from(apiKey)
204+
.where(
205+
and(
206+
eq(apiKey.id, apiKeyToUse),
207+
eq(apiKey.userId, currentUserId),
208+
eq(apiKey.type, 'personal')
209+
)
210+
)
211+
.limit(1)
231212

232-
const currentUserId = session?.user?.id
213+
if (personalKey) {
214+
if (!personalKey.expiresAt || personalKey.expiresAt >= new Date()) {
215+
matchedKey = { ...personalKey, type: 'personal' }
216+
isValidKey = true
217+
keyInfo = { name: personalKey.name, type: 'personal' }
218+
}
219+
}
220+
}
233221

234-
if (currentUserId) {
235-
const [personalKey] = await db
222+
if (!isValidKey) {
223+
if (workflowData!.workspaceId) {
224+
const [workspaceKey] = await db
236225
.select({
237226
id: apiKey.id,
238227
key: apiKey.key,
@@ -242,55 +231,26 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
242231
.from(apiKey)
243232
.where(
244233
and(
245-
eq(apiKey.id, providedApiKey),
246-
eq(apiKey.userId, currentUserId),
247-
eq(apiKey.type, 'personal')
234+
eq(apiKey.id, apiKeyToUse),
235+
eq(apiKey.workspaceId, workflowData!.workspaceId),
236+
eq(apiKey.type, 'workspace')
248237
)
249238
)
250239
.limit(1)
251240

252-
if (personalKey) {
253-
if (!personalKey.expiresAt || personalKey.expiresAt >= new Date()) {
254-
matchedKey = { ...personalKey, type: 'personal' }
241+
if (workspaceKey) {
242+
if (!workspaceKey.expiresAt || workspaceKey.expiresAt >= new Date()) {
243+
matchedKey = { ...workspaceKey, type: 'workspace' }
255244
isValidKey = true
256-
keyInfo = { name: personalKey.name, type: 'personal' }
257-
}
258-
}
259-
}
260-
261-
if (!isValidKey) {
262-
if (workflowData!.workspaceId) {
263-
const [workspaceKey] = await db
264-
.select({
265-
id: apiKey.id,
266-
key: apiKey.key,
267-
name: apiKey.name,
268-
expiresAt: apiKey.expiresAt,
269-
})
270-
.from(apiKey)
271-
.where(
272-
and(
273-
eq(apiKey.id, providedApiKey),
274-
eq(apiKey.workspaceId, workflowData!.workspaceId),
275-
eq(apiKey.type, 'workspace')
276-
)
277-
)
278-
.limit(1)
279-
280-
if (workspaceKey) {
281-
if (!workspaceKey.expiresAt || workspaceKey.expiresAt >= new Date()) {
282-
matchedKey = { ...workspaceKey, type: 'workspace' }
283-
isValidKey = true
284-
keyInfo = { name: workspaceKey.name, type: 'workspace' }
285-
}
245+
keyInfo = { name: workspaceKey.name, type: 'workspace' }
286246
}
287247
}
288248
}
249+
}
289250

290-
if (!isValidKey) {
291-
logger.warn(`[${requestId}] Invalid API key ID provided for workflow deployment: ${id}`)
292-
return createErrorResponse('Invalid API key provided', 400)
293-
}
251+
if (!isValidKey) {
252+
logger.warn(`[${requestId}] Invalid API key ID provided for workflow deployment: ${id}`)
253+
return createErrorResponse('Invalid API key provided', 400)
294254
}
295255

296256
// Attribution: this route is UI-only; require session user as actor

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/deploy-modal/deploy-modal.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ export function DeployModal({
361361
await fetchVersions()
362362
} catch (error: unknown) {
363363
logger.error('Error deploying workflow:', { error })
364+
const errorMessage = error instanceof Error ? error.message : 'Failed to deploy workflow'
365+
setApiDeployError(errorMessage)
364366
} finally {
365367
setIsSubmitting(false)
366368
}

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,17 @@ export const WorkflowBlock = memo(
432432
}
433433
}, [id, blockHeight, blockWidth, updateBlockLayoutMetrics, updateNodeInternals, debounce])
434434

435+
// Subscribe to this block's subblock values to track changes for conditional rendering
436+
const blockSubBlockValues = useSubBlockStore(
437+
useCallback(
438+
(state) => {
439+
if (!activeWorkflowId) return {}
440+
return state.workflowValues[activeWorkflowId]?.[id] || {}
441+
},
442+
[activeWorkflowId, id]
443+
)
444+
)
445+
435446
// Memoized SubBlock layout management - only recalculate when dependencies change
436447
const subBlockRows = useMemo(() => {
437448
const rows: SubBlockConfig[][] = []
@@ -450,8 +461,7 @@ export const WorkflowBlock = memo(
450461
} else {
451462
// In normal mode, use merged state
452463
const blocks = useWorkflowStore.getState().blocks
453-
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId || undefined
454-
const mergedState = mergeSubblockState(blocks, activeWorkflowId, id)[id]
464+
const mergedState = mergeSubblockState(blocks, activeWorkflowId || undefined, id)[id]
455465
stateToUse = mergedState?.subBlocks || {}
456466
}
457467

@@ -552,6 +562,8 @@ export const WorkflowBlock = memo(
552562
data.subBlockValues,
553563
currentWorkflow.isDiffMode,
554564
currentBlock,
565+
blockSubBlockValues,
566+
activeWorkflowId,
555567
])
556568

557569
// Name editing handlers

apps/sim/background/webhook-execution.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { createLogger } from '@/lib/logs/console/logger'
1010
import { LoggingSession } from '@/lib/logs/execution/logging-session'
1111
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
1212
import { decryptSecret } from '@/lib/utils'
13+
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
1314
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils'
1415
import {
1516
loadDeployedWorkflowState,
@@ -20,9 +21,66 @@ import { Executor } from '@/executor'
2021
import type { ExecutionResult } from '@/executor/types'
2122
import { Serializer } from '@/serializer'
2223
import { mergeSubblockState } from '@/stores/workflows/server-utils'
24+
import { getTrigger } from '@/triggers'
2325

2426
const logger = createLogger('TriggerWebhookExecution')
2527

28+
/**
29+
* Process trigger outputs based on their schema definitions
30+
* Finds outputs marked as 'file' or 'file[]' and uploads them to execution storage
31+
*/
32+
async function processTriggerFileOutputs(
33+
input: any,
34+
triggerOutputs: Record<string, any>,
35+
context: {
36+
workspaceId: string
37+
workflowId: string
38+
executionId: string
39+
requestId: string
40+
},
41+
path = ''
42+
): Promise<any> {
43+
if (!input || typeof input !== 'object') {
44+
return input
45+
}
46+
47+
const processed: any = Array.isArray(input) ? [] : {}
48+
49+
for (const [key, value] of Object.entries(input)) {
50+
const currentPath = path ? `${path}.${key}` : key
51+
const outputDef = triggerOutputs[key]
52+
const val: any = value
53+
54+
// If this field is marked as file or file[], process it
55+
if (outputDef?.type === 'file[]' && Array.isArray(val)) {
56+
try {
57+
processed[key] = await WebhookAttachmentProcessor.processAttachments(val as any, context)
58+
} catch (error) {
59+
processed[key] = []
60+
}
61+
} else if (outputDef?.type === 'file' && val) {
62+
try {
63+
const [processedFile] = await WebhookAttachmentProcessor.processAttachments(
64+
[val as any],
65+
context
66+
)
67+
processed[key] = processedFile
68+
} catch (error) {
69+
logger.error(`[${context.requestId}] Error processing ${currentPath}:`, error)
70+
processed[key] = val
71+
}
72+
} else if (outputDef && typeof outputDef === 'object' && !outputDef.type) {
73+
// Nested object in schema - recurse with the nested schema
74+
processed[key] = await processTriggerFileOutputs(val, outputDef, context, currentPath)
75+
} else {
76+
// Not a file output - keep as is
77+
processed[key] = val
78+
}
79+
}
80+
81+
return processed
82+
}
83+
2684
export type WebhookExecutionPayload = {
2785
webhookId: string
2886
workflowId: string
@@ -250,6 +308,7 @@ async function executeWebhookJobInternal(
250308
totalDurationMs: totalDuration || 0,
251309
finalOutput: executionResult.output || {},
252310
traceSpans: traceSpans as any,
311+
workflowInput: airtableInput,
253312
})
254313

255314
return {
@@ -312,6 +371,32 @@ async function executeWebhookJobInternal(
312371
}
313372
}
314373

374+
// Process trigger file outputs based on schema
375+
if (input && payload.blockId && blocks[payload.blockId]) {
376+
try {
377+
const triggerBlock = blocks[payload.blockId]
378+
const triggerId = triggerBlock?.subBlocks?.triggerId?.value
379+
380+
if (triggerId && typeof triggerId === 'string') {
381+
const triggerConfig = getTrigger(triggerId)
382+
383+
if (triggerConfig?.outputs) {
384+
logger.debug(`[${requestId}] Processing trigger ${triggerId} file outputs`)
385+
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
386+
workspaceId: workspaceId || '',
387+
workflowId: payload.workflowId,
388+
executionId,
389+
requestId,
390+
})
391+
Object.assign(input, processedInput)
392+
}
393+
}
394+
} catch (error) {
395+
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
396+
// Continue without processing attachments rather than failing execution
397+
}
398+
}
399+
315400
// Create executor and execute
316401
const executor = new Executor({
317402
workflow: serializedWorkflow,
@@ -367,6 +452,7 @@ async function executeWebhookJobInternal(
367452
totalDurationMs: totalDuration || 0,
368453
finalOutput: executionResult.output || {},
369454
traceSpans: traceSpans as any,
455+
workflowInput: input,
370456
})
371457

372458
return {

apps/sim/blocks/blocks/outlook.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ export const OutlookBlock: BlockConfig<OutlookResponse> = {
173173
placeholder: 'Number of emails to retrieve (default: 1, max: 10)',
174174
condition: { field: 'operation', value: 'read_outlook' },
175175
},
176+
{
177+
id: 'includeAttachments',
178+
title: 'Include Attachments',
179+
type: 'switch',
180+
layout: 'full',
181+
condition: { field: 'operation', value: 'read_outlook' },
182+
},
176183
// TRIGGER MODE: Trigger configuration (only shown when trigger mode is active)
177184
{
178185
id: 'triggerConfig',
@@ -231,6 +238,7 @@ export const OutlookBlock: BlockConfig<OutlookResponse> = {
231238
folder: { type: 'string', description: 'Email folder' },
232239
manualFolder: { type: 'string', description: 'Manual folder name' },
233240
maxResults: { type: 'number', description: 'Maximum emails' },
241+
includeAttachments: { type: 'boolean', description: 'Include email attachments' },
234242
},
235243
outputs: {
236244
// Common outputs
@@ -255,6 +263,10 @@ export const OutlookBlock: BlockConfig<OutlookResponse> = {
255263
receivedDateTime: { type: 'string', description: 'Email received timestamp' },
256264
sentDateTime: { type: 'string', description: 'Email sent timestamp' },
257265
hasAttachments: { type: 'boolean', description: 'Whether email has attachments' },
266+
attachments: {
267+
type: 'json',
268+
description: 'Email attachments (if includeAttachments is enabled)',
269+
},
258270
isRead: { type: 'boolean', description: 'Whether email is read' },
259271
importance: { type: 'string', description: 'Email importance level' },
260272
// Trigger outputs

0 commit comments

Comments
 (0)