Skip to content

Commit 344d7fa

Browse files
mnafeesmrkaye97
andauthored
Attempt to fix deadlock in PollScheduledWorkflows by scoping FOR UPDATE lock (#3261)
* make sure skip locked semantics is meaningful * fix: order by cond * fix: refs * chore: decrease diff size * fix: run triggered by not exists instead of left join * feat: add index for polling * chore: schema * fix: make cte doing the filtering also do the locking * fix: one more cte fix * fix: rm dupe for update lock * fix: order by --------- Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
1 parent b5351a9 commit 344d7fa

File tree

4 files changed

+40
-11
lines changed

4 files changed

+40
-11
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- +goose Up
2+
-- +goose NO TRANSACTION
3+
-- +goose StatementBegin
4+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_WorkflowTriggerScheduledRef_triggerAt_tickerId" ON "WorkflowTriggerScheduledRef" ("triggerAt", "tickerId");
5+
-- +goose StatementEnd
6+
7+
-- +goose Down
8+
-- +goose StatementBegin
9+
DROP INDEX CONCURRENTLY IF EXISTS "ix_WorkflowTriggerScheduledRef_triggerAt_tickerId";
10+
-- +goose StatementEnd

pkg/repository/sqlcv1/ticker.sql

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,8 @@ WITH latest_workflow_versions AS (
182182
"Workflow" AS workflow ON workflow."id" = versions."workflowId"
183183
JOIN
184184
latest_workflow_versions AS latestVersions ON latestVersions."workflowId" = workflow."id"
185-
LEFT JOIN
186-
"WorkflowRunTriggeredBy" AS runTriggeredBy ON runTriggeredBy."scheduledId" = scheduledWorkflow."id"
187185
WHERE
188186
"triggerAt" <= NOW() + INTERVAL '5 seconds'
189-
AND runTriggeredBy IS NULL
190187
AND versions."deletedAt" IS NULL
191188
AND workflow."deletedAt" IS NULL
192189
AND (
@@ -196,23 +193,34 @@ WITH latest_workflow_versions AS (
196193
)
197194
OR "tickerId" = @tickerId::uuid
198195
)
196+
AND NOT EXISTS (
197+
SELECT 1
198+
FROM "WorkflowRunTriggeredBy" AS runTriggeredBy
199+
WHERE runTriggeredBy."scheduledId" = scheduledWorkflow."id"
200+
)
201+
ORDER BY scheduledWorkflow."triggerAt" ASC, scheduledWorkflow."id" ASC
199202
),
200203
active_scheduled_workflows AS (
201204
SELECT
202205
*
203206
FROM
204-
not_run_scheduled_workflows
207+
"WorkflowTriggerScheduledRef"
208+
WHERE "id" IN (SELECT "id" FROM not_run_scheduled_workflows)
209+
ORDER BY "triggerAt" ASC, "id" ASC
205210
FOR UPDATE SKIP LOCKED
206211
)
212+
207213
UPDATE
208214
"WorkflowTriggerScheduledRef" as scheduledWorkflows
209215
SET
210216
"tickerId" = @tickerId::uuid
211217
FROM
212218
active_scheduled_workflows
219+
JOIN "WorkflowVersion" as versions ON versions."id" = active_scheduled_workflows."parentId"
220+
JOIN "Workflow" as workflow ON workflow."id" = versions."workflowId"
213221
WHERE
214222
scheduledWorkflows."id" = active_scheduled_workflows."id"
215-
RETURNING scheduledWorkflows.*, active_scheduled_workflows."workflowVersionId", active_scheduled_workflows."tenantId";
223+
RETURNING scheduledWorkflows.*, versions."id" AS "workflowVersionId", workflow."tenantId";
216224

217225
-- name: PollTenantAlerts :many
218226
-- Finds tenant alerts which haven't alerted since their frequency and assigns them to a ticker

pkg/repository/sqlcv1/ticker.sql.go

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sql/schema/v0.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,6 +1537,9 @@ CREATE UNIQUE INDEX "WorkflowTriggerEventRef_parentId_eventKey_key" ON "Workflow
15371537
-- CreateIndex
15381538
CREATE UNIQUE INDEX "WorkflowTriggerScheduledRef_id_key" ON "WorkflowTriggerScheduledRef" ("id" ASC);
15391539

1540+
-- CreateIndex
1541+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_WorkflowTriggerScheduledRef_triggerAt_tickerId" ON "WorkflowTriggerScheduledRef" ("triggerAt", "tickerId");
1542+
15401543
-- CreateIndex
15411544
CREATE UNIQUE INDEX "WorkflowTriggerScheduledRef_parentId_parentStepRunId_childK_key" ON "WorkflowTriggerScheduledRef" (
15421545
"parentId" ASC,

0 commit comments

Comments
 (0)