@@ -87,6 +87,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn =
8787 _receiverThread <- lift $ Ki. fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ
8888 inserterThread <- lift $ Ki. fork scope $ inserterWorker authZ commentsQ project. projectId
8989 _hashCheckingThread <- lift $ Ki. fork scope $ hashCheckingWorker send hashesToCheckQ
90+ Debug. debugLogM Debug. Temp " Upload history comments: waiting for inserter thread to finish"
9091 -- The inserter thread will finish when the client closes the connection.
9192 atomically $ Ki. await inserterThread
9293 case result of
@@ -102,9 +103,12 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn =
102103 inserterWorker authZ commentsQ projectId = do
103104 let loop = do
104105 (chunk, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue commentsQ))
105- PG. runTransaction $ Q. insertHistoryComments authZ projectId chunk
106+ PG. whenNonEmpty chunk do
107+ Debug. debugM Debug. Temp " Inserting comments chunk of size" (length chunk)
108+ PG. runTransaction $ Q. insertHistoryComments authZ projectId chunk
106109 when (not closed) loop
107110 loop
111+ Debug. debugLogM Debug. Temp " Inserter worker finished"
108112
109113 hashCheckingWorker ::
110114 (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool ) ->
@@ -114,13 +118,16 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn =
114118 let loop = do
115119 (hashes, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ))
116120 Debug. debugM Debug. Temp " Checking hashes chunk of size" (length hashes)
117- unknownHashes <- PG. runTransaction $ do Q. filterForUnknownHistoryCommentHashes hashes
118- case NESet. nonEmptySet (Set. fromList unknownHashes) of
119- Nothing -> pure ()
120- Just unknownHashesSet -> do
121- void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet
121+ PG. whenNonEmpty hashes $ do
122+ unknownHashes <- PG. runTransaction $ do Q. filterForUnknownHistoryCommentHashes hashes
123+ case NESet. nonEmptySet (Set. fromList unknownHashes) of
124+ Nothing -> pure ()
125+ Just unknownHashesSet -> do
126+ void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet
122127 when (not closed) loop
123128 loop
129+ void . atomically $ send $ Msg $ DoneCheckingHashesChunk
130+ Debug. debugLogM Debug. Temp " Hash checking worker finished"
124131 receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk )) -> TMVar Text -> TBMQueue Hash32 -> TBMQueue (Either Sync. HistoryComment Sync. HistoryCommentRevision ) -> WebApp ()
125132 receiverWorker recv errMVar hashesToCheckQ commentsQ = do
126133 let loop = do
@@ -146,6 +153,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn =
146153 pure loop
147154 next
148155 loop
156+ Debug. debugLogM Debug. Temp " Receiver worker finished"
149157 insertCommentBatchSize = 100
150158 handleErrInQueue :: forall o x e a . Queues (MsgOrError e a ) o -> e -> ExceptT e WebApp x
151159 handleErrInQueue Queues {send} e = do
0 commit comments