Skip to content

Commit 224fef8

Browse files
authored
Merge pull request #29 from unisoncomputing/cp/batched-diffs
Batched background diffs
2 parents b91eb5e + d0fc132 commit 224fef8

File tree

29 files changed

+5121
-662
lines changed

29 files changed

+5121
-662
lines changed

share-api.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ library
2828
Share.App
2929
Share.Backend
3030
Share.BackgroundJobs
31+
Share.BackgroundJobs.Diffs.ContributionDiffs
32+
Share.BackgroundJobs.Diffs.Queries
3133
Share.BackgroundJobs.Errors
3234
Share.BackgroundJobs.Monad
3335
Share.BackgroundJobs.Search.DefinitionSync

share-utils/package.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ default-extensions:
4242
- FlexibleContexts
4343
- FlexibleInstances
4444
- GeneralizedNewtypeDeriving
45+
- InstanceSigs
4546
- LambdaCase
4647
- MultiParamTypeClasses
4748
- NamedFieldPuns

share-utils/share-utils.cabal

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cabal-version: 1.12
22

3-
-- This file has been generated from package.yaml by hpack version 0.36.0.
3+
-- This file has been generated from package.yaml by hpack version 0.37.0.
44
--
55
-- see: https://github.com/sol/hpack
66

@@ -25,6 +25,7 @@ source-repository head
2525
library
2626
exposed-modules:
2727
Share.Debug
28+
Share.Utils.Aeson
2829
Share.Utils.Binary
2930
Share.Utils.Deployment
3031
Share.Utils.IDs
@@ -50,6 +51,7 @@ library
5051
FlexibleContexts
5152
FlexibleInstances
5253
GeneralizedNewtypeDeriving
54+
InstanceSigs
5355
LambdaCase
5456
MultiParamTypeClasses
5557
NamedFieldPuns
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
{-# LANGUAGE InstanceSigs #-}
2+
3+
module Share.Utils.Aeson (MaybeEncoded (..), PreEncoded (..)) where
4+
5+
import Data.Aeson (ToJSON (..))
6+
import Data.Aeson qualified as Aeson
7+
import Data.Aeson.Encoding qualified as Encoding
8+
import Data.Binary.Builder qualified as Builder
9+
import Data.ByteString.Lazy.Char8 qualified as BL
10+
import Data.Maybe (fromMaybe)
11+
import Data.Proxy (Proxy (..))
12+
import Data.Typeable (Typeable, typeRep)
13+
import GHC.Stack (HasCallStack)
14+
15+
data MaybeEncoded a
16+
= IsEncoded BL.ByteString
17+
| NotEncoded a
18+
deriving stock (Show, Eq, Ord, Functor, Foldable, Traversable)
19+
20+
-- | Encodes 'NotEncoded' values through their own 'ToJSON' instance and
-- delegates 'IsEncoded' bytes to the 'PreEncoded' instance.
instance (Typeable a, ToJSON a) => ToJSON (MaybeEncoded a) where
  toJSON :: (HasCallStack) => MaybeEncoded a -> Aeson.Value
  toJSON = \case
    IsEncoded bytes -> Aeson.toJSON (PreEncoded @a bytes)
    NotEncoded value -> Aeson.toJSON value

  toEncoding :: (HasCallStack) => MaybeEncoded a -> Aeson.Encoding
  toEncoding = \case
    IsEncoded bytes -> Aeson.toEncoding (PreEncoded @a bytes)
    NotEncoded value -> Aeson.toEncoding value
28+
29+
-- | Lazy bytes presumed to hold the JSON encoding of an @a@. The type
-- parameter is phantom; it is only used to name the type in error messages.
newtype PreEncoded a = PreEncoded BL.ByteString
  deriving stock (Show, Eq, Ord)
31+
32+
instance (Typeable a) => ToJSON (PreEncoded a) where
  -- | Re-parses the stored bytes into a 'Aeson.Value'.
  --
  -- Calls 'error' when the bytes do not decode as JSON.
  toJSON :: (HasCallStack) => PreEncoded a -> Aeson.Value
  toJSON (PreEncoded bytes) =
    -- Decoding here is regrettable, but it seems to be required when values
    -- are built with the @object [key .= val]@ syntax.
    case Aeson.decode bytes of
      Just value -> value
      Nothing -> error $ "Invalid PreEncoded JSON for type: " <> show (typeRep (Proxy @a))

  -- | Splices the stored bytes directly into the output without validating them.
  toEncoding :: (HasCallStack) => PreEncoded a -> Aeson.Encoding
  toEncoding (PreEncoded bytes) =
    Encoding.unsafeToEncoding (Builder.fromLazyByteString bytes)

share-utils/src/Share/Utils/URI.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ import Data.Map qualified as Map
2222
import Data.Text (Text)
2323
import Data.Text qualified as Text
2424
import Data.Text.Encoding qualified as Text
25-
import Share.Utils.Show (tShow)
2625
import Hasql.Decoders qualified as Decoders
2726
import Hasql.Interpolate qualified as Hasql
2827
import Network.HTTP.Types (parseQuery, renderQuery)
2928
import Network.URI qualified as URI
3029
import Servant
30+
import Share.Utils.Show (tShow)
3131

3232
-- | Helper type to provide additional instances for URIs.
3333
newtype URIParam = URIParam {unpackURI :: URI}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
-- Adds tables for storing pre-computed namespace diffs
2+
3+
-- One row per pre-computed diff between a pair of namespaces.
CREATE TABLE namespace_diffs (
  -- The two namespaces being compared.
  left_namespace_id INTEGER NOT NULL REFERENCES branch_hashes(id) ON DELETE CASCADE,
  right_namespace_id INTEGER NOT NULL REFERENCES branch_hashes(id) ON DELETE CASCADE,

  -- Since different codebases can have different variable names and such we also need to sandbox diffs by codebase owner
  left_codebase_owner_user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  right_codebase_owner_user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,

  -- The stored diff payload.
  diff JSONB NOT NULL,

  PRIMARY KEY (left_namespace_id, right_namespace_id, left_codebase_owner_user_id, right_codebase_owner_user_id)
);
15+
16+
17+
-- New table for coordinating background job for pre-computing diffs
18+
19+
-- Queue of contributions which have been updated and may need their diffs re-computed.
-- Each contribution appears at most once because contribution_id is the primary key.
CREATE TABLE contribution_diff_queue (
  contribution_id UUID PRIMARY KEY REFERENCES contributions(id) ON DELETE CASCADE,
  -- When the contribution was queued.
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

src/Share/BackgroundJobs.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
module Share.BackgroundJobs (startWorkers) where
22

33
import Ki.Unlifted qualified as Ki
4+
import Share.BackgroundJobs.Diffs.ContributionDiffs qualified as ContributionDiffs
45
import Share.BackgroundJobs.Monad (Background)
56
import Share.BackgroundJobs.Search.DefinitionSync qualified as DefnSearch
67

78
-- | Launch every background worker within the given 'Ki.Scope'.
--
-- The definition-search sync worker is started first, followed by the
-- contribution-diff worker.
startWorkers :: Ki.Scope -> Background ()
startWorkers scope = do
  DefnSearch.worker scope
  ContributionDiffs.worker scope
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
module Share.BackgroundJobs.Diffs.ContributionDiffs (worker) where
2+
3+
import Control.Lens
4+
import Control.Monad.Except (ExceptT (..), runExceptT)
5+
import Ki.Unlifted qualified as Ki
6+
import Share.BackgroundJobs.Diffs.Queries qualified as DQ
7+
import Share.BackgroundJobs.Errors (reportError)
8+
import Share.BackgroundJobs.Monad (Background)
9+
import Share.BackgroundJobs.Workers (newWorker)
10+
import Share.Branch (Branch (..))
11+
import Share.Codebase qualified as Codebase
12+
import Share.Contribution (Contribution (..))
13+
import Share.IDs
14+
import Share.Metrics qualified as Metrics
15+
import Share.NamespaceDiffs (NamespaceDiffError (MissingEntityError))
16+
import Share.Postgres qualified as PG
17+
import Share.Postgres.Contributions.Queries qualified as ContributionsQ
18+
import Share.Postgres.Queries qualified as Q
19+
import Share.Prelude
20+
import Share.Utils.Logging qualified as Logging
21+
import Share.Web.Authorization qualified as AuthZ
22+
import Share.Web.Errors (EntityMissing (..), ErrorID (..))
23+
import Share.Web.Share.Diffs.Impl qualified as Diffs
24+
import Unison.Debug qualified as Debug
25+
import UnliftIO.Concurrent qualified as UnliftIO
26+
27+
pollingIntervalSeconds :: Int
28+
pollingIntervalSeconds = 10
29+
30+
worker :: Ki.Scope -> Background ()
31+
worker scope = do
32+
authZReceipt <- AuthZ.backgroundJobAuthZ
33+
newWorker scope "diffs:contributions" $ forever do
34+
processDiffs authZReceipt >>= \case
35+
Left e -> reportError e
36+
Right _ -> pure ()
37+
liftIO $ UnliftIO.threadDelay $ pollingIntervalSeconds * 1000000
38+
39+
-- | Drain the contribution diff queue: repeatedly claim the oldest queued
-- contribution and compute its diff, until nothing is left to claim or a
-- diff fails.
--
-- Returns @Left@ with the first 'NamespaceDiffError' encountered (any entries
-- still queued are left for a later call), or @Right ()@ once no claimable
-- contribution remains.
--
-- Only the computation of each individual diff is timed, so every histogram
-- observation covers exactly one contribution. Looping happens outside the
-- timer, and passes that find an empty queue record nothing.
processDiffs :: AuthZ.AuthZReceipt -> Background (Either NamespaceDiffError ())
processDiffs authZReceipt = runExceptT drainQueue
  where
    drainQueue :: ExceptT NamespaceDiffError Background ()
    drainQueue = do
      Debug.debugLogM Debug.Temp "Background: Getting contributions to be diffed"
      mayContributionId <- PG.runTransactionMode PG.ReadCommitted PG.ReadWrite $ do
        DQ.claimContributionToDiff
      Debug.debugM Debug.Temp "Background: contribution to be diffed: " mayContributionId
      case mayContributionId of
        Nothing -> pure ()
        Just contributionId -> do
          -- NOTE(review): claiming deletes the queue row in its own transaction,
          -- so a contribution whose diff fails below does not appear to be
          -- re-queued. Confirm that this is intended.
          timedDiff contributionId
          Logging.textLog ("Recomputed contribution diff: " <> tShow contributionId)
            & Logging.withTag ("contribution-id", tShow contributionId)
            & Logging.withSeverity Logging.Info
            & Logging.logMsg
          -- Keep processing contributions until we run out of them.
          drainQueue

    -- Time a single contribution diff. The metrics helper needs
    -- 'MonadUnliftIO', so the 'ExceptT' layer is unwrapped around it.
    timedDiff :: ContributionId -> ExceptT NamespaceDiffError Background ()
    timedDiff contributionId =
      ExceptT . Metrics.recordContributionDiffDuration . runExceptT $
        diffContribution authZReceipt contributionId
55+
56+
-- | Compute the diff between the target and source branches of a contribution.
--
-- Looks up the contribution, its project and both branches in one
-- transaction, failing with 'MissingEntityError' if any of them is absent,
-- then diffs the two branch causals. The resulting diff is discarded here.
diffContribution :: AuthZ.AuthZReceipt -> ContributionId -> ExceptT NamespaceDiffError Background ()
diffContribution authZReceipt contributionId = do
  ( project,
    sourceBranch@Branch {causal = sourceCausalId},
    targetBranch@Branch {causal = targetCausalId}
    ) <- ExceptT $ PG.tryRunTransaction $ do
    Contribution {sourceBranchId, targetBranchId, projectId} <-
      ContributionsQ.contributionById contributionId
        `whenNothingM` throwError (missingEntity "contribution:missing" "Contribution not found")
    foundProject <-
      Q.projectById projectId
        `whenNothingM` throwError (missingEntity "project:missing" "Project not found")
    foundSource <-
      Q.branchById sourceBranchId
        `whenNothingM` throwError (missingEntity "branch:missing" "Source branch not found")
    foundTarget <-
      Q.branchById targetBranchId
        `whenNothingM` throwError (missingEntity "branch:missing" "Target branch not found")
    pure (foundProject, foundSource, foundTarget)
  let targetCodebase = Codebase.codebaseForProjectBranch authZReceipt project targetBranch
      sourceCodebase = Codebase.codebaseForProjectBranch authZReceipt project sourceBranch
  -- diffCausals saves the diff so it'll be there when we need it; the result itself is not needed.
  _ <- Diffs.diffCausals authZReceipt (targetCodebase, targetCausalId) (sourceCodebase, sourceCausalId)
  pure ()
  where
    -- Build the error reported when a required entity cannot be found.
    missingEntity errorId message =
      MissingEntityError (EntityMissing (ErrorID errorId) message)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
module Share.BackgroundJobs.Diffs.Queries
2+
( submitContributionsToBeDiffed,
3+
claimContributionToDiff,
4+
)
5+
where
6+
7+
import Data.Foldable (toList)
8+
import Data.Set (Set)
9+
import Share.IDs
10+
import Share.Postgres
11+
import Unison.Debug qualified as Debug
12+
13+
-- | Add the given contributions to the diff queue.
--
-- Contributions that are already queued are left untouched
-- (@ON CONFLICT DO NOTHING@).
submitContributionsToBeDiffed :: (QueryM m) => Set ContributionId -> m ()
submitContributionsToBeDiffed contributions = do
  Debug.debugM Debug.Temp "Submitting contributions to be diffed: " contributions
  let contributionIds = toList contributions
  execute_
    [sql|
      WITH new_contributions(contribution_id) AS (
        SELECT * FROM ^{singleColumnTable contributionIds}
      )
      INSERT INTO contribution_diff_queue (contribution_id)
        SELECT nc.contribution_id FROM new_contributions nc
        ON CONFLICT DO NOTHING
    |]
25+
26+
-- | Claim the oldest contribution in the queue to be diffed.
--
-- The chosen row is deleted from the queue and its id returned. Rows locked
-- by other transactions are skipped, and 'Nothing' is returned when no row
-- can be claimed.
claimContributionToDiff :: Transaction e (Maybe ContributionId)
claimContributionToDiff =
  query1Col
    [sql|
      WITH chosen_contribution(contribution_id) AS (
        SELECT q.contribution_id
        FROM contribution_diff_queue q
        ORDER BY q.created_at ASC
        LIMIT 1
        -- Skip any that are being synced by other workers.
        FOR UPDATE SKIP LOCKED
      )
      DELETE FROM contribution_diff_queue
        USING chosen_contribution
        WHERE contribution_diff_queue.contribution_id = chosen_contribution.contribution_id
      RETURNING chosen_contribution.contribution_id
    |]

src/Share/Metrics.hs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Share.Metrics
1010
tickUserSignup,
1111
recordBackgroundImportDuration,
1212
recordDefinitionSearchIndexDuration,
13+
recordContributionDiffDuration,
1314
)
1415
where
1516

@@ -398,6 +399,18 @@ definitionSearchIndexDurationSeconds =
398399
"definition_search_indexing_duration_seconds"
399400
"The time it took to index a release for definition search"
400401

402+
{-# NOINLINE contributionDiffDurationSeconds #-}
-- | Histogram of contribution diff durations, labelled by deployment and
-- service. It is registered globally via 'Prom.unsafeRegister'.
contributionDiffDurationSeconds :: Prom.Vector Prom.Label2 Prom.Histogram
contributionDiffDurationSeconds =
  Prom.unsafeRegister $
    Prom.vector ("deployment", "service") $
      Prom.histogram
        ( Prom.Info
            "contribution_diff_duration_seconds"
            "The time it took to compute a contribution diff"
        )
        Prom.defaultBuckets
413+
401414
timeActionIntoHistogram :: (Prom.Label l, MonadUnliftIO m) => (Prom.Vector l Prom.Histogram) -> l -> m c -> m c
402415
timeActionIntoHistogram histogram l m = do
403416
UnliftIO.bracket start end \_ -> m
@@ -416,3 +429,6 @@ recordBackgroundImportDuration = timeActionIntoHistogram backgroundImportDuratio
416429
-- | Record how long the given action takes in the definition search indexing
-- histogram, then return the action's result.
recordDefinitionSearchIndexDuration :: (MonadUnliftIO m) => m r -> m r
recordDefinitionSearchIndexDuration = timeActionIntoHistogram definitionSearchIndexDurationSeconds (deployment, service)
432+
433+
-- | Record how long the given action takes in the contribution diff
-- histogram, then return the action's result.
recordContributionDiffDuration :: (MonadUnliftIO m) => m r -> m r
recordContributionDiffDuration action =
  timeActionIntoHistogram contributionDiffDurationSeconds (deployment, service) action

0 commit comments

Comments
 (0)