|
1 | 1 | module Share.Web.UCM.HistoryComments.Impl (server) where |
2 | 2 |
|
3 | | -import Conduit (ConduitT) |
| 3 | +import Control.Monad.Except |
| 4 | +import Control.Monad.Trans.Maybe |
4 | 5 | import Data.Either (partitionEithers) |
5 | | -import Data.Void |
6 | 6 | import Network.WebSockets.Connection |
7 | | -import Servant |
8 | 7 | import Share.IDs |
| 8 | +import Share.IDs qualified as IDs |
9 | 9 | import Share.Postgres qualified as PG |
| 10 | +import Share.Postgres.Queries qualified as PGQ |
| 11 | +import Share.Postgres.Users.Queries qualified as UserQ |
10 | 12 | import Share.Prelude |
11 | | -import Share.Utils.Servant.Streaming qualified as Streaming |
12 | 13 | import Share.Web.App (WebApp, WebAppServer) |
| 14 | +import Share.Web.Authentication qualified as AuthN |
| 15 | +import Share.Web.Authorization qualified as AuthZ |
13 | 16 | import Share.Web.Errors (Unimplemented (Unimplemented), reportError, respondError) |
14 | | -import Share.Web.UCM.HistoryComments.Queries (insertHistoryComments) |
15 | 17 | import Share.Web.UCM.HistoryComments.Queries qualified as Q |
16 | 18 | import Unison.Server.HistoryComments.API qualified as HistoryComments |
17 | | -import Unison.Server.HistoryComments.Types (DownloadCommentsRequest (DownloadCommentsRequest), HistoryCommentChunk (..), UploadCommentsResponse) |
| 19 | +import Unison.Server.HistoryComments.Types (DownloadCommentsRequest (DownloadCommentsRequest), HistoryCommentChunk (..), UploadCommentsResponse (..)) |
18 | 20 | import Unison.Server.Types |
19 | | -import Unison.Util.Servant.CBOR |
20 | 21 | import Unison.Util.Websockets |
21 | 22 | import UnliftIO |
22 | 23 |
|
@@ -54,23 +55,42 @@ fetchChunk size action = do |
54 | 55 | go size |
55 | 56 |
|
56 | 57 | uploadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp () |
57 | | -uploadHistoryCommentsStreamImpl mayUserId branchRef conn = do |
58 | | - authZ <- error "AUTH CHECK HERE" |
59 | | - projectId <- error "Process Branch Ref" |
60 | | - result <- withQueues @HistoryCommentChunk @_ wsMessageBufferSize wsMessageBufferSize conn \Queues {receive} -> do |
61 | | - let loop :: WebApp () |
| 58 | +uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = do |
| 59 | + callerUserId <- AuthN.requireAuthenticatedUser' mayCallerUserId |
| 60 | + result <- withQueues @UploadCommentsResponse @HistoryCommentChunk wsMessageBufferSize wsMessageBufferSize conn \q@(Queues {receive}) -> runExceptT $ do |
| 61 | + projectBranchSH@ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs.fromText @ProjectBranchShortHand branchRef of |
| 62 | + Left err -> handleErrInQueue q (UploadCommentsGenericFailure $ IDs.toText err) |
| 63 | + Right pbsh -> pure pbsh |
| 64 | + let projectSH = ProjectShortHand {userHandle, projectSlug} |
| 65 | + mayInfo <- runMaybeT $ mapMaybeT PG.runTransaction $ do |
| 66 | + project <- MaybeT $ PGQ.projectByShortHand projectSH |
| 67 | + branch <- MaybeT $ PGQ.branchByProjectBranchShortHand projectBranchSH |
| 68 | + contributorUser <- MaybeT $ for contributorHandle UserQ.userByHandle |
| 69 | + pure (project, branch, contributorUser) |
| 70 | + (project, branch, contributorUser) <- maybe (handleErrInQueue q $ UploadCommentsProjectBranchNotFound br) pure $ mayInfo |
| 71 | + authZ <- |
| 72 | + lift (AuthZ.checkUploadToProjectBranchCodebase callerUserId project.projectId contributorUser.user_id) >>= \case |
| 73 | + Left _authErr -> handleErrInQueue q (UploadCommentsNotAuthorized br) |
| 74 | + Right authZ -> pure authZ |
| 75 | + projectId <- error "Process Branch Ref" |
| 76 | + let loop :: ExceptT UploadCommentsResponse WebApp () |
62 | 77 | loop = do |
63 | 78 | (chunk, closed) <- atomically $ fetchChunk insertCommentBatchSize do |
64 | 79 | receive <&> fmap \case |
65 | | - HistoryCommentErrorChunk err -> (Left err) |
| 80 | + HistoryCommentErrorChunk err -> (Left $ UploadCommentsGenericFailure err) |
66 | 81 | chunk -> (Right chunk) |
67 | 82 | let (errs, chunks) = partitionEithers chunk |
68 | | - for_ errs reportError |
69 | 83 | PG.runTransaction $ Q.insertHistoryComments authZ projectId chunks |
| 84 | + for errs $ \err -> handleErrInQueue q err |
70 | 85 | when (not closed) loop |
71 | 86 | loop |
72 | 87 | case result of |
73 | 88 | Left err -> reportError err |
74 | | - Right () -> pure () |
| 89 | + Right (Left err) -> reportError err |
| 90 | + Right (Right ()) -> pure () |
75 | 91 | where |
76 | 92 | insertCommentBatchSize = 100 |
| 93 | + handleErrInQueue :: forall o x. Queues UploadCommentsResponse o -> UploadCommentsResponse -> ExceptT UploadCommentsResponse WebApp x |
| 94 | + handleErrInQueue Queues {send} e = do |
| 95 | + _ <- atomically $ send e |
| 96 | + throwError e |
0 commit comments