Skip to content

Commit f4b9869

Browse files
authored
Merge pull request #169 from unisoncomputing/cp/causal-diff-deadletter
Add deadletter queue for causal diffs
2 parents bfa6354 + eb2f610 commit f4b9869

File tree

3 files changed

+47
-12
lines changed

3 files changed

+47
-12
lines changed

share-api/src/Share/BackgroundJobs/Diffs/CausalDiffs.hs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import Share.Web.Authorization qualified as AuthZ
2525
import Share.Web.Errors (EntityMissing (..))
2626
import Share.Web.Share.Diffs.Impl qualified as Diffs
2727
import System.Clock qualified as Clock
28+
import UnliftIO qualified
2829

2930
-- | Check every 10 minutes if we haven't heard on the notifications channel.
3031
-- Just in case we missed a notification.
@@ -50,19 +51,32 @@ processDiffs authZReceipt unisonRuntime = do
5051

5152
-- | Process a diff, then return whether or not we did any work.
5253
processDiff :: AuthZ.AuthZReceipt -> CR.UnisonRuntime -> Background Bool
53-
processDiff authZReceipt unisonRuntime = do
54-
result <- Trace.withSpan "background:causal-diffs:process-diff" mempty $
55-
PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do
56-
DQ.claimCausalDiff >>= \case
57-
Nothing -> pure Nothing
58-
Just causalDiffInfo -> withTags (causalDiffTags causalDiffInfo) do
59-
startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic)
60-
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt unisonRuntime causalDiffInfo)
61-
DQ.deleteClaimedCausalDiff causalDiffInfo
62-
pure (Just (causalDiffInfo, startTime, result))
54+
processDiff authZReceipt unisonRuntime = Trace.withSpan "background:causal-diffs:process-diff" mempty $ do
55+
pendingCausalDiffVar <- liftIO $ UnliftIO.newEmptyMVar
56+
result <- UnliftIO.tryAny $ PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do
57+
DQ.claimCausalDiff >>= \case
58+
Nothing -> pure Nothing
59+
Just causalDiffInfo -> withTags (causalDiffTags causalDiffInfo) do
60+
PG.transactionUnsafeIO $ UnliftIO.tryPutMVar pendingCausalDiffVar causalDiffInfo
61+
startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic)
62+
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt unisonRuntime causalDiffInfo)
63+
DQ.deleteClaimedCausalDiff causalDiffInfo
64+
pure (Just (causalDiffInfo, startTime, result))
6365
case result of
64-
Nothing -> pure False
65-
Just (cdi, startTime, result) -> do
66+
-- The transaction failed with an exception.
67+
-- One possible cause is an unknown builtin.
68+
-- We should report it and mark it as invalid so we don't keep retrying it.
69+
Left err -> do
70+
mCausalDiffInfo <- liftIO $ UnliftIO.tryTakeMVar pendingCausalDiffVar
71+
case mCausalDiffInfo of
72+
Nothing -> pure ()
73+
Just cdi -> withTags (causalDiffTags cdi) do
74+
reportError err
75+
PG.runTransaction $ DQ.markCausalDiffInvalid (tShow err) cdi
76+
-- Continue processing other diffs.
77+
pure True
78+
Right Nothing -> pure False
79+
Right (Just (cdi, startTime, result)) -> do
6680
let tags = causalDiffTags cdi
6781
withTags tags do
6882
case result of

share-api/src/Share/BackgroundJobs/Diffs/Queries.hs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Share.BackgroundJobs.Diffs.Queries
33
submitCausalsToBeDiffed,
44
claimCausalDiff,
55
deleteClaimedCausalDiff,
6+
markCausalDiffInvalid,
67
)
78
where
89

@@ -84,12 +85,25 @@ claimCausalDiff = do
8485
[sql|
8586
SELECT from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner
8687
FROM causal_diff_queue
88+
WHERE error IS NULL
8789
ORDER BY created_at ASC
8890
LIMIT 1
8991
-- Skip any that are being synced by other workers.
9092
FOR UPDATE SKIP LOCKED
9193
|]
9294

95+
markCausalDiffInvalid :: Text -> CausalDiffInfo -> Transaction e ()
96+
markCausalDiffInvalid err CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner} =
97+
execute_
98+
[sql|
99+
UPDATE causal_diff_queue
100+
SET error = #{err}
101+
WHERE causal_diff_queue.from_causal_id = #{fromCausalId}
102+
AND causal_diff_queue.to_causal_id = #{toCausalId}
103+
AND causal_diff_queue.from_codebase_owner = #{fromCodebaseOwner}
104+
AND causal_diff_queue.to_codebase_owner = #{toCodebaseOwner}
105+
|]
106+
93107
deleteClaimedCausalDiff :: CausalDiffInfo -> Transaction e ()
94108
deleteClaimedCausalDiff CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner} =
95109
execute_
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- Add nullable error column to causal_diff_queue table
2+
ALTER TABLE causal_diff_queue
3+
ADD COLUMN error TEXT;
4+
5+
CREATE INDEX IF NOT EXISTS idx_causal_diff_queue_processing_order
6+
ON causal_diff_queue (created_at ASC)
7+
WHERE error IS NULL;

0 commit comments

Comments
 (0)