@@ -7,11 +7,13 @@ import Codec.Serialise qualified as CBOR
77import Conduit qualified as C
88import Control.Concurrent.STM qualified as STM
99import Control.Concurrent.STM.TBMQueue qualified as STM
10- import Control.Monad.Except (ExceptT (ExceptT ))
10+ import Control.Monad.Except (ExceptT (ExceptT ), withExceptT )
1111import Control.Monad.Trans.Except (runExceptT )
1212import Data.Binary.Builder qualified as Builder
13- import Data.Vector (Vector )
13+ import Data.Set qualified as Set
14+ import Data.Text.Encoding qualified as Text
1415import Data.Vector qualified as Vector
16+ import Ki.Unlifted qualified as Ki
1517import Servant
1618import Servant.Conduit (ConduitToSourceIO (.. ))
1719import Servant.Types.SourceT (SourceT (.. ))
@@ -33,14 +35,15 @@ import Share.Web.Authorization qualified as AuthZ
3335import Share.Web.Errors
3436import Share.Web.UCM.Sync.HashJWT qualified as HashJWT
3537import Share.Web.UCM.SyncV2.Queries qualified as SSQ
38+ import Share.Web.UCM.SyncV2.Types (IsCausalSpine (.. ), IsLibRoot (.. ))
3639import U.Codebase.Sqlite.Orphans ()
40+ import Unison.Debug qualified as Debug
3741import Unison.Hash32 (Hash32 )
3842import Unison.Share.API.Hash (HashJWTClaims (.. ))
3943import Unison.SyncV2.API qualified as SyncV2
40- import Unison.SyncV2.Types (DownloadEntitiesChunk (.. ), EntityChunk (.. ), ErrorChunk (.. ), StreamInitInfo (.. ))
44+ import Unison.SyncV2.Types (CausalDependenciesChunk ( .. ), DependencyType ( .. ), DownloadEntitiesChunk (.. ), EntityChunk (.. ), ErrorChunk (.. ), StreamInitInfo (.. ))
4145import Unison.SyncV2.Types qualified as SyncV2
4246import UnliftIO qualified
43- import UnliftIO.Async qualified as Async
4447
4548batchSize :: Int32
4649batchSize = 1000
@@ -51,7 +54,8 @@ streamSettings rootCausalHash rootBranchRef = StreamInitInfo {version = SyncV2.V
5154server :: Maybe UserId -> SyncV2. Routes WebAppServer
5255server mayUserId =
5356 SyncV2. Routes
54- { downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId
57+ { downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId,
58+ causalDependenciesStream = causalDependenciesStreamImpl mayUserId
5559 }
5660
5761parseBranchRef :: SyncV2. BranchRef -> Either Text (Either ProjectReleaseShortHand ProjectBranchShortHand )
@@ -66,30 +70,16 @@ parseBranchRef (SyncV2.BranchRef branchRef) =
6670 parseRelease = fmap Left . eitherToMaybe $ IDs. fromText @ ProjectReleaseShortHand branchRef
6771
6872downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2. DownloadEntitiesRequest -> WebApp (SourceIO (SyncV2. CBORStream SyncV2. DownloadEntitiesChunk ))
69- downloadEntitiesStreamImpl mayCallerUserId (SyncV2. DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes = _todo }) = do
73+ downloadEntitiesStreamImpl mayCallerUserId (SyncV2. DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes}) = do
7074 either emitErr id <$> runExceptT do
7175 addRequestTag " branch-ref" (SyncV2. unBranchRef branchRef)
7276 HashJWTClaims {hash = causalHash} <- lift (HashJWT. verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure )
7377 codebase <-
74- case parseBranchRef branchRef of
75- Left err -> throwError (SyncV2. DownloadEntitiesInvalidBranchRef err branchRef)
76- Right (Left (ProjectReleaseShortHand {userHandle, projectSlug})) -> do
77- let projectShortHand = ProjectShortHand {userHandle, projectSlug}
78- (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG. tryRunTransaction $ do
79- project <- PGQ. projectByShortHand projectShortHand `whenNothingM` throwError (SyncV2. DownloadEntitiesProjectNotFound $ IDs. toText @ ProjectShortHand projectShortHand)
80- pure (project, Nothing )
81- authZToken <- lift AuthZ. checkDownloadFromProjectBranchCodebase `whenLeftM` \ _err -> throwError (SyncV2. DownloadEntitiesNoReadPermission branchRef)
82- let codebaseLoc = Codebase. codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
83- pure $ Codebase. codebaseEnv authZToken codebaseLoc
84- Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do
85- let projectShortHand = ProjectShortHand {userHandle, projectSlug}
86- (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG. tryRunTransaction $ do
87- project <- (PGQ. projectByShortHand projectShortHand) `whenNothingM` throwError (SyncV2. DownloadEntitiesProjectNotFound $ IDs. toText @ ProjectShortHand projectShortHand)
88- mayContributorUserId <- for contributorHandle \ ch -> fmap user_id $ (PGQ. userByHandle ch) `whenNothingM` throwError (SyncV2. DownloadEntitiesUserNotFound $ IDs. toText @ UserHandle ch)
89- pure (project, mayContributorUserId)
90- authZToken <- lift AuthZ. checkDownloadFromProjectBranchCodebase `whenLeftM` \ _err -> throwError (SyncV2. DownloadEntitiesNoReadPermission branchRef)
91- let codebaseLoc = Codebase. codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
92- pure $ Codebase. codebaseEnv authZToken codebaseLoc
78+ flip withExceptT (codebaseForBranchRef branchRef) \ case
79+ CodebaseLoadingErrorProjectNotFound projectShortHand -> SyncV2. DownloadEntitiesProjectNotFound (IDs. toText projectShortHand)
80+ CodebaseLoadingErrorUserNotFound userHandle -> SyncV2. DownloadEntitiesUserNotFound (IDs. toText userHandle)
81+ CodebaseLoadingErrorNoReadPermission branchRef -> SyncV2. DownloadEntitiesNoReadPermission branchRef
82+ CodebaseLoadingErrorInvalidBranchRef err branchRef -> SyncV2. DownloadEntitiesInvalidBranchRef err branchRef
9383 q <- UnliftIO. atomically $ do
9484 q <- STM. newTBMQueue 10
9585 STM. writeTBMQueue q (Vector. singleton $ InitialC $ streamSettings causalHash (Just branchRef))
@@ -98,39 +88,107 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
9888 Logging. logInfoText " Starting download entities stream"
9989 Codebase. runCodebaseTransaction codebase $ do
10090 (_bhId, causalId) <- CausalQ. expectCausalIdsOf id (hash32ToCausalHash causalHash)
101- cursor <- SSQ. allSerializedDependenciesOfCausalCursor causalId
91+ let knownCausalHashes = Set. map hash32ToCausalHash knownHashes
92+ cursor <- SSQ. allSerializedDependenciesOfCausalCursor causalId knownCausalHashes
10293 Cursor. foldBatched cursor batchSize \ batch -> do
10394 let entityChunkBatch = batch <&> \ (entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR})
10495 PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBMQueue q entityChunkBatch
10596 PG. transactionUnsafeIO $ STM. atomically $ STM. closeTBMQueue q
10697 pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
107- stream q
98+ queueToStream q
10899 where
109- stream :: STM. TBMQueue (Vector DownloadEntitiesChunk ) -> C. ConduitT () (SyncV2. CBORStream DownloadEntitiesChunk ) IO ()
110- stream q = do
111- let loop :: C. ConduitT () (SyncV2. CBORStream DownloadEntitiesChunk ) IO ()
112- loop = do
113- liftIO (STM. atomically (STM. readTBMQueue q)) >>= \ case
114- -- The queue is closed.
115- Nothing -> do
116- pure ()
117- Just batches -> do
118- batches
119- & foldMap (CBOR. serialiseIncremental)
120- & (SyncV2. CBORStream . Builder. toLazyByteString)
121- & C. yield
122- loop
123-
124- loop
125-
126100 emitErr :: SyncV2. DownloadEntitiesError -> SourceIO (SyncV2. CBORStream SyncV2. DownloadEntitiesChunk )
127101 emitErr err = SourceT. source [SyncV2. CBORStream . CBOR. serialise $ ErrorC (ErrorChunk err)]
128102
103+ causalDependenciesStreamImpl :: Maybe UserId -> SyncV2. CausalDependenciesRequest -> WebApp (SourceIO (SyncV2. CBORStream SyncV2. CausalDependenciesChunk ))
104+ causalDependenciesStreamImpl mayCallerUserId (SyncV2. CausalDependenciesRequest {rootCausal = causalHashJWT, branchRef}) = do
105+ respondExceptT do
106+ addRequestTag " branch-ref" (SyncV2. unBranchRef branchRef)
107+ HashJWTClaims {hash = causalHash} <- lift (HashJWT. verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure )
108+ addRequestTag " root-causal" (tShow causalHash)
109+ codebase <- codebaseForBranchRef branchRef
110+ q <- UnliftIO. atomically $ STM. newTBMQueue 10
111+ streamResults <- lift $ UnliftIO. toIO do
112+ Logging. logInfoText " Starting causal dependencies stream"
113+ Codebase. runCodebaseTransaction codebase $ do
114+ (_bhId, causalId) <- CausalQ. expectCausalIdsOf id (hash32ToCausalHash causalHash)
115+ Debug. debugLogM Debug. Temp " Getting cursor"
116+ cursor <- SSQ. spineAndLibDependenciesOfCausalCursor causalId
117+ Debug. debugLogM Debug. Temp " Folding cursor"
118+ Cursor. foldBatched cursor batchSize \ batch -> do
119+ Debug. debugLogM Debug. Temp " Got batch"
120+ let depBatch =
121+ batch <&> \ (causalHash, isCausalSpine, isLibRoot) ->
122+ let dependencyType = case (isCausalSpine, isLibRoot) of
123+ (IsCausalSpine , _) -> CausalSpineDependency
124+ (_, IsLibRoot ) -> LibDependency
125+ _ -> error $ " Causal dependency which is neither spine nor lib root: " <> show causalHash
126+ in CausalHashDepC {causalHash, dependencyType}
127+ PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBMQueue q depBatch
128+ PG. transactionUnsafeIO $ STM. atomically $ STM. closeTBMQueue q
129+ pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
130+ queueToStream q
131+
132+ queueToStream :: forall a f . (CBOR. Serialise a , Foldable f ) => STM. TBMQueue (f a ) -> C. ConduitT () (SyncV2. CBORStream a ) IO ()
133+ queueToStream q = do
134+ let loop :: C. ConduitT () (SyncV2. CBORStream a ) IO ()
135+ loop = do
136+ liftIO (STM. atomically (STM. readTBMQueue q)) >>= \ case
137+ -- The queue is closed.
138+ Nothing -> do
139+ pure ()
140+ Just batches -> do
141+ batches
142+ & foldMap (CBOR. serialiseIncremental)
143+ & (SyncV2. CBORStream . Builder. toLazyByteString)
144+ & C. yield
145+ loop
146+ loop
147+
148+ data CodebaseLoadingError
149+ = CodebaseLoadingErrorProjectNotFound ProjectShortHand
150+ | CodebaseLoadingErrorUserNotFound UserHandle
151+ | CodebaseLoadingErrorNoReadPermission SyncV2. BranchRef
152+ | CodebaseLoadingErrorInvalidBranchRef Text SyncV2. BranchRef
153+ deriving stock (Show )
154+ deriving (Logging.Loggable ) via Logging. ShowLoggable Logging. UserFault CodebaseLoadingError
155+
156+ instance ToServerError CodebaseLoadingError where
157+ toServerError = \ case
158+ CodebaseLoadingErrorProjectNotFound projectShortHand -> (ErrorID " codebase-loading:project-not-found" , Servant. err404 {errBody = from . Text. encodeUtf8 $ " Project not found: " <> (IDs. toText projectShortHand)})
159+ CodebaseLoadingErrorUserNotFound userHandle -> (ErrorID " codebase-loading:user-not-found" , Servant. err404 {errBody = from . Text. encodeUtf8 $ " User not found: " <> (IDs. toText userHandle)})
160+ CodebaseLoadingErrorNoReadPermission branchRef -> (ErrorID " codebase-loading:no-read-permission" , Servant. err403 {errBody = from . Text. encodeUtf8 $ " No read permission for branch ref: " <> (SyncV2. unBranchRef branchRef)})
161+ CodebaseLoadingErrorInvalidBranchRef err branchRef -> (ErrorID " codebase-loading:invalid-branch-ref" , Servant. err400 {errBody = from . Text. encodeUtf8 $ " Invalid branch ref: " <> err <> " " <> (SyncV2. unBranchRef branchRef)})
162+
163+ codebaseForBranchRef :: SyncV2. BranchRef -> (ExceptT CodebaseLoadingError WebApp Codebase. CodebaseEnv )
164+ codebaseForBranchRef branchRef = do
165+ case parseBranchRef branchRef of
166+ Left err -> throwError (CodebaseLoadingErrorInvalidBranchRef err branchRef)
167+ Right (Left (ProjectReleaseShortHand {userHandle, projectSlug})) -> do
168+ let projectShortHand = ProjectShortHand {userHandle, projectSlug}
169+ (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG. tryRunTransaction $ do
170+ project <- PGQ. projectByShortHand projectShortHand `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound $ projectShortHand)
171+ pure (project, Nothing )
172+ authZToken <- lift AuthZ. checkDownloadFromProjectBranchCodebase `whenLeftM` \ _err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef)
173+ let codebaseLoc = Codebase. codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
174+ pure $ Codebase. codebaseEnv authZToken codebaseLoc
175+ Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do
176+ let projectShortHand = ProjectShortHand {userHandle, projectSlug}
177+ (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG. tryRunTransaction $ do
178+ project <- (PGQ. projectByShortHand projectShortHand) `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound projectShortHand)
179+ mayContributorUserId <- for contributorHandle \ ch -> fmap user_id $ (PGQ. userByHandle ch) `whenNothingM` throwError (CodebaseLoadingErrorUserNotFound ch)
180+ pure (project, mayContributorUserId)
181+ authZToken <- lift AuthZ. checkDownloadFromProjectBranchCodebase `whenLeftM` \ _err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef)
182+ let codebaseLoc = Codebase. codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
183+ pure $ Codebase. codebaseEnv authZToken codebaseLoc
184+
129185-- | Run an IO action in the background while streaming the results.
130186--
131187-- Servant doesn't provide any easier way to do bracketing like this, all the IO must be
132188-- inside the SourceIO somehow.
133189sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
134190sourceIOWithAsync action (SourceT k) =
135191 SourceT \ k' ->
136- Async. withAsync action \ _ -> k k'
192+ Ki. scoped \ scope -> do
193+ _ <- Ki. fork scope action
194+ k k'
0 commit comments