11module Share.Web.UCM.HistoryComments.Impl (server ) where
22
33import Conduit (ConduitT )
4+ import Data.Either (partitionEithers )
45import Data.Void
56import Network.WebSockets.Connection
67import Servant
78import Share.IDs
9+ import Share.Postgres qualified as PG
10+ import Share.Prelude
811import Share.Utils.Servant.Streaming qualified as Streaming
912import Share.Web.App (WebApp , WebAppServer )
10- import Share.Web.Errors (Unimplemented (Unimplemented ), respondError )
13+ import Share.Web.Errors (Unimplemented (Unimplemented ), reportError , respondError )
14+ import Share.Web.UCM.HistoryComments.Queries (insertHistoryComments )
15+ import Share.Web.UCM.HistoryComments.Queries qualified as Q
1116import Unison.Server.HistoryComments.API qualified as HistoryComments
12- import Unison.Server.HistoryComments.Types (DownloadCommentsRequest (DownloadCommentsRequest ), HistoryCommentChunk , UploadCommentsResponse )
17+ import Unison.Server.HistoryComments.Types (DownloadCommentsRequest (DownloadCommentsRequest ), HistoryCommentChunk (.. ), UploadCommentsResponse )
18+ import Unison.Server.Types
1319import Unison.Util.Servant.CBOR
1420import Unison.Util.Websockets
21+ import UnliftIO
1522
1623server :: Maybe UserId -> HistoryComments. Routes WebAppServer
1724server mayCaller =
@@ -32,7 +39,7 @@ downloadHistoryCommentsStreamImpl mayUserId (DownloadCommentsRequest {}) = do
3239-- If the action returns Nothing, stop early and return what has been collected so far, along with a Bool indicating whether the action was exhausted.
3340fetchChunk :: Int -> STM (Maybe a ) -> STM ([a ], Bool )
3441fetchChunk size action = do
35- let go 0 = pure []
42+ let go 0 = pure ( [] , False )
3643 go n = do
3744 optional action >>= \ case
3845 Nothing -> do
@@ -50,14 +57,20 @@ uploadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> We
5057uploadHistoryCommentsStreamImpl mayUserId branchRef conn = do
5158 authZ <- error " AUTH CHECK HERE"
5259 projectId <- error " Process Branch Ref"
53- withQueues @ HistoryCommentChunk @ _ wsMessageBufferSize wsMessageBufferSize conn \ Queues {receive} -> do
54- errVar <- newEmptyMVar
55- let loop = do
56- chunk <- fetchChunk insertCommentBatchSize do
60+ result <- withQueues @ HistoryCommentChunk @ _ wsMessageBufferSize wsMessageBufferSize conn \ Queues {receive} -> do
61+ let loop :: WebApp ()
62+ loop = do
63+ ( chunk, closed) <- atomically $ fetchChunk insertCommentBatchSize do
5764 receive <&> fmap \ case
58- HistoryCommentErrorChunk err -> Just (Left err)
59- chunk -> Just (Right chunk)
60- PG. runTransaction $ Q. insertHistoryComments authZ projectId chunk
61- loop
62- where
63- insertCommentBatchSize = 100
65+ HistoryCommentErrorChunk err -> (Left err)
66+ chunk -> (Right chunk)
67+ let (errs, chunks) = partitionEithers chunk
68+ for_ errs reportError
69+ PG. runTransaction $ Q. insertHistoryComments authZ projectId chunks
70+ when (not closed) loop
71+ loop
72+ case result of
73+ Left err -> reportError err
74+ Right () -> pure ()
75+ where
76+ insertCommentBatchSize = 100
0 commit comments