11module Share.Web.UCM.HistoryComments.Impl (server ) where
22
3+ import Control.Concurrent.STM.TBMQueue (TBMQueue , closeTBMQueue , newTBMQueueIO , readTBMQueue , writeTBMQueue )
34import Control.Monad.Except
45import Control.Monad.Trans.Maybe
5- import Data.Either (partitionEithers )
6+ import Data.Set qualified as Set
7+ import Data.Set.NonEmpty qualified as NESet
8+ import Ki.Unlifted qualified as Ki
69import Network.WebSockets.Connection
710import Share.IDs
811import Share.IDs qualified as IDs
@@ -18,8 +21,10 @@ import Share.Web.Authorization qualified as AuthZ
1821import Share.Web.Errors (Unimplemented (Unimplemented ), reportError , respondError )
1922import Share.Web.UCM.HistoryComments.Queries qualified as Q
2023import Unison.Debug qualified as Debug
24+ import Unison.Hash32 (Hash32 )
2125import Unison.Server.HistoryComments.API qualified as HistoryComments
22- import Unison.Server.HistoryComments.Types (HistoryCommentChunk (.. ), UploadCommentsResponse (.. ))
26+ import Unison.Server.HistoryComments.Types (HistoryCommentDownloaderChunk (.. ), HistoryCommentUploaderChunk (.. ), UploadCommentsResponse (.. ))
27+ import Unison.Server.HistoryComments.Types qualified as Sync
2328import Unison.Server.Types
2429import Unison.Util.Websockets
2530import UnliftIO
@@ -40,68 +45,109 @@ downloadHistoryCommentsStreamImpl _mayUserId _conn = do
4045 respondError Unimplemented
4146
4247-- Re-run the given STM action at most n times, collecting the results into a list.
43- -- If the action returns Nothing, stop early and return what has been collected so far, along with a Bool indicating whether the action was exhausted.
48+ -- If the action returns Nothing, stop and return what has been collected so far, along with a Bool indicating whether the action was exhausted.
4449fetchChunk :: (Show a ) => Int -> STM (Maybe a ) -> STM ([a ], Bool )
4550fetchChunk size action = do
4651 let go 0 = pure ([] , False )
4752 go n = do
4853 optional action >>= \ case
4954 Nothing -> do
5055 -- No more values available at the moment
51- pure ( [] , False )
56+ empty
5257 Just Nothing -> do
5358 -- Queue is closed
5459 pure ([] , True )
5560 Just (Just val) -> do
5661 Debug. debugM Debug. Temp " Fetched value from queue" val
57- (rest, exhausted) <- go (n - 1 )
62+ (rest, exhausted) <- go (n - 1 ) <|> pure ( [] , False )
5863 pure (val : rest, exhausted)
5964 go size
6065
6166uploadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp ()
6267uploadHistoryCommentsStreamImpl mayCallerUserId br@ (BranchRef branchRef) conn = do
63- Debug. debugM Debug. Temp " uploadHistoryCommentsStreamImpl called with branchRef: " (IDs. toText branchRef, mayCallerUserId)
6468 callerUserId <- AuthN. requireAuthenticatedUser' mayCallerUserId
65- result <- withQueues @ (MsgOrError Void UploadCommentsResponse ) @ (MsgOrError Void HistoryCommentChunk ) wsMessageBufferSize wsMessageBufferSize conn \ q@ (Queues {receive}) -> runExceptT $ do
69+ result <- withQueues @ (MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk ) @ (MsgOrError Void HistoryCommentUploaderChunk ) wsMessageBufferSize wsMessageBufferSize conn \ q@ (Queues {receive, send}) -> Ki. scoped \ scope -> runExceptT $ do
6670 projectBranchSH@ ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs. fromText @ ProjectBranchShortHand branchRef of
6771 Left err -> handleErrInQueue q (UploadCommentsGenericFailure $ IDs. toText err)
6872 Right pbsh -> pure pbsh
6973 let projectSH = ProjectShortHand {userHandle, projectSlug}
7074 mayInfo <- lift . runMaybeT $ mapMaybeT PG. runTransaction $ do
7175 project <- MaybeT $ PGQ. projectByShortHand projectSH
72- Debug. debugM Debug. Temp " FOUND PROJECT" (project)
7376 branch <- MaybeT $ PGQ. branchByProjectBranchShortHand projectBranchSH
74- Debug. debugM Debug. Temp " FOUND BRANCH" (branch)
7577 contributorUser <- for contributorHandle (MaybeT . UserQ. userByHandle)
76- Debug. debugM Debug. Temp " FOUND Contributor" (contributorUser)
7778 pure (project, branch, contributorUser)
7879 (project, _branch, contributorUser) <- maybe (handleErrInQueue q $ UploadCommentsProjectBranchNotFound br) pure $ mayInfo
79- authZ <-
80+ ! authZ <-
8081 lift (AuthZ. checkUploadToProjectBranchCodebase callerUserId project. projectId (user_id <$> contributorUser)) >>= \ case
8182 Left _authErr -> handleErrInQueue q (UploadCommentsNotAuthorized br)
8283 Right authZ -> pure authZ
83- let loop :: ExceptT UploadCommentsResponse WebApp ()
84- loop = do
85- (chunk, closed) <- atomically $ fetchChunk insertCommentBatchSize do
86- receive <&> fmap \ case
87- Msg (HistoryCommentErrorChunk err) -> (Left $ UploadCommentsGenericFailure err)
88- Msg chunk -> (Right chunk)
89- DeserialiseFailure msg -> (Left $ UploadCommentsGenericFailure msg)
90- UserErr err -> absurd err
91- Debug. debugM Debug. Temp " Processing chunk of size" (length chunk)
92- let (errs, chunks) = partitionEithers chunk
93- when (not $ null errs) $ Debug. debugM Debug. Temp " Got errors in chunk" (errs)
94- lift $ PG. runTransaction $ Q. insertHistoryComments authZ project. projectId chunks
95- for errs $ \ err -> handleErrInQueue q err
96- when (not closed) loop
97- loop
84+ hashesToCheckQ <- liftIO $ newTBMQueueIO 100
85+ commentsQ <- liftIO $ newTBMQueueIO 100
86+ errMVar <- liftIO newEmptyTMVarIO
87+ _receiverThread <- lift $ Ki. fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ
88+ inserterThread <- lift $ Ki. fork scope $ inserterWorker authZ commentsQ project. projectId
89+ _hashCheckingThread <- lift $ Ki. fork scope $ hashCheckingWorker send hashesToCheckQ
90+ -- The inserter thread will finish when the client closes the connection.
91+ atomically $ Ki. await inserterThread
9892 case result of
9993 Left err -> reportError err
10094 Right (Left err, _leftovers) -> reportError err
10195 Right (Right () , _leftovers) -> pure ()
10296 where
97+ inserterWorker ::
98+ AuthZ. AuthZReceipt ->
99+ TBMQueue (Either Sync. HistoryComment Sync. HistoryCommentRevision ) ->
100+ ProjectId ->
101+ WebApp ()
102+ inserterWorker authZ commentsQ projectId = do
103+ let loop = do
104+ (chunk, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue commentsQ))
105+ PG. runTransaction $ Q. insertHistoryComments authZ projectId chunk
106+ when (not closed) loop
107+ loop
108+
109+ hashCheckingWorker ::
110+ (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool ) ->
111+ TBMQueue Hash32 ->
112+ WebApp ()
113+ hashCheckingWorker send hashesToCheckQ = do
114+ let loop = do
115+ (hashes, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ))
116+ Debug. debugM Debug. Temp " Checking hashes chunk of size" (length hashes)
117+ unknownHashes <- PG. runTransaction $ do Q. filterForUnknownHistoryCommentHashes hashes
118+ case NESet. nonEmptySet (Set. fromList unknownHashes) of
119+ Nothing -> pure ()
120+ Just unknownHashesSet -> do
121+ void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet
122+ when (not closed) loop
123+ loop
124+ receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk )) -> TMVar Text -> TBMQueue Hash32 -> TBMQueue (Either Sync. HistoryComment Sync. HistoryCommentRevision ) -> WebApp ()
125+ receiverWorker recv errMVar hashesToCheckQ commentsQ = do
126+ let loop = do
127+ next <- atomically do
128+ recv >>= \ case
129+ Nothing -> do
130+ closeTBMQueue hashesToCheckQ
131+ closeTBMQueue commentsQ
132+ pure (pure () )
133+ Just (DeserialiseFailure err) -> do
134+ putTMVar errMVar err
135+ pure (pure () )
136+ Just (Msg msg) -> do
137+ case msg of
138+ PossiblyNewHashesChunk hashesToCheck -> do
139+ for_ hashesToCheck $ \ h -> writeTBMQueue hashesToCheckQ h
140+ DoneSendingHashesChunk -> do
141+ closeTBMQueue hashesToCheckQ
142+ HistoryCommentChunk comment -> do
143+ writeTBMQueue commentsQ (Left comment)
144+ HistoryCommentRevisionChunk revision -> do
145+ writeTBMQueue commentsQ (Right revision)
146+ pure loop
147+ next
148+ loop
103149 insertCommentBatchSize = 100
104- handleErrInQueue :: forall o x e . Queues (MsgOrError e UploadCommentsResponse ) o -> UploadCommentsResponse -> ExceptT UploadCommentsResponse WebApp x
150+ handleErrInQueue :: forall o x e a . Queues (MsgOrError e a ) o -> e -> ExceptT e WebApp x
105151 handleErrInQueue Queues {send} e = do
106- _ <- atomically $ send $ Msg e
152+ _ <- atomically $ send $ UserErr e
107153 throwError e
0 commit comments