From 327039f978a76a633959d3b0157ca20cee70fe7d Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 1 Aug 2024 14:33:41 -0700 Subject: [PATCH 01/20] Add CBOR media type --- package.yaml | 2 ++ share-api.cabal | 6 +++++ src/Share/Utils/Servant/CBOR.hs | 44 +++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) create mode 100644 src/Share/Utils/Servant/CBOR.hs diff --git a/package.yaml b/package.yaml index 5517b9e1..9ad47343 100644 --- a/package.yaml +++ b/package.yaml @@ -44,6 +44,7 @@ dependencies: - bytestring - bytes - case-insensitive +- cborg - clock - containers - cookie @@ -94,6 +95,7 @@ dependencies: - servant-auth - servant-client - servant-server +- serialise - stm - text - these diff --git a/share-api.cabal b/share-api.cabal index 4ecdde11..d61db621 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -94,6 +94,7 @@ library Share.Utils.Logging.Types Share.Utils.Postgres Share.Utils.Servant + Share.Utils.Servant.CBOR Share.Utils.Servant.Client Share.Utils.Servant.PathInfo Share.Utils.Servant.RawRequest @@ -152,6 +153,7 @@ library Share.Web.UCM.Sync.HashJWT Share.Web.UCM.Sync.Impl Share.Web.UCM.Sync.Types + Share.Web.UCM.SyncStream.API Unison.PrettyPrintEnvDecl.Postgres Unison.Server.NameSearch.Postgres Unison.Server.Share.Definitions @@ -208,6 +210,7 @@ library , bytes , bytestring , case-insensitive + , cborg , clock , containers , cookie @@ -252,6 +255,7 @@ library , raven-haskell , safe , semialign + , serialise , servant , servant-auth , servant-client @@ -351,6 +355,7 @@ executable share-api , bytes , bytestring , case-insensitive + , cborg , clock , containers , cookie @@ -395,6 +400,7 @@ executable share-api , raven-haskell , safe , semialign + , serialise , servant , servant-auth , servant-client diff --git a/src/Share/Utils/Servant/CBOR.hs b/src/Share/Utils/Servant/CBOR.hs new file mode 100644 index 00000000..0cf66ee5 --- /dev/null +++ b/src/Share/Utils/Servant/CBOR.hs @@ -0,0 +1,44 @@ +-- | Servant configuration for the CBOR media type +-- +-- Adapted from https://hackage.haskell.org/package/servant-serialization-0.3/docs/Servant-API-ContentTypes-SerialiseCBOR.html via MIT license +module Share.Utils.Servant.CBOR (CBOR) where + +import Codec.CBOR.Read (DeserialiseFailure (..)) +import Codec.Serialise (Serialise, deserialiseOrFail, serialise) +import Data.List.NonEmpty qualified as NonEmpty +import Network.HTTP.Media.MediaType qualified as MediaType +import Servant + +-- | Content-type for encoding and decoding objects as their CBOR representations +data CBOR + +-- | Mime-type for CBOR and additional ones using the word "hackage" and the +-- name of the package "serialise". +instance Accept CBOR where + contentTypes Proxy = + NonEmpty.singleton ("application" MediaType.// "cbor") + +-- | +-- +-- >>> mimeRender (Proxy :: Proxy CBOR) ("Hello" :: String) +-- "eHello" +instance (Serialise a) => MimeRender CBOR a where + mimeRender Proxy = serialise + +-- | +-- +-- >>> let bsl = mimeRender (Proxy :: Proxy CBOR) (3.14 :: Float) +-- >>> mimeUnrender (Proxy :: Proxy CBOR) bsl :: Either String Float +-- Right 3.14 +-- +-- >>> mimeUnrender (Proxy :: Proxy CBOR) (bsl <> "trailing garbage") :: Either String Float +-- Right 3.14 +-- +-- >>> mimeUnrender (Proxy :: Proxy CBOR) ("preceding garbage" <> bsl) :: Either String Float +-- Left "Codec.Serialise.deserialiseOrFail: expected float at byte-offset 0" +instance (Serialise a) => MimeUnrender CBOR a where + mimeUnrender Proxy = mapLeft prettyErr . deserialiseOrFail + where + mapLeft f = either (Left . f) Right + prettyErr (DeserialiseFailure offset err) = + "Codec.Serialise.deserialiseOrFail: " ++ err ++ " at byte-offset " ++ show offset From 8487d19f6505a149b3ecda4840f1f651ce01799b Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 1 Aug 2024 14:33:41 -0700 Subject: [PATCH 02/20] Install cbor and make a basic streaming endpoint --- package.yaml | 2 ++ share-api.cabal | 4 ++++ src/Share/Web/API.hs | 2 ++ src/Share/Web/Impl.hs | 2 ++ src/Share/Web/UCM/SyncStream/API.hs | 27 +++++++++++++++++++++++++++ 5 files changed, 37 insertions(+) create mode 100644 src/Share/Web/UCM/SyncStream/API.hs diff --git a/package.yaml b/package.yaml index 9ad47343..8f568644 100644 --- a/package.yaml +++ b/package.yaml @@ -46,6 +46,7 @@ dependencies: - case-insensitive - cborg - clock +- conduit - containers - cookie - cryptonite @@ -95,6 +96,7 @@ dependencies: - servant-auth - servant-client - servant-server +- servant-conduit - serialise - stm - text diff --git a/share-api.cabal b/share-api.cabal index d61db621..6ef8b5d5 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -212,6 +212,7 @@ library , case-insensitive , cborg , clock + , conduit , containers , cookie , cryptonite @@ -259,6 +260,7 @@ library , servant , servant-auth , servant-client + , servant-conduit , servant-server , share-auth , share-utils @@ -357,6 +359,7 @@ executable share-api , case-insensitive , cborg , clock + , conduit , containers , cookie , cryptonite @@ -404,6 +407,7 @@ executable share-api , servant , servant-auth , servant-client + , servant-conduit , servant-server , share-api , share-auth diff --git a/src/Share/Web/API.hs b/src/Share/Web/API.hs index d188e3e9..8ed0450d 100644 --- a/src/Share/Web/API.hs +++ b/src/Share/Web/API.hs @@ -13,6 +13,7 @@ import Share.Web.Share.API qualified as Share import Share.Web.Share.Projects.API qualified as Projects import Share.Web.Support.API qualified as Support import Share.Web.Types +import Share.Web.UCM.SyncStream.API qualified as SyncStream import Unison.Share.API.Projects qualified as UCMProjects import Unison.Sync.API qualified as Unison.Sync @@ -37,6 +38,7 @@ type API = :<|> ("ucm" :> "v1" :> "sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "projects" :> MaybeAuthenticatedSession :> UCMProjects.ProjectsAPI) :<|> ("admin" :> Admin.API) + :<|> ("sync-stream" :> SyncStream.API) api :: Proxy API api = Proxy @API diff --git a/src/Share/Web/Impl.hs b/src/Share/Web/Impl.hs index 72284240..63602647 100644 --- a/src/Share/Web/Impl.hs +++ b/src/Share/Web/Impl.hs @@ -24,6 +24,7 @@ import Share.Web.Support.Impl qualified as Support import Share.Web.Types import Share.Web.UCM.Projects.Impl qualified as UCMProjects import Share.Web.UCM.Sync.Impl qualified as Sync +import Share.Web.UCM.SyncStream.API qualified as SyncStream discoveryEndpoint :: WebApp DiscoveryDocument discoveryEndpoint = do @@ -73,3 +74,4 @@ server = :<|> Sync.server :<|> UCMProjects.server :<|> Admin.server + :<|> SyncStream.server diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncStream/API.hs new file mode 100644 index 00000000..b83e0fa6 --- /dev/null +++ b/src/Share/Web/UCM/SyncStream/API.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeOperators #-} + +module Share.Web.UCM.SyncStream.API (API, server) where + +import Conduit +import Servant +import Servant.Conduit (ConduitToSourceIO (..)) +import Share.Prelude +import Share.Utils.Servant.CBOR (CBOR) +import Share.Web.App + +type API = "download-causal" :> DownloadCausalStreamEndpoint + +type DownloadCausalStreamEndpoint = + QueryParam "name" Text + :> StreamGet NewlineFraming CBOR (SourceIO Text) + +server :: ServerT API WebApp +server = + downloadCausalStreamEndpointConduit + +downloadCausalStreamEndpointConduit :: Maybe Text -> WebApp (SourceIO Text) +downloadCausalStreamEndpointConduit name = pure . conduitToSourceIO @IO $ do + yield "hello" + yield (fromMaybe "mystery" name) + yield "world" From 375492298dc631d43d479955e07bd1ab789eb177 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 1 Aug 2024 17:58:38 -0700 Subject: [PATCH 03/20] Stream hashes from PG --- share-api.cabal | 1 + src/Share/Web/UCM/SyncStream/API.hs | 51 ++++++- src/Share/Web/UCM/SyncStream/Queries.hs | 176 ++++++++++++++++++++++++ 3 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 src/Share/Web/UCM/SyncStream/Queries.hs diff --git a/share-api.cabal b/share-api.cabal index 6ef8b5d5..31f97ede 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -154,6 +154,7 @@ library Share.Web.UCM.Sync.Impl Share.Web.UCM.Sync.Types Share.Web.UCM.SyncStream.API + Share.Web.UCM.SyncStream.Queries Unison.PrettyPrintEnvDecl.Postgres Unison.Server.NameSearch.Postgres Unison.Server.Share.Definitions diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncStream/API.hs index b83e0fa6..8d82f10f 100644 --- a/src/Share/Web/UCM/SyncStream/API.hs +++ b/src/Share/Web/UCM/SyncStream/API.hs @@ -4,24 +4,63 @@ module Share.Web.UCM.SyncStream.API (API, server) where import Conduit +import Control.Concurrent.STM qualified as STM import Servant import Servant.Conduit (ConduitToSourceIO (..)) +import Share.Codebase qualified as Codebase +import Share.IDs (UserId) +import Share.OAuth.Session (AuthenticatedUserId) +import Share.Postgres qualified as PG +import Share.Postgres.Causal.Queries qualified as CausalQ +import Share.Postgres.Cursors qualified as Cursor +import Share.Postgres.IDs (CausalHash) import Share.Prelude +import Share.Utils.Servant (RequiredQueryParam) import Share.Utils.Servant.CBOR (CBOR) import Share.Web.App +import Share.Web.Authorization qualified as AuthZ +import Share.Web.UCM.SyncStream.Queries qualified as SSQ +import UnliftIO.Async qualified as Async type API = "download-causal" :> DownloadCausalStreamEndpoint type DownloadCausalStreamEndpoint = - QueryParam "name" Text + AuthenticatedUserId + :> RequiredQueryParam "causalHash" CausalHash :> StreamGet NewlineFraming CBOR (SourceIO Text) server :: ServerT API WebApp server = downloadCausalStreamEndpointConduit -downloadCausalStreamEndpointConduit :: Maybe Text -> WebApp (SourceIO Text) -downloadCausalStreamEndpointConduit name = pure . conduitToSourceIO @IO $ do - yield "hello" - yield (fromMaybe "mystery" name) - yield "world" +downloadCausalStreamEndpointConduit :: UserId -> CausalHash -> WebApp (SourceIO Text) +downloadCausalStreamEndpointConduit callerUserId causalHash = do + let authZReceipt = AuthZ.adminOverride + let codebaseLoc = Codebase.codebaseLocationForUserCodebase callerUserId + let codebase = Codebase.codebaseEnv authZReceipt codebaseLoc + q <- liftIO $ STM.newTBQueueIO 10 + let streamResults = do + Codebase.runCodebaseTransaction codebase $ do + (_bhId, causalId) <- CausalQ.expectCausalIdsOf id causalHash + cursor <- SSQ.allHashDependenciesOfCausalCursor causalId + Cursor.foldBatched cursor 1000 \batch -> do + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch + Async.withAsync streamResults \async -> do + pure $ conduitToSourceIO @IO (stream q async) + where + stream :: STM.TBQueue (NonEmpty Text) -> Async.Async () -> ConduitT () Text IO () + stream q async = do + let loop :: ConduitT () Text IO () + loop = do + next <- liftIO . STM.atomically $ do + STM.tryReadTBQueue q >>= \case + Nothing -> do + Async.waitSTM async $> Nothing + Just batch -> do + pure $ Just batch + case next of + Nothing -> pure () + Just batch -> do + yieldMany batch + loop + loop diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncStream/Queries.hs new file mode 100644 index 00000000..317223ed --- /dev/null +++ b/src/Share/Web/UCM/SyncStream/Queries.hs @@ -0,0 +1,176 @@ +module Share.Web.UCM.SyncStream.Queries (allHashDependenciesOfCausalCursor) where + +import Control.Monad.Reader +import Share.Codebase (CodebaseM, codebaseOwner) +import Share.Postgres +import Share.Postgres.Cursors (PGCursor) +import Share.Postgres.Cursors qualified as PGCursor +import Share.Postgres.IDs +import Share.Prelude + +allHashDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor Text) +allHashDependenciesOfCausalCursor cid = do + ownerUserId <- asks codebaseOwner + PGCursor.newColCursor + "causal_dependencies" + [sql| + WITH RECURSIVE transitive_causals(causal_id, causal_namespace_hash_id) AS ( + SELECT causal.id, causal.namespace_hash_id + FROM causals causal + WHERE causal.id = #{cid} + -- AND NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = causal.id) + AND EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = #{ownerUserId} AND co.causal_id = causal.id) + UNION + -- This nested CTE is required because RECURSIVE CTEs can't refer + -- to the recursive table more than once. + ( WITH rec AS ( + SELECT causal_id, causal_namespace_hash_id + FROM transitive_causals tc + ) + SELECT ancestor_causal.id, ancestor_causal.namespace_hash_id + FROM causal_ancestors ca + JOIN rec tc ON ca.causal_id = tc.causal_id + JOIN causals ancestor_causal ON ca.ancestor_id = ancestor_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = ancestor_causal.id) + UNION + SELECT child_causal.id, child_causal.namespace_hash_id + FROM rec tc + JOIN namespace_children nc ON tc.causal_namespace_hash_id = nc.parent_namespace_hash_id + JOIN causals child_causal ON nc.child_causal_id = child_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = child_causal.id) + ) + ), all_namespaces(namespace_hash_id) AS ( + SELECT DISTINCT causal_namespace_hash_id AS namespace_hash_id + FROM transitive_causals + -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = causal_namespace_hash_id) + ), all_patches(patch_id) AS ( + SELECT DISTINCT patch.id + FROM all_namespaces an + JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id + JOIN patches patch ON np.patch_id = patch.id + -- WHERE NOT EXISTS (SELECT FROM patch_ownership po WHERE po.user_id = to_codebase_user_id AND po.patch_id = patch.id) + ), + -- term components to start transitively joining dependencies to + base_term_components(component_hash_id) AS ( + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN terms term ON nt.term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + SELECT DISTINCT term.component_hash_id + FROM all_patches ap + JOIN patch_term_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN terms term ON ptm.to_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- term metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_term_metadata meta ON nt.id = meta.named_term + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- type metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_type_metadata meta ON nt.id = meta.named_type + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + ), + -- type components to start transitively joining dependencies to + base_type_components(component_hash_id) AS ( + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN types typ ON nt.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN constructors con ON nt.constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_type_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN types typ ON ptm.to_type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_constructor_mappings pcm ON ap.patch_id = pcm.patch_id + JOIN constructors con ON pcm.to_constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + ), + -- All the dependencies we join in transitively from the known term & type components we depend on. + -- Unfortunately it's not possible to know which hashes are terms vs types :'( + transitive_components(component_hash_id) AS ( + SELECT DISTINCT btc.component_hash_id + FROM base_term_components btc + UNION + SELECT DISTINCT btc.component_hash_id + FROM base_type_components btc + UNION + ( WITH rec AS ( + SELECT component_hash_id + FROM transitive_components tc + ) + -- recursively union in term dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the terms from the component, not just the one that caused the dependency on the + -- component + JOIN terms term ON atc.component_hash_id = term.component_hash_id + JOIN term_local_component_references ref ON term.id = ref.term_id + UNION + -- recursively union in type dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the types from the component, not just the one that caused the dependency on the + -- component + JOIN types typ ON atc.component_hash_id = typ.component_hash_id + JOIN type_local_component_references ref ON typ.id = ref.type_id + ) + ), copied_causals(causal_id) AS ( + SELECT DISTINCT tc.causal_id + FROM transitive_causals tc + ), copied_namespaces(namespace_hash_id) AS ( + SELECT DISTINCT an.namespace_hash_id + FROM all_namespaces an + ), copied_patches(patch_id) AS ( + SELECT DISTINCT ap.patch_id + FROM all_patches ap + ), copied_term_components AS ( + SELECT DISTINCT term.id, copy.bytes_id + FROM transitive_components tc + JOIN terms term ON tc.component_hash_id = term.component_hash_id + JOIN sandboxed_terms copy ON term.id = copy.term_id + WHERE copy.user_id = #{ownerUserId} + ), copied_type_components AS ( + SELECT DISTINCT typ.id, copy.bytes_id + FROM transitive_components tc + JOIN types typ ON tc.component_hash_id = typ.component_hash_id + JOIN sandboxed_types copy ON typ.id = copy.type_id + WHERE copy.user_id = #{ownerUserId} + ) SELECT causal.hash + FROM copied_causals cc + JOIN causals causal ON cc.causal_id = causal.id + UNION ALL + SELECT branch_hashes.base32 + FROM copied_namespaces cn + JOIN branch_hashes ON cn.namespace_hash_id = branch_hashes.id + UNION ALL + SELECT patch.hash + FROM copied_patches cp + JOIN patches patch ON cp.patch_id = patch.id + UNION ALL + SELECT component_hashes.base32 + FROM transitive_components tc + JOIN component_hashes ON tc.component_hash_id = component_hashes.id + |] From 6d69b4152d9506ab4c24c97b09f524a1e65c22ba Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 1 Aug 2024 18:14:28 -0700 Subject: [PATCH 04/20] Wire up test stream and get it working with conduit --- src/Share/Web/UCM/SyncStream/API.hs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncStream/API.hs index 8d82f10f..b7d8eff7 100644 --- a/src/Share/Web/UCM/SyncStream/API.hs +++ b/src/Share/Web/UCM/SyncStream/API.hs @@ -4,6 +4,7 @@ module Share.Web.UCM.SyncStream.API (API, server) where import Conduit +import Control.Concurrent (threadDelay) import Control.Concurrent.STM qualified as STM import Servant import Servant.Conduit (ConduitToSourceIO (..)) @@ -20,6 +21,7 @@ import Share.Utils.Servant.CBOR (CBOR) import Share.Web.App import Share.Web.Authorization qualified as AuthZ import Share.Web.UCM.SyncStream.Queries qualified as SSQ +import UnliftIO qualified import UnliftIO.Async qualified as Async type API = "download-causal" :> DownloadCausalStreamEndpoint @@ -39,14 +41,15 @@ downloadCausalStreamEndpointConduit callerUserId causalHash = do let codebaseLoc = Codebase.codebaseLocationForUserCodebase callerUserId let codebase = Codebase.codebaseEnv authZReceipt codebaseLoc q <- liftIO $ STM.newTBQueueIO 10 - let streamResults = do - Codebase.runCodebaseTransaction codebase $ do - (_bhId, causalId) <- CausalQ.expectCausalIdsOf id causalHash - cursor <- SSQ.allHashDependenciesOfCausalCursor causalId - Cursor.foldBatched cursor 1000 \batch -> do - PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch - Async.withAsync streamResults \async -> do - pure $ conduitToSourceIO @IO (stream q async) + streamResults <- UnliftIO.toIO do + Codebase.runCodebaseTransaction codebase $ do + (_bhId, causalId) <- CausalQ.expectCausalIdsOf id causalHash + cursor <- SSQ.allHashDependenciesOfCausalCursor causalId + Cursor.foldBatched cursor 1000 \batch -> do + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch + pure $ conduitToSourceIO do + handle <- liftIO $ Async.async streamResults + stream q handle where stream :: STM.TBQueue (NonEmpty Text) -> Async.Async () -> ConduitT () Text IO () stream q async = do @@ -62,5 +65,6 @@ downloadCausalStreamEndpointConduit callerUserId causalHash = do Nothing -> pure () Just batch -> do yieldMany batch + liftIO $ threadDelay 1000000 loop loop From 7e12608a2af920555043ee8470ab757585475a58 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Fri, 2 Aug 2024 14:49:11 -0700 Subject: [PATCH 05/20] Use Servant's Named Routes API --- src/Share/Web/App.hs | 4 ++++ src/Share/Web/UCM/SyncStream/API.hs | 17 ++++++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/Share/Web/App.hs b/src/Share/Web/App.hs index bccf7644..f3aa70ee 100644 --- a/src/Share/Web/App.hs +++ b/src/Share/Web/App.hs @@ -3,6 +3,7 @@ module Share.Web.App ( RequestCtx (..), WebApp, + WebAppServer, ReqTagsVar, localRequestCtx, withLocalTag, @@ -23,6 +24,7 @@ import Control.Monad.Reader import Data.Map qualified as Map import Network.URI import Servant +import Servant.Server.Generic (AsServerT) import Share.App import Share.Env import Share.Env qualified as Env @@ -34,6 +36,8 @@ import UnliftIO.STM type WebApp = AppM RequestCtx +type WebAppServer = AsServerT WebApp + type ReqTagsVar = TVar (Map Text Text) -- | Context which is local to a single request. diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncStream/API.hs index b7d8eff7..10ac8b82 100644 --- a/src/Share/Web/UCM/SyncStream/API.hs +++ b/src/Share/Web/UCM/SyncStream/API.hs @@ -24,19 +24,26 @@ import Share.Web.UCM.SyncStream.Queries qualified as SSQ import UnliftIO qualified import UnliftIO.Async qualified as Async -type API = "download-causal" :> DownloadCausalStreamEndpoint +type API = NamedRoutes Routes type DownloadCausalStreamEndpoint = AuthenticatedUserId :> RequiredQueryParam "causalHash" CausalHash :> StreamGet NewlineFraming CBOR (SourceIO Text) -server :: ServerT API WebApp +data Routes mode = Routes + { downloadCausalStreamEndpointConduit :: mode :- "download-causal" :> DownloadCausalStreamEndpoint + } + deriving stock (Generic) + +server :: Routes WebAppServer server = - downloadCausalStreamEndpointConduit + Routes + { downloadCausalStreamEndpointConduit = downloadCausalStreamEndpointConduitImpl + } -downloadCausalStreamEndpointConduit :: UserId -> CausalHash -> WebApp (SourceIO Text) -downloadCausalStreamEndpointConduit callerUserId causalHash = do +downloadCausalStreamEndpointConduitImpl :: UserId -> CausalHash -> WebApp (SourceIO Text) +downloadCausalStreamEndpointConduitImpl callerUserId causalHash = do let authZReceipt = AuthZ.adminOverride let codebaseLoc = Codebase.codebaseLocationForUserCodebase callerUserId let codebase = Codebase.codebaseEnv authZReceipt codebaseLoc From 854629f967b0fedb51841eb5a2372bf8dff7728f Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Sat, 3 Aug 2024 00:17:20 -0700 Subject: [PATCH 06/20] Add serialized entity tables and add code to write to them. --- sql/2024-09-00-00_sync_v2.sql | 36 +++++ .../Search/DefinitionSearch/Queries.hs | 5 +- src/Share/Postgres/Sync/Queries.hs | 134 ++++++++++++++++-- 3 files changed, 162 insertions(+), 13 deletions(-) create mode 100644 sql/2024-09-00-00_sync_v2.sql diff --git a/sql/2024-09-00-00_sync_v2.sql b/sql/2024-09-00-00_sync_v2.sql new file mode 100644 index 00000000..85825551 --- /dev/null +++ b/sql/2024-09-00-00_sync_v2.sql @@ -0,0 +1,36 @@ +CREATE TABLE serialized_components ( + -- The user the term is sandboxed to. + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + component_hash_id INTEGER NOT NULL REFERENCES component_hashes(id) ON DELETE CASCADE, + + -- The serialized component + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + + PRIMARY KEY (user_id, component_hash_id) INCLUDE (bytes_id) +); + +-- namespaces don't need to be sandboxed to user. +CREATE TABLE serialized_namespaces ( + namespace_hash_id NOT NULL REFERENCES branch_hashes(id) ON DELETE NO ACTION, + + -- The serialized namespace + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + + PRIMARY KEY (namespace_hash_id) INCLUDE (bytes_id) +); + +CREATE TABLE serialized_patches ( + patch_id INTEGER NOT NULL REFERENCES patches(id) ON DELETE CASCADE, + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), +); + +CREATE TABLE serialized_causals ( + causal_id INTEGER NOT NULL REFERENCES causals(id) ON DELETE CASCADE, + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), +); diff --git a/src/Share/Postgres/Search/DefinitionSearch/Queries.hs b/src/Share/Postgres/Search/DefinitionSearch/Queries.hs index 875ee882..6acb98fe 100644 --- a/src/Share/Postgres/Search/DefinitionSearch/Queries.hs +++ b/src/Share/Postgres/Search/DefinitionSearch/Queries.hs @@ -36,6 +36,7 @@ import Unison.Name (Name) import Unison.Name qualified as Name import Unison.Server.Types (TermTag (..), TypeTag (..)) import Unison.ShortHash (ShortHash) +import Unison.ShortHash qualified as SH import Unison.Syntax.Name qualified as Name import Unison.Syntax.NameSegment qualified as NameSegment @@ -191,11 +192,11 @@ searchTokenToText shouldAddWildcards = \case makeSearchToken typeMentionTypeByNameType (Text.toLower (reversedNameText name)) (Just occ) & addWildCard TypeMentionToken (Right sh) occ -> - makeSearchToken typeMentionTypeByHashType (into @Text @ShortHash sh) (Just occ) + makeSearchToken typeMentionTypeByHashType (SH.toText sh) (Just occ) & addWildCard TypeVarToken varId occ -> makeSearchToken typeVarType (varIdText varId) (Just occ) HashToken sh -> - makeSearchToken hashType (into @Text sh) Nothing + makeSearchToken hashType (SH.toText sh) Nothing & addWildCard TermTagToken termTag -> makeSearchToken tagType (termTagText termTag) Nothing TypeTagToken typTag -> makeSearchToken tagType (typeTagText typTag) Nothing diff --git a/src/Share/Postgres/Sync/Queries.hs b/src/Share/Postgres/Sync/Queries.hs index da10bf72..461f1da8 100644 --- a/src/Share/Postgres/Sync/Queries.hs +++ b/src/Share/Postgres/Sync/Queries.hs @@ -1,3 +1,6 @@ +{-# LANGUAGE StandaloneDeriving #-} +{-# OPTIONS_GHC -Wno-orphans #-} + -- | Queries related to sync and temp entities. module Share.Postgres.Sync.Queries ( expectEntity, @@ -13,6 +16,10 @@ module Share.Postgres.Sync.Queries ) where +import Codec.CBOR.Encoding qualified as CBOR +import Codec.CBOR.Write qualified as CBOR +import Codec.Serialise (Serialise (..)) +import Codec.Serialise.Class qualified as CBOR import Control.Lens hiding (from) import Data.ByteString.Lazy.Char8 qualified as BL import Data.Foldable qualified as Foldable @@ -21,6 +28,7 @@ import Data.Set qualified as Set import Data.Set.NonEmpty (NESet) import Data.Set.NonEmpty qualified as NESet import Data.Vector qualified as Vector +import Servant (ServerError (..), err500) import Share.Codebase.Types (CodebaseM) import Share.Codebase.Types qualified as Codebase import Share.IDs @@ -39,7 +47,6 @@ import Share.Prelude import Share.Utils.Logging qualified as Logging import Share.Web.Errors (InternalServerError (..), ToServerError (..), Unimplemented (Unimplemented)) import Share.Web.UCM.Sync.Types -import Servant (ServerError (..), err500) import U.Codebase.Branch qualified as V2 import U.Codebase.Causal qualified as U import U.Codebase.Sqlite.Branch.Format (LocalBranchBytes (LocalBranchBytes)) @@ -51,12 +58,14 @@ import U.Codebase.Sqlite.Decode qualified as Decoders import U.Codebase.Sqlite.Entity qualified as Entity import U.Codebase.Sqlite.LocalIds qualified as LocalIds import U.Codebase.Sqlite.LocalizeObject qualified as Localize +import U.Codebase.Sqlite.Patch.Format (PatchLocalIds' (patchDefnLookup)) import U.Codebase.Sqlite.Patch.Format qualified as PatchFormat import U.Codebase.Sqlite.Patch.Full qualified as PatchFull import U.Codebase.Sqlite.Queries qualified as Share import U.Codebase.Sqlite.TempEntity (TempEntity) import U.Codebase.Sqlite.TempEntityType (TempEntityType) import U.Codebase.Sqlite.Term.Format qualified as TermFormat +import U.Util.Base32Hex (Base32Hex (..)) import Unison.Hash32 import Unison.Hash32 qualified as Hash32 import Unison.Sync.Common qualified as Share @@ -82,7 +91,7 @@ instance Logging.Loggable SyncQError where toLog = Logging.withSeverity Logging.Error . Logging.showLog -- | Read an entity out of the database that we know is in main storage. -expectEntity :: HasCallStack => Hash32 -> CodebaseM e (Share.Entity Text Hash32 Hash32) +expectEntity :: (HasCallStack) => Hash32 -> CodebaseM e (Share.Entity Text Hash32 Hash32) expectEntity hash = do expectEntityKindForHash hash >>= \case CausalEntity -> Share.C <$> expectCausalEntity (CausalHash . Hash32.toHash $ hash) @@ -92,7 +101,7 @@ expectEntity hash = do PatchEntity -> Share.P <$> expectPatchEntity (PatchHash . Hash32.toHash $ hash) where -expectCausalEntity :: HasCallStack => CausalHash -> CodebaseM e (Share.Causal Hash32) +expectCausalEntity :: (HasCallStack) => CausalHash -> CodebaseM e (Share.Causal Hash32) expectCausalEntity hash = do causalId <- CausalQ.expectCausalIdByHash hash U.Causal {valueHash, parents} <- CausalQ.expectCausalNamespace causalId @@ -103,7 +112,7 @@ expectCausalEntity hash = do } ) -expectNamespaceEntity :: HasCallStack => BranchHash -> CodebaseM e (Share.Namespace Text Hash32) +expectNamespaceEntity :: (HasCallStack) => BranchHash -> CodebaseM e (Share.Namespace Text Hash32) expectNamespaceEntity bh = do bhId <- HashQ.expectBranchHashId bh v2Branch <- CausalQ.expectNamespace bhId @@ -122,17 +131,17 @@ expectNamespaceEntity bh = do bytes = bytes } -expectTermComponentEntity :: HasCallStack => ComponentHash -> CodebaseM e (Share.TermComponent Text Hash32) +expectTermComponentEntity :: (HasCallStack) => ComponentHash -> CodebaseM e (Share.TermComponent Text Hash32) expectTermComponentEntity hash = do chId <- HashQ.expectComponentHashId hash DefnQ.expectShareTermComponent chId -expectTypeComponentEntity :: HasCallStack => ComponentHash -> CodebaseM e (Share.DeclComponent Text Hash32) +expectTypeComponentEntity :: (HasCallStack) => ComponentHash -> CodebaseM e (Share.DeclComponent Text Hash32) expectTypeComponentEntity hash = do chId <- HashQ.expectComponentHashId hash DefnQ.expectShareTypeComponent chId -expectPatchEntity :: HasCallStack => PatchHash -> CodebaseM e (Share.Patch Text Hash32 Hash32) +expectPatchEntity :: (HasCallStack) => PatchHash -> CodebaseM e (Share.Patch Text Hash32 Hash32) expectPatchEntity patchHash = do patchId <- HashQ.expectPatchIdsOf id patchHash v2Patch <- PatchQ.expectPatch patchId @@ -154,7 +163,7 @@ expectPatchEntity patchHash = do & pure -- | Determine the kind of an arbitrary hash. -expectEntityKindForHash :: HasCallStack => Hash32 -> CodebaseM e EntityKind +expectEntityKindForHash :: (HasCallStack) => Hash32 -> CodebaseM e EntityKind expectEntityKindForHash h = do queryExpect1Row @@ -317,7 +326,7 @@ entityLocations sortedEntities = do -- | Save a temp entity to the temp entities table, also tracking its missing dependencies. -- You can pass ALL the dependencies of the temp entity, the query will determine which ones -- are missing. -saveTempEntities :: Foldable f => f (Hash32, Share.Entity Text Hash32 Hash32) -> CodebaseM e () +saveTempEntities :: (Foldable f) => f (Hash32, Share.Entity Text Hash32 Hash32) -> CodebaseM e () saveTempEntities entities = do codebaseOwnerUserId <- asks Codebase.codebaseOwner let tempEntities = @@ -401,10 +410,12 @@ clearTempDependencies hash = do -- | Save a temp entity to main storage, and clear any missing dependency rows for it, and -- return the set of hashes which dependended on it and _might_ now be ready to flush. -saveTempEntityInMain :: forall e. HasCallStack => Hash32 -> TempEntity -> CodebaseM e (Set Hash32) +saveTempEntityInMain :: forall e. (HasCallStack) => Hash32 -> TempEntity -> CodebaseM e (Set Hash32) saveTempEntityInMain hash entity = do saveEntity entity - clearTempDependencies hash + dependencies <- clearTempDependencies hash + saveSerializedEntities [(hash, entity)] + pure dependencies where saveEntity :: TempEntity -> CodebaseM e () saveEntity = \case @@ -573,3 +584,104 @@ getEntitiesReadyToFlush = do AND missing_dep.user_id = #{codebaseOwnerUserId} ) |] + +saveSerializedEntities :: (Foldable f) => f (Hash32, TempEntity) -> CodebaseM e () +saveSerializedEntities entities = do + for_ entities \(hash, entity) -> do + let serialised = CBOR.toStrictByteString (CBOR.encode hash <> CBOR.encode entity) + case entity of + Entity.TC {} -> saveSerializedComponent hash serialised + Entity.DC {} -> saveSerializedComponent hash serialised + Entity.P {} -> saveSerializedPatch hash serialised + Entity.C {} -> saveSerializedCausal hash serialised + Entity.N {} -> saveSerializedNamespace hash serialised + +saveSerializedComponent :: Hash32 -> ByteString -> CodebaseM e () +saveSerializedComponent hash serialised = do + codebaseOwnerUserId <- asks Codebase.codebaseOwner + bytesId <- DefnQ.ensureBytesIdsOf id serialised + execute_ + [sql| + INSERT INTO serialized_components (user_id, component_hash_id, bytes_id) + VALUES (#{codebaseOwnerUserId}, (SELECT ch.id FROM component_hashes where ch.base32 = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +saveSerializedPatch :: Hash32 -> ByteString -> CodebaseM e () +saveSerializedPatch hash serialised = do + bytesId <- DefnQ.ensureBytesIdsOf id serialised + execute_ + [sql| + INSERT INTO serialized_patches (patch_id, bytes_id) + VALUES ((SELECT p.id FROM patches where p.hash = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +saveSerializedCausal :: Hash32 -> ByteString -> CodebaseM e () +saveSerializedCausal hash serialised = do + bytesId <- DefnQ.ensureBytesIdsOf id serialised + execute_ + [sql| + INSERT INTO serialized_causals (causal_id, bytes_id) + VALUES ((SELECT c.id FROM causals where c.hash = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +saveSerializedNamespace :: Hash32 -> ByteString -> CodebaseM e () +saveSerializedNamespace hash serialised = do + bytesId <- DefnQ.ensureBytesIdsOf id serialised + execute_ + [sql| + INSERT INTO serialized_namespaces (namespace_hash_id, bytes_id) + VALUES ((SELECT nh.id FROM branch_hashes where nh.base32 = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +deriving via Text instance Serialise Hash32 + +instance Serialise TempEntity where + encode = \case + Entity.TC (TermFormat.SyncTerm (TermFormat.SyncLocallyIndexedComponent elements)) -> + CBOR.encodeWord8 0 + <> encodeFiniteListWith encodeElement elements + Entity.DC (DeclFormat.SyncDecl (DeclFormat.SyncLocallyIndexedComponent elements)) -> + CBOR.encodeWord8 1 + <> encodeFiniteListWith encodeElement elements + Entity.P (PatchFormat.SyncDiff {}) -> error "Serializing Diffs are not supported" + Entity.P (PatchFormat.SyncFull (PatchFormat.LocalIds {patchTextLookup, patchHashLookup, patchDefnLookup}) bytes) -> + CBOR.encodeWord8 2 + <> encodeVectorWith CBOR.encode patchTextLookup + <> encodeVectorWith CBOR.encode patchHashLookup + <> encodeVectorWith CBOR.encode patchDefnLookup + <> CBOR.encodeBytes bytes + Entity.N (BranchFormat.SyncDiff {}) -> error "Serializing Diffs are not supported" + Entity.N (BranchFormat.SyncFull (BranchFormat.LocalIds {branchTextLookup, branchDefnLookup, branchPatchLookup, branchChildLookup}) (LocalBranchBytes bytes)) -> + CBOR.encodeWord8 4 + <> encodeVectorWith CBOR.encode branchTextLookup + <> encodeVectorWith CBOR.encode branchDefnLookup + <> encodeVectorWith CBOR.encode branchPatchLookup + <> encodeVectorWith CBOR.encode branchChildLookup + <> CBOR.encodeBytes bytes + Entity.C (SqliteCausal.SyncCausalFormat {valueHash, parents}) -> + CBOR.encodeWord8 6 + <> CBOR.encode valueHash + <> ( CBOR.encodeVector parents + ) + where + encodeElement :: (Serialise t, Serialise d) => (LocalIds.LocalIds' t d, ByteString) -> CBOR.Encoding + encodeElement (LocalIds.LocalIds {textLookup, defnLookup}, bytes) = + CBOR.encodeVector textLookup + <> CBOR.encodeVector defnLookup + <> CBOR.encodeBytes bytes + + decode = error "Decoding Share.Entity not supported" + +encodeVectorWith :: (a -> CBOR.Encoding) -> Vector.Vector a -> CBOR.Encoding +encodeVectorWith f xs = + CBOR.encodeListLen (fromIntegral $ Vector.length xs) + <> (foldr (\a b -> f a <> b) mempty xs) + +encodeFiniteListWith :: (Foldable t) => (a -> CBOR.Encoding) -> t a -> CBOR.Encoding +encodeFiniteListWith f xs = + CBOR.encodeListLen (fromIntegral $ length xs) + <> (foldr (\a b -> f a <> b) mempty xs) From 6837661bb6377c95b94b1c8190312f70f464fe87 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 5 Aug 2024 11:53:40 -0700 Subject: [PATCH 07/20] Include actual components in stream --- sql/2024-09-00-00_sync_v2.sql | 6 +- src/Share/Postgres/Sync/Queries.hs | 57 +------- src/Share/Web/UCM/SyncStream/API.hs | 13 +- src/Share/Web/UCM/SyncStream/Queries.hs | 177 +++++++++++++++++++++++- 4 files changed, 188 insertions(+), 65 deletions(-) diff --git a/sql/2024-09-00-00_sync_v2.sql b/sql/2024-09-00-00_sync_v2.sql index 85825551..9c4e7fbd 100644 --- a/sql/2024-09-00-00_sync_v2.sql +++ b/sql/2024-09-00-00_sync_v2.sql @@ -13,7 +13,7 @@ CREATE TABLE serialized_components ( -- namespaces don't need to be sandboxed to user. CREATE TABLE serialized_namespaces ( - namespace_hash_id NOT NULL REFERENCES branch_hashes(id) ON DELETE NO ACTION, + namespace_hash_id INTEGER NOT NULL REFERENCES branch_hashes(id) ON DELETE NO ACTION, -- The serialized namespace bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, @@ -26,11 +26,11 @@ CREATE TABLE serialized_namespaces ( CREATE TABLE serialized_patches ( patch_id INTEGER NOT NULL REFERENCES patches(id) ON DELETE CASCADE, bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), + created_at TIMESTAMP NOT NULL DEFAULT NOW() ); CREATE TABLE serialized_causals ( causal_id INTEGER NOT NULL REFERENCES causals(id) ON DELETE CASCADE, bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), + created_at TIMESTAMP NOT NULL DEFAULT NOW() ); diff --git a/src/Share/Postgres/Sync/Queries.hs b/src/Share/Postgres/Sync/Queries.hs index 461f1da8..a8024c16 100644 --- a/src/Share/Postgres/Sync/Queries.hs +++ b/src/Share/Postgres/Sync/Queries.hs @@ -1,5 +1,4 @@ {-# LANGUAGE StandaloneDeriving #-} -{-# OPTIONS_GHC -Wno-orphans #-} -- | Queries related to sync and temp entities. module Share.Postgres.Sync.Queries @@ -603,7 +602,7 @@ saveSerializedComponent hash serialised = do execute_ [sql| INSERT INTO serialized_components (user_id, component_hash_id, bytes_id) - VALUES (#{codebaseOwnerUserId}, (SELECT ch.id FROM component_hashes where ch.base32 = #{hash}), #{bytesId}) + VALUES (#{codebaseOwnerUserId}, (SELECT ch.id FROM component_hashes ch where ch.base32 = #{hash}), #{bytesId}) ON CONFLICT DO NOTHING |] @@ -613,7 +612,7 @@ saveSerializedPatch hash serialised = do execute_ [sql| INSERT INTO serialized_patches (patch_id, bytes_id) - VALUES ((SELECT p.id FROM patches where p.hash = #{hash}), #{bytesId}) + VALUES ((SELECT p.id FROM patches p where p.hash = #{hash}), #{bytesId}) ON CONFLICT DO NOTHING |] @@ -623,7 +622,7 @@ saveSerializedCausal hash serialised = do execute_ [sql| INSERT INTO serialized_causals (causal_id, bytes_id) - VALUES ((SELECT c.id FROM causals where c.hash = #{hash}), #{bytesId}) + VALUES ((SELECT c.id FROM causals c where c.hash = #{hash}), #{bytesId}) ON CONFLICT DO NOTHING |] @@ -633,55 +632,7 @@ saveSerializedNamespace hash serialised = do execute_ [sql| INSERT INTO serialized_namespaces (namespace_hash_id, bytes_id) - VALUES ((SELECT nh.id FROM branch_hashes where nh.base32 = #{hash}), #{bytesId}) + VALUES ((SELECT bh.id FROM branch_hashes bh where bh.base32 = #{hash}), #{bytesId}) ON CONFLICT DO NOTHING |] -deriving via Text instance Serialise Hash32 - -instance Serialise TempEntity where - encode = \case - Entity.TC (TermFormat.SyncTerm (TermFormat.SyncLocallyIndexedComponent elements)) -> - CBOR.encodeWord8 0 - <> encodeFiniteListWith encodeElement elements - Entity.DC (DeclFormat.SyncDecl (DeclFormat.SyncLocallyIndexedComponent elements)) -> - CBOR.encodeWord8 1 - <> encodeFiniteListWith encodeElement elements - Entity.P (PatchFormat.SyncDiff {}) -> error "Serializing Diffs are not supported" - Entity.P (PatchFormat.SyncFull (PatchFormat.LocalIds {patchTextLookup, patchHashLookup, patchDefnLookup}) bytes) -> - CBOR.encodeWord8 2 - <> encodeVectorWith CBOR.encode patchTextLookup - <> encodeVectorWith CBOR.encode patchHashLookup - <> encodeVectorWith CBOR.encode patchDefnLookup - <> CBOR.encodeBytes bytes - Entity.N (BranchFormat.SyncDiff {}) -> error "Serializing Diffs are not supported" - Entity.N (BranchFormat.SyncFull (BranchFormat.LocalIds {branchTextLookup, branchDefnLookup, branchPatchLookup, branchChildLookup}) (LocalBranchBytes bytes)) -> - CBOR.encodeWord8 4 - <> encodeVectorWith CBOR.encode branchTextLookup - <> encodeVectorWith CBOR.encode branchDefnLookup - <> encodeVectorWith CBOR.encode branchPatchLookup - <> encodeVectorWith CBOR.encode branchChildLookup - <> CBOR.encodeBytes bytes - Entity.C (SqliteCausal.SyncCausalFormat {valueHash, parents}) -> - CBOR.encodeWord8 6 - <> CBOR.encode valueHash - <> ( CBOR.encodeVector parents - ) - where - encodeElement :: (Serialise t, Serialise d) => (LocalIds.LocalIds' t d, ByteString) -> CBOR.Encoding - encodeElement (LocalIds.LocalIds {textLookup, defnLookup}, bytes) = - CBOR.encodeVector textLookup - <> CBOR.encodeVector defnLookup - <> CBOR.encodeBytes bytes - - decode = error "Decoding Share.Entity not supported" - -encodeVectorWith :: (a -> CBOR.Encoding) -> Vector.Vector a -> CBOR.Encoding -encodeVectorWith f xs = - CBOR.encodeListLen (fromIntegral $ Vector.length xs) - <> (foldr (\a b -> f a <> b) mempty xs) - -encodeFiniteListWith :: (Foldable t) => (a -> CBOR.Encoding) -> t a -> CBOR.Encoding -encodeFiniteListWith f xs = - CBOR.encodeListLen (fromIntegral $ length xs) - <> (foldr (\a b -> f a <> b) mempty xs) diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncStream/API.hs index 10ac8b82..f2175e54 100644 --- a/src/Share/Web/UCM/SyncStream/API.hs +++ b/src/Share/Web/UCM/SyncStream/API.hs @@ -4,7 +4,6 @@ module Share.Web.UCM.SyncStream.API (API, server) where import Conduit -import Control.Concurrent (threadDelay) import Control.Concurrent.STM qualified as STM import Servant import Servant.Conduit (ConduitToSourceIO (..)) @@ -17,7 +16,6 @@ import Share.Postgres.Cursors qualified as Cursor import Share.Postgres.IDs (CausalHash) import Share.Prelude import Share.Utils.Servant (RequiredQueryParam) -import Share.Utils.Servant.CBOR (CBOR) import Share.Web.App import Share.Web.Authorization qualified as AuthZ import Share.Web.UCM.SyncStream.Queries qualified as SSQ @@ -29,7 +27,7 @@ type API = NamedRoutes Routes type DownloadCausalStreamEndpoint = AuthenticatedUserId :> RequiredQueryParam "causalHash" CausalHash - :> StreamGet NewlineFraming CBOR (SourceIO Text) + :> StreamGet NewlineFraming OctetStream (SourceIO ByteString) data Routes mode = Routes { downloadCausalStreamEndpointConduit :: mode :- "download-causal" :> DownloadCausalStreamEndpoint @@ -42,7 +40,7 @@ server = { downloadCausalStreamEndpointConduit = downloadCausalStreamEndpointConduitImpl } -downloadCausalStreamEndpointConduitImpl :: UserId -> CausalHash -> WebApp (SourceIO Text) +downloadCausalStreamEndpointConduitImpl :: UserId -> CausalHash -> WebApp (SourceIO ByteString) downloadCausalStreamEndpointConduitImpl callerUserId causalHash = do let authZReceipt = AuthZ.adminOverride let codebaseLoc = Codebase.codebaseLocationForUserCodebase callerUserId @@ -51,16 +49,16 @@ downloadCausalStreamEndpointConduitImpl callerUserId causalHash = do streamResults <- UnliftIO.toIO do Codebase.runCodebaseTransaction codebase $ do (_bhId, causalId) <- CausalQ.expectCausalIdsOf id causalHash - cursor <- SSQ.allHashDependenciesOfCausalCursor causalId + cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId Cursor.foldBatched cursor 1000 \batch -> do PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch pure $ conduitToSourceIO do handle <- liftIO $ Async.async streamResults stream q handle where - stream :: STM.TBQueue (NonEmpty Text) -> Async.Async () -> ConduitT () Text IO () + stream :: STM.TBQueue (NonEmpty ByteString) -> Async.Async () -> ConduitT () ByteString IO () stream q async = do - let loop :: ConduitT () Text IO () + let loop :: ConduitT () ByteString IO () loop = do next <- liftIO . STM.atomically $ do STM.tryReadTBQueue q >>= \case @@ -72,6 +70,5 @@ downloadCausalStreamEndpointConduitImpl callerUserId causalHash = do Nothing -> pure () Just batch -> do yieldMany batch - liftIO $ threadDelay 1000000 loop loop diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncStream/Queries.hs index 317223ed..8fa99e88 100644 --- a/src/Share/Web/UCM/SyncStream/Queries.hs +++ b/src/Share/Web/UCM/SyncStream/Queries.hs @@ -1,4 +1,8 @@ -module Share.Web.UCM.SyncStream.Queries (allHashDependenciesOfCausalCursor) where +module Share.Web.UCM.SyncStream.Queries + ( allHashDependenciesOfCausalCursor, + allSerializedDependenciesOfCausalCursor, + ) +where import Control.Monad.Reader import Share.Codebase (CodebaseM, codebaseOwner) @@ -174,3 +178,174 @@ allHashDependenciesOfCausalCursor cid = do FROM transitive_components tc JOIN component_hashes ON tc.component_hash_id = component_hashes.id |] + +allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor ByteString) +allSerializedDependenciesOfCausalCursor cid = do + ownerUserId <- asks codebaseOwner + PGCursor.newColCursor + "causal_dependencies" + [sql| + WITH RECURSIVE transitive_causals(causal_id, causal_namespace_hash_id) AS ( + SELECT causal.id, causal.namespace_hash_id + FROM causals causal + WHERE causal.id = #{cid} + -- AND NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = causal.id) + AND EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = #{ownerUserId} AND co.causal_id = causal.id) + UNION + -- This nested CTE is required because RECURSIVE CTEs can't refer + -- to the recursive table more than once. + ( WITH rec AS ( + SELECT causal_id, causal_namespace_hash_id + FROM transitive_causals tc + ) + SELECT ancestor_causal.id, ancestor_causal.namespace_hash_id + FROM causal_ancestors ca + JOIN rec tc ON ca.causal_id = tc.causal_id + JOIN causals ancestor_causal ON ca.ancestor_id = ancestor_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = ancestor_causal.id) + UNION + SELECT child_causal.id, child_causal.namespace_hash_id + FROM rec tc + JOIN namespace_children nc ON tc.causal_namespace_hash_id = nc.parent_namespace_hash_id + JOIN causals child_causal ON nc.child_causal_id = child_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = child_causal.id) + ) + ), all_namespaces(namespace_hash_id) AS ( + SELECT DISTINCT causal_namespace_hash_id AS namespace_hash_id + FROM transitive_causals + -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = causal_namespace_hash_id) + ), all_patches(patch_id) AS ( + SELECT DISTINCT patch.id + FROM all_namespaces an + JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id + JOIN patches patch ON np.patch_id = patch.id + -- WHERE NOT EXISTS (SELECT FROM patch_ownership po WHERE po.user_id = to_codebase_user_id AND po.patch_id = patch.id) + ), + -- term components to start transitively joining dependencies to + base_term_components(component_hash_id) AS ( + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN terms term ON nt.term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + SELECT DISTINCT term.component_hash_id + FROM all_patches ap + JOIN patch_term_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN terms term ON ptm.to_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- term metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_term_metadata meta ON nt.id = meta.named_term + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- type metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_type_metadata meta ON nt.id = meta.named_type + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + ), + -- type components to start transitively joining dependencies to + base_type_components(component_hash_id) AS ( + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN types typ ON nt.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN constructors con ON nt.constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_type_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN types typ ON ptm.to_type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_constructor_mappings pcm ON ap.patch_id = pcm.patch_id + JOIN constructors con ON pcm.to_constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + ), + -- All the dependencies we join in transitively from the known term & type components we depend on. + -- Unfortunately it's not possible to know which hashes are terms vs types :'( + transitive_components(component_hash_id) AS ( + SELECT DISTINCT btc.component_hash_id + FROM base_term_components btc + UNION + SELECT DISTINCT btc.component_hash_id + FROM base_type_components btc + UNION + ( WITH rec AS ( + SELECT component_hash_id + FROM transitive_components tc + ) + -- recursively union in term dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the terms from the component, not just the one that caused the dependency on the + -- component + JOIN terms term ON atc.component_hash_id = term.component_hash_id + JOIN term_local_component_references ref ON term.id = ref.term_id + UNION + -- recursively union in type dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the types from the component, not just the one that caused the dependency on the + -- component + JOIN types typ ON atc.component_hash_id = typ.component_hash_id + JOIN type_local_component_references ref ON typ.id = ref.type_id + ) + ), copied_causals(causal_id) AS ( + SELECT DISTINCT tc.causal_id + FROM transitive_causals tc + ), copied_namespaces(namespace_hash_id) AS ( + SELECT DISTINCT an.namespace_hash_id + FROM all_namespaces an + ), copied_patches(patch_id) AS ( + SELECT DISTINCT ap.patch_id + FROM all_patches ap + ), copied_term_components AS ( + SELECT DISTINCT term.id, copy.bytes_id + FROM transitive_components tc + JOIN terms term ON tc.component_hash_id = term.component_hash_id + JOIN sandboxed_terms copy ON term.id = copy.term_id + WHERE copy.user_id = #{ownerUserId} + ), copied_type_components AS ( + SELECT DISTINCT typ.id, copy.bytes_id + FROM transitive_components tc + JOIN types typ ON tc.component_hash_id = typ.component_hash_id + JOIN sandboxed_types copy ON typ.id = copy.type_id + WHERE copy.user_id = #{ownerUserId} + ) SELECT bytes.bytes + FROM copied_causals cc + JOIN serialized_causals sc ON cc.causal_id = sc.causal_id + JOIN bytes ON sc.bytes_id = bytes.id + UNION ALL + SELECT bytes.bytes + FROM copied_namespaces cn + JOIN serialized_namespaces sn ON cn.namespace_hash_id = sn.namespace_hash_id + JOIN bytes ON sn.bytes_id = bytes.id + UNION ALL + SELECT bytes.bytes + FROM copied_patches cp + JOIN serialized_patches sp ON cp.patch_id = sp.patch_id + JOIN bytes ON sp.bytes_id = bytes.id + UNION ALL + SELECT bytes.bytes + FROM transitive_components tc + JOIN serialized_components sc ON tc.component_hash_id = sc.component_hash_id + JOIN bytes ON sc.bytes_id = bytes.id + |] From f6bb8c231ff104990174cf39b730ac60c63a2fa9 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Fri, 13 Dec 2024 12:15:01 -0800 Subject: [PATCH 08/20] Bump unison submodule --- unison | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unison b/unison index 40eac6a1..6c435ebd 160000 --- a/unison +++ b/unison @@ -1 +1 @@ -Subproject commit 40eac6a121a5edb8e083523b769aa656df0fa54e +Subproject commit 6c435ebd1b49f6075f294bb764a0624e4d511cdb From 2ac3e3b1cb48b90b669e809115fac711f89cece8 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 12 Aug 2024 15:22:13 -0700 Subject: [PATCH 09/20] SyncV2 WIP --- share-api.cabal | 1 + src/Share/Postgres.hs | 12 --- src/Share/Postgres/Definitions/Queries.hs | 32 +++---- src/Share/Postgres/Definitions/Types.hs | 6 +- src/Share/Postgres/Orphans.hs | 4 + src/Share/Postgres/Sync/Queries.hs | 5 +- src/Share/Postgres/Sync/Types.hs | 6 +- src/Share/Utils/Postgres.hs | 31 +++++- src/Share/Web/Impl.hs | 2 +- src/Share/Web/UCM/Sync/Impl.hs | 10 +- src/Share/Web/UCM/Sync/Types.hs | 30 +++--- src/Share/Web/UCM/SyncStream/API.hs | 72 +------------- src/Share/Web/UCM/SyncStream/Impl.hs | 112 ++++++++++++++++++++++ src/Share/Web/UCM/SyncStream/Queries.hs | 66 +++++-------- 14 files changed, 216 insertions(+), 173 deletions(-) create mode 100644 src/Share/Web/UCM/SyncStream/Impl.hs diff --git a/share-api.cabal b/share-api.cabal index 31f97ede..45d8289a 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -154,6 +154,7 @@ library Share.Web.UCM.Sync.Impl Share.Web.UCM.Sync.Types Share.Web.UCM.SyncStream.API + Share.Web.UCM.SyncStream.Impl Share.Web.UCM.SyncStream.Queries Unison.PrettyPrintEnvDecl.Postgres Unison.Server.NameSearch.Postgres diff --git a/src/Share/Postgres.hs b/src/Share/Postgres.hs index e301a660..cfff4d62 100644 --- a/src/Share/Postgres.hs +++ b/src/Share/Postgres.hs @@ -19,7 +19,6 @@ module Share.Postgres Interp.DecodeValue (..), Interp.DecodeRow (..), Interp.DecodeField, - RawBytes (..), Only (..), QueryA (..), QueryM (..), @@ -469,17 +468,6 @@ cachedForOf trav s f = do cachedFor :: (Traversable t, Monad m, Ord a) => t a -> (a -> m b) -> m (t b) cachedFor = cachedForOf traversed --- | Preferably you should use custom newtypes for your bytes, but you can use this with --- deriving via to get the encoding/decoding instances. -newtype RawBytes = RawBytes {unRawBytes :: ByteString} - deriving stock (Show, Eq, Ord) - -instance Interp.EncodeValue RawBytes where - encodeValue = contramap unRawBytes Encoders.bytea - -instance Interp.DecodeValue RawBytes where - decodeValue = RawBytes <$> Decoders.bytea - -- | Useful when running queries using a join over `toTable` which may be empty. -- Without explicitly handling the empty case we'll waste time sending a query to PG -- that we know can't return any results. diff --git a/src/Share/Postgres/Definitions/Queries.hs b/src/Share/Postgres/Definitions/Queries.hs index 3bce6ad3..d6b2574f 100644 --- a/src/Share/Postgres/Definitions/Queries.hs +++ b/src/Share/Postgres/Definitions/Queries.hs @@ -39,6 +39,7 @@ import Data.Set qualified as Set import Data.Text qualified as Text import Data.Vector (Vector) import Data.Vector qualified as Vector +import Servant (err500) import Share.Codebase.Types (CodebaseEnv (..), CodebaseM) import Share.IDs import Share.Postgres @@ -49,9 +50,8 @@ import Share.Postgres.Hashes.Queries qualified as HashQ import Share.Postgres.IDs import Share.Prelude import Share.Utils.Logging qualified as Logging -import Share.Utils.Postgres (OrdBy) +import Share.Utils.Postgres (OrdBy, RawBytes (..)) import Share.Web.Errors (ErrorID (..), InternalServerError (InternalServerError), ToServerError (..)) -import Servant (err500) import U.Codebase.Decl qualified as Decl import U.Codebase.Decl qualified as V2 hiding (Type) import U.Codebase.Decl qualified as V2Decl @@ -270,7 +270,7 @@ expectTypeComponent componentRef = do -- | This isn't in CodebaseM so that we can run it in a normal transaction to build the Code -- Lookup. -loadTermById :: QueryM m => UserId -> TermId -> m (Maybe (V2.Term Symbol, V2.Type Symbol)) +loadTermById :: (QueryM m) => UserId -> TermId -> m (Maybe (V2.Term Symbol, V2.Type Symbol)) loadTermById codebaseUser termId = runMaybeT $ do (TermComponentElement trm typ) <- MaybeT $ @@ -288,7 +288,7 @@ loadTermById codebaseUser termId = runMaybeT $ do localIds = LocalIds.LocalIds {textLookup = Vector.fromList textLookup, defnLookup = Vector.fromList defnLookup} pure $ s2cTermWithType (localIds, trm, typ) -termLocalTextReferences :: QueryM m => TermId -> m [Text] +termLocalTextReferences :: (QueryM m) => TermId -> m [Text] termLocalTextReferences termId = queryListCol [sql| @@ -299,7 +299,7 @@ termLocalTextReferences termId = ORDER BY local_index ASC |] -termLocalComponentReferences :: QueryM m => TermId -> m [ComponentHash] +termLocalComponentReferences :: (QueryM m) => TermId -> m [ComponentHash] termLocalComponentReferences termId = queryListCol [sql| @@ -342,10 +342,10 @@ resolveConstructorTypeLocalIds (LocalIds.LocalIds {textLookup, defnLookup}) = substText i = textLookup ^?! ix (fromIntegral i) substHash i = unComponentHash $ (defnLookup ^?! ix (fromIntegral i)) -loadDeclKind :: PG.QueryM m => Reference.Id -> m (Maybe CT.ConstructorType) +loadDeclKind :: (PG.QueryM m) => Reference.Id -> m (Maybe CT.ConstructorType) loadDeclKind = loadDeclKindsOf id -loadDeclKindsOf :: PG.QueryM m => Traversal s t Reference.Id (Maybe CT.ConstructorType) -> s -> m t +loadDeclKindsOf :: (PG.QueryM m) => Traversal s t Reference.Id (Maybe CT.ConstructorType) -> s -> m t loadDeclKindsOf trav s = s & unsafePartsOf trav %%~ \refIds -> do @@ -517,7 +517,7 @@ constructorReferentsByPrefix prefix mayComponentIndex mayConstructorIndex = do -- -- This is intentionally not in CodebaseM because this method is used to build the -- CodebaseEnv. -loadCachedEvalResult :: QueryM m => UserId -> Reference.Id -> m (Maybe (V2.Term Symbol)) +loadCachedEvalResult :: (QueryM m) => UserId -> Reference.Id -> m (Maybe (V2.Term Symbol)) loadCachedEvalResult codebaseOwnerUserId (Reference.Id hash compIndex) = runMaybeT do let compIndex' = pgComponentIndex compIndex (evalResultId :: EvalResultId, EvalResultTerm term) <- @@ -557,12 +557,12 @@ loadCachedEvalResult codebaseOwnerUserId (Reference.Id hash compIndex) = runMayb pure $ resolveTermLocalIds localIds term -- | Get text ids for all provided texts, inserting any that don't already exist. -ensureTextIds :: QueryM m => Traversable t => t Text -> m (t TextId) +ensureTextIds :: (QueryM m) => (Traversable t) => t Text -> m (t TextId) ensureTextIds = ensureTextIdsOf traversed -- | Efficiently saves all Text's focused by the provided traversal into the database and -- replaces them with their corresponding Ids. -ensureTextIdsOf :: QueryM m => Traversal s t Text TextId -> s -> m t +ensureTextIdsOf :: (QueryM m) => Traversal s t Text TextId -> s -> m t ensureTextIdsOf trav s = do s & unsafePartsOf trav %%~ \texts -> do @@ -589,12 +589,12 @@ ensureTextIdsOf trav s = do else pure results -- | Get text ids for all provided texts, inserting any that don't already exist. -ensureBytesIds :: QueryM m => Traversable t => t BS.ByteString -> m (t BytesId) +ensureBytesIds :: (QueryM m) => (Traversable t) => t BS.ByteString -> m (t BytesId) ensureBytesIds = ensureBytesIdsOf traversed -- | Efficiently saves all Text's focused by the provided traversal into the database and -- replaces them with their corresponding Ids. -ensureBytesIdsOf :: QueryM m => Traversal s t BS.ByteString BytesId -> s -> m t +ensureBytesIdsOf :: (QueryM m) => Traversal s t BS.ByteString BytesId -> s -> m t ensureBytesIdsOf trav s = do s & unsafePartsOf trav %%~ \bytestrings -> do @@ -621,7 +621,7 @@ ensureBytesIdsOf trav s = do else pure results -- | Efficiently loads Texts for all TextIds focused by the provided traversal. -expectTextsOf :: QueryM m => Traversal s t TextId Text -> s -> m t +expectTextsOf :: (QueryM m) => Traversal s t TextId Text -> s -> m t expectTextsOf trav = unsafePartsOf trav %%~ \textIds -> do let numberedTextIds = zip [0 :: Int32 ..] textIds @@ -649,7 +649,7 @@ localizeTerm tm = do -- | Replace all references in a term with local references. _localizeTermAndType :: - HasCallStack => + (HasCallStack) => V2.Term Symbol -> V2.Type Symbol -> Transaction e (PgLocalIds, TermFormat.Term, TermFormat.Type) @@ -997,7 +997,7 @@ resolveLocalIdsOf trav s = do >>= HashQ.expectComponentHashesOf (traversed . LocalIds.h_) -- | Fetch term tags for all the provided Referents. -termTagsByReferentsOf :: HasCallStack => Traversal s t Referent.Referent Tags.TermTag -> s -> Transaction e t +termTagsByReferentsOf :: (HasCallStack) => Traversal s t Referent.Referent Tags.TermTag -> s -> Transaction e t termTagsByReferentsOf trav s = do s & unsafePartsOf trav %%~ \refs -> do @@ -1080,7 +1080,7 @@ termTagsByReferentsOf trav s = do (refTagRow Tags.Test Decls.testResultListRef) ] -typeTagsByReferencesOf :: HasCallStack => Traversal s t TypeReference Tags.TypeTag -> s -> Transaction e t +typeTagsByReferencesOf :: (HasCallStack) => Traversal s t TypeReference Tags.TypeTag -> s -> Transaction e t typeTagsByReferencesOf trav s = do s & unsafePartsOf trav %%~ \refs -> do diff --git a/src/Share/Postgres/Definitions/Types.hs b/src/Share/Postgres/Definitions/Types.hs index 5cb6895c..78522cc0 100644 --- a/src/Share/Postgres/Definitions/Types.hs +++ b/src/Share/Postgres/Definitions/Types.hs @@ -24,13 +24,13 @@ module Share.Postgres.Definitions.Types ) where -import Share.Postgres qualified as PG -import Share.Postgres.Serialization qualified as S -import Share.Prelude import Hasql.Decoders qualified as Decoders import Hasql.Decoders qualified as Hasql import Hasql.Encoders qualified as Encoders import Hasql.Interpolate (DecodeValue (..), EncodeValue (..)) +import Share.Postgres.Serialization qualified as S +import Share.Prelude +import Share.Utils.Postgres qualified as PG import U.Codebase.Decl qualified as DD import U.Codebase.Decl qualified as Decl import U.Codebase.Reference qualified as Reference diff --git a/src/Share/Postgres/Orphans.hs b/src/Share/Postgres/Orphans.hs index 461d4159..381214ff 100644 --- a/src/Share/Postgres/Orphans.hs +++ b/src/Share/Postgres/Orphans.hs @@ -18,6 +18,7 @@ import Servant (err500) import Servant.API import Share.Prelude import Share.Utils.Logging qualified as Logging +import Share.Utils.Postgres (RawLazyBytes (..)) import Share.Web.Errors (ErrorID (..), ToServerError (..)) import U.Codebase.HashTags (BranchHash (..), CausalHash (..), ComponentHash (..), PatchHash (..)) import U.Codebase.Reference (Id' (Id), Reference' (..)) @@ -34,6 +35,7 @@ import Unison.Hash32 (Hash32) import Unison.Hash32 qualified as Hash32 import Unison.Name (Name) import Unison.NameSegment.Internal (NameSegment (..)) +import Unison.SyncV2.Types (CBORBytes (..)) import Unison.Syntax.Name qualified as Name -- Orphans for 'Hash' @@ -215,6 +217,8 @@ instance Hasql.DecodeValue SqliteTermEdit.Typing where _ -> Nothing ) +deriving via RawLazyBytes instance Hasql.DecodeValue (CBORBytes t) + instance ToServerError Hasql.SessionError where toServerError _ = (ErrorID "query-error", err500) diff --git a/src/Share/Postgres/Sync/Queries.hs b/src/Share/Postgres/Sync/Queries.hs index a8024c16..60d071ec 100644 --- a/src/Share/Postgres/Sync/Queries.hs +++ b/src/Share/Postgres/Sync/Queries.hs @@ -15,9 +15,7 @@ module Share.Postgres.Sync.Queries ) where -import Codec.CBOR.Encoding qualified as CBOR import Codec.CBOR.Write qualified as CBOR -import Codec.Serialise (Serialise (..)) import Codec.Serialise.Class qualified as CBOR import Control.Lens hiding (from) import Data.ByteString.Lazy.Char8 qualified as BL @@ -64,11 +62,11 @@ import U.Codebase.Sqlite.Queries qualified as Share import U.Codebase.Sqlite.TempEntity (TempEntity) import U.Codebase.Sqlite.TempEntityType (TempEntityType) import U.Codebase.Sqlite.Term.Format qualified as TermFormat -import U.Util.Base32Hex (Base32Hex (..)) import Unison.Hash32 import Unison.Hash32 qualified as Hash32 import Unison.Sync.Common qualified as Share import Unison.Sync.Types qualified as Share +import Unison.SyncV2.Types (EntityKind (..)) data SyncQError = InvalidNamespaceBytes @@ -635,4 +633,3 @@ saveSerializedNamespace hash serialised = do VALUES ((SELECT bh.id FROM branch_hashes bh where bh.base32 = #{hash}), #{bytesId}) ON CONFLICT DO NOTHING |] - diff --git a/src/Share/Postgres/Sync/Types.hs b/src/Share/Postgres/Sync/Types.hs index fc23e040..077fa513 100644 --- a/src/Share/Postgres/Sync/Types.hs +++ b/src/Share/Postgres/Sync/Types.hs @@ -1,9 +1,9 @@ module Share.Postgres.Sync.Types (TypedTempEntity (..)) where +import Hasql.Interpolate qualified as Hasql import Share.Postgres (decodeField) -import Share.Postgres qualified as PG import Share.Postgres.Serialization qualified as S -import Hasql.Interpolate qualified as Hasql +import Share.Utils.Postgres (RawBytes (..)) import U.Codebase.Sqlite.TempEntity (TempEntity) -- | Helper for deserializing typed temp entities. @@ -14,7 +14,7 @@ newtype TypedTempEntity = TypedTempEntity {unTypedTempEntity :: TempEntity} instance Hasql.DecodeRow TypedTempEntity where decodeRow = do entityType <- decodeField - PG.RawBytes entityBytes <- decodeField + RawBytes entityBytes <- decodeField case S.decodeTypedTempEntity entityType entityBytes of Left err -> fail (show err) Right tempEntity -> pure (TypedTempEntity tempEntity) diff --git a/src/Share/Utils/Postgres.hs b/src/Share/Utils/Postgres.hs index 227f7d3d..08305521 100644 --- a/src/Share/Utils/Postgres.hs +++ b/src/Share/Utils/Postgres.hs @@ -1,17 +1,22 @@ module Share.Utils.Postgres ( OrdBy (..), ordered, + RawBytes (..), + RawLazyBytes (..), ) where -import Share.Postgres qualified as PG +import Data.ByteString.Lazy qualified as BL +import Hasql.Decoders qualified as Decoders +import Hasql.Encoders qualified as Encoders +import Hasql.Interpolate qualified as Hasql import Share.Prelude -- | A type for propagating an application-code ordering through a database query. -- We can't trust the order returned by PG, so we make sure to order things explicitly. newtype OrdBy = OrdBy {unOrdBy :: Int32} deriving stock (Eq, Ord, Show) - deriving (PG.DecodeValue, PG.EncodeValue) via Int32 + deriving (Hasql.DecodeValue, Hasql.EncodeValue) via Int32 instance From Int OrdBy where from = OrdBy . fromIntegral @@ -21,3 +26,25 @@ instance From Int32 OrdBy where ordered :: [a] -> [(OrdBy, a)] ordered = zip (OrdBy <$> [0 ..]) + +-- | Preferably you should use custom newtypes for your bytes, but you can use this with +-- deriving via to get the encoding/decoding instances. +newtype RawBytes = RawBytes {unRawBytes :: ByteString} + deriving stock (Show, Eq, Ord) + +instance Hasql.EncodeValue RawBytes where + encodeValue = contramap unRawBytes Encoders.bytea + +instance Hasql.DecodeValue RawBytes where + decodeValue = RawBytes <$> Decoders.bytea + +-- | Preferably you should use custom newtypes for your bytes, but you can use this with +-- deriving via to get the encoding/decoding instances. +newtype RawLazyBytes = RawLazyBytes {unLazyRawBytes :: BL.ByteString} + deriving stock (Show, Eq, Ord) + +instance Hasql.EncodeValue RawLazyBytes where + encodeValue = contramap (BL.toStrict . unLazyRawBytes) Encoders.bytea + +instance Hasql.DecodeValue RawLazyBytes where + decodeValue = RawLazyBytes . BL.fromStrict <$> Decoders.bytea diff --git a/src/Share/Web/Impl.hs b/src/Share/Web/Impl.hs index 63602647..31a870fc 100644 --- a/src/Share/Web/Impl.hs +++ b/src/Share/Web/Impl.hs @@ -24,7 +24,7 @@ import Share.Web.Support.Impl qualified as Support import Share.Web.Types import Share.Web.UCM.Projects.Impl qualified as UCMProjects import Share.Web.UCM.Sync.Impl qualified as Sync -import Share.Web.UCM.SyncStream.API qualified as SyncStream +import Share.Web.UCM.SyncStream.Impl qualified as SyncStream discoveryEndpoint :: WebApp DiscoveryDocument discoveryEndpoint = do diff --git a/src/Share/Web/UCM/Sync/Impl.hs b/src/Share/Web/UCM/Sync/Impl.hs index 2bd71310..c03cd0ff 100644 --- a/src/Share/Web/UCM/Sync/Impl.hs +++ b/src/Share/Web/UCM/Sync/Impl.hs @@ -9,6 +9,7 @@ module Share.Web.UCM.Sync.Impl -- This export can be removed once we've migrated away from sqlite. insertEntitiesToCodebase, ensureCausalIsFlushed, + repoInfoKind, ) where @@ -48,7 +49,7 @@ import Share.Web.Authentication qualified as AuthN import Share.Web.Authorization qualified as AuthZ import Share.Web.Errors import Share.Web.UCM.Sync.HashJWT qualified as HashJWT -import Share.Web.UCM.Sync.Types (EntityBunch (..), EntityKind (..), entityKind) +import Share.Web.UCM.Sync.Types (EntityBunch (..), RepoInfoKind (..), entityKind) import U.Codebase.Causal qualified as Causal import U.Codebase.Sqlite.Orphans () import Unison.Codebase.Path qualified as Path @@ -63,14 +64,9 @@ import Unison.Sync.EntityValidation qualified as Sync import Unison.Sync.Types (DownloadEntitiesError (..), DownloadEntitiesRequest (..), DownloadEntitiesResponse (..), GetCausalHashByPathRequest (..), GetCausalHashByPathResponse (..), NeedDependencies (..), RepoInfo (..), UploadEntitiesError (..), UploadEntitiesRequest (..), UploadEntitiesResponse (..)) import Unison.Sync.Types qualified as Share import Unison.Sync.Types qualified as Sync +import Unison.SyncV2.Types (EntityKind (..)) import UnliftIO qualified -data RepoInfoKind - = RepoInfoUser UserHandle - | RepoInfoProjectBranch ProjectBranchShortHand - | RepoInfoProjectRelease ProjectReleaseShortHand - deriving stock (Show) - -- | Parse a `RepoInfo` into the correct codebase view, e.g. -- -- >>> repoInfoKind (RepoInfo "@unison") diff --git a/src/Share/Web/UCM/Sync/Types.hs b/src/Share/Web/UCM/Sync/Types.hs index 69b919e2..26d5e380 100644 --- a/src/Share/Web/UCM/Sync/Types.hs +++ b/src/Share/Web/UCM/Sync/Types.hs @@ -1,12 +1,14 @@ module Share.Web.UCM.Sync.Types ( EntityBunch (..), - EntityKind (..), entityKind, + RepoInfoKind (..), ) where +import Share.IDs (ProjectBranchShortHand, ProjectReleaseShortHand, UserHandle) import Share.Prelude import Unison.Sync.Types qualified as Share +import Unison.SyncV2.Types qualified as SyncV2 -- | Helper type for handling entities of different types. data EntityBunch a = EntityBunch @@ -25,20 +27,18 @@ instance Semigroup (EntityBunch a) where instance Monoid (EntityBunch a) where mempty = EntityBunch [] [] [] [] [] -data EntityKind - = CausalEntity - | NamespaceEntity - | TermEntity - | TypeEntity - | PatchEntity - deriving (Show, Eq, Ord) - -entityKind :: HasCallStack => Share.Entity text hash hash' -> EntityKind +entityKind :: (HasCallStack) => Share.Entity text hash hash' -> SyncV2.EntityKind entityKind = \case - Share.C _ -> CausalEntity - Share.N _ -> NamespaceEntity + Share.C _ -> SyncV2.CausalEntity + Share.N _ -> SyncV2.NamespaceEntity Share.ND _ -> error "entityKind: Unsupported Entity Kind: NamespaceDiff" - Share.TC _ -> TermEntity - Share.DC _ -> TypeEntity - Share.P _ -> PatchEntity + Share.TC _ -> SyncV2.TermEntity + Share.DC _ -> SyncV2.TypeEntity + Share.P _ -> SyncV2.PatchEntity Share.PD _ -> error "entityKind: Unsupported Entity Kind: PatchDiff" + +data RepoInfoKind + = RepoInfoUser UserHandle + | RepoInfoProjectBranch ProjectBranchShortHand + | RepoInfoProjectRelease ProjectReleaseShortHand + deriving stock (Show) diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncStream/API.hs index f2175e54..7ff82c0e 100644 --- a/src/Share/Web/UCM/SyncStream/API.hs +++ b/src/Share/Web/UCM/SyncStream/API.hs @@ -1,74 +1,10 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE TypeOperators #-} -module Share.Web.UCM.SyncStream.API (API, server) where +module Share.Web.UCM.SyncStream.API (API) where -import Conduit -import Control.Concurrent.STM qualified as STM import Servant -import Servant.Conduit (ConduitToSourceIO (..)) -import Share.Codebase qualified as Codebase -import Share.IDs (UserId) -import Share.OAuth.Session (AuthenticatedUserId) -import Share.Postgres qualified as PG -import Share.Postgres.Causal.Queries qualified as CausalQ -import Share.Postgres.Cursors qualified as Cursor -import Share.Postgres.IDs (CausalHash) -import Share.Prelude -import Share.Utils.Servant (RequiredQueryParam) -import Share.Web.App -import Share.Web.Authorization qualified as AuthZ -import Share.Web.UCM.SyncStream.Queries qualified as SSQ -import UnliftIO qualified -import UnliftIO.Async qualified as Async +import Share.OAuth.Session (MaybeAuthenticatedUserId) +import Unison.SyncV2.API qualified as SyncV2 -type API = NamedRoutes Routes - -type DownloadCausalStreamEndpoint = - AuthenticatedUserId - :> RequiredQueryParam "causalHash" CausalHash - :> StreamGet NewlineFraming OctetStream (SourceIO ByteString) - -data Routes mode = Routes - { downloadCausalStreamEndpointConduit :: mode :- "download-causal" :> DownloadCausalStreamEndpoint - } - deriving stock (Generic) - -server :: Routes WebAppServer -server = - Routes - { downloadCausalStreamEndpointConduit = downloadCausalStreamEndpointConduitImpl - } - -downloadCausalStreamEndpointConduitImpl :: UserId -> CausalHash -> WebApp (SourceIO ByteString) -downloadCausalStreamEndpointConduitImpl callerUserId causalHash = do - let authZReceipt = AuthZ.adminOverride - let codebaseLoc = Codebase.codebaseLocationForUserCodebase callerUserId - let codebase = Codebase.codebaseEnv authZReceipt codebaseLoc - q <- liftIO $ STM.newTBQueueIO 10 - streamResults <- UnliftIO.toIO do - Codebase.runCodebaseTransaction codebase $ do - (_bhId, causalId) <- CausalQ.expectCausalIdsOf id causalHash - cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId - Cursor.foldBatched cursor 1000 \batch -> do - PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch - pure $ conduitToSourceIO do - handle <- liftIO $ Async.async streamResults - stream q handle - where - stream :: STM.TBQueue (NonEmpty ByteString) -> Async.Async () -> ConduitT () ByteString IO () - stream q async = do - let loop :: ConduitT () ByteString IO () - loop = do - next <- liftIO . STM.atomically $ do - STM.tryReadTBQueue q >>= \case - Nothing -> do - Async.waitSTM async $> Nothing - Just batch -> do - pure $ Just batch - case next of - Nothing -> pure () - Just batch -> do - yieldMany batch - loop - loop +type API = MaybeAuthenticatedUserId :> NamedRoutes SyncV2.Routes diff --git a/src/Share/Web/UCM/SyncStream/Impl.hs b/src/Share/Web/UCM/SyncStream/Impl.hs new file mode 100644 index 00000000..9ea554af --- /dev/null +++ b/src/Share/Web/UCM/SyncStream/Impl.hs @@ -0,0 +1,112 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeOperators #-} + +module Share.Web.UCM.SyncStream.Impl (server) where + +import Conduit +import Control.Concurrent.STM qualified as STM +import Control.Monad.Except (ExceptT (ExceptT)) +import Control.Monad.Trans.Except (runExceptT) +import Data.ByteString.Lazy qualified as BL +import Servant +import Servant.Conduit (ConduitToSourceIO (..)) +import Servant.Types.SourceT qualified as SourceT +import Share.Codebase qualified as Codebase +import Share.IDs (ProjectBranchShortHand (..), ProjectReleaseShortHand (..), ProjectShortHand (..), UserHandle, UserId) +import Share.IDs qualified as IDs +import Share.Postgres qualified as PG +import Share.Postgres.Causal.Queries qualified as CausalQ +import Share.Postgres.Cursors qualified as Cursor +import Share.Postgres.Queries qualified as PGQ +import Share.Prelude +import Share.Project (Project (..)) +import Share.User (User (..)) +import Share.Utils.Unison (hash32ToCausalHash) +import Share.Web.App +import Share.Web.Authorization qualified as AuthZ +import Share.Web.Errors +import Share.Web.UCM.Sync.HashJWT qualified as HashJWT +import Share.Web.UCM.SyncStream.Queries qualified as SSQ +import U.Codebase.Sqlite.Orphans () +import Unison.Hash32 (Hash32) +import Unison.Share.API.Hash (HashJWTClaims (..)) +import Unison.SyncV2.API qualified as SyncV2 +import Unison.SyncV2.Types (DownloadEntitiesChunk (..)) +import Unison.SyncV2.Types qualified as SyncV2 +import UnliftIO qualified +import UnliftIO.Async qualified as Async + +server :: Maybe UserId -> SyncV2.Routes WebAppServer +server mayUserId = + SyncV2.Routes + { downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId, + uploadEntitiesStream = undefined + } + +parseBranchRef :: SyncV2.BranchRef -> Either Text (Either ProjectReleaseShortHand ProjectBranchShortHand) +parseBranchRef (SyncV2.BranchRef branchRef) = + case parseRelease <|> parseBranch of + Just a -> Right a + Nothing -> Left $ "Invalid repo info: " <> branchRef + where + parseBranch :: Maybe (Either ProjectReleaseShortHand ProjectBranchShortHand) + parseBranch = fmap Right . eitherToMaybe $ IDs.fromText @ProjectBranchShortHand branchRef + parseRelease :: Maybe (Either ProjectReleaseShortHand ProjectBranchShortHand) + parseRelease = fmap Left . eitherToMaybe $ IDs.fromText @ProjectReleaseShortHand branchRef + +downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2.DownloadEntitiesRequest -> WebApp (SourceIO SyncV2.DownloadEntitiesChunk) +downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef}) = do + either emitErr id <$> runExceptT do + addRequestTag "branch-ref" (SyncV2.unBranchRef branchRef) + HashJWTClaims {hash = causalHash} <- lift (HashJWT.verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure) + codebase <- + case parseBranchRef branchRef of + Left err -> throwError (SyncV2.DownloadEntitiesInvalidBranchRef err branchRef) + Right (Left (ProjectReleaseShortHand {userHandle, projectSlug})) -> do + let projectShortHand = ProjectShortHand {userHandle, projectSlug} + (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do + project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (SyncV2.DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) + pure (project, Nothing) + authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) + let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId + pure $ Codebase.codebaseEnv authZToken codebaseLoc + Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do + let projectShortHand = ProjectShortHand {userHandle, projectSlug} + (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do + project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (SyncV2.DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) + mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (PGQ.userByHandle ch) `whenNothingM` throwError (SyncV2.DownloadEntitiesUserNotFound $ IDs.toText @UserHandle ch) + pure (project, mayContributorUserId) + authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) + let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId + pure $ Codebase.codebaseEnv authZToken codebaseLoc + q <- liftIO $ STM.newTBQueueIO 10 + streamResults <- lift $ UnliftIO.toIO do + Codebase.runCodebaseTransaction codebase $ do + (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash) + cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId + Cursor.foldBatched cursor 1000 \batch -> do + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch + pure $ conduitToSourceIO do + handle <- liftIO $ Async.async streamResults + stream q handle + where + stream :: STM.TBQueue (NonEmpty (Hash32, ByteString)) -> Async.Async () -> ConduitT () DownloadEntitiesChunk IO () + stream q async = do + let loop :: ConduitT () DownloadEntitiesChunk IO () + loop = do + next <- liftIO . STM.atomically $ do + STM.tryReadTBQueue q >>= \case + Nothing -> do + Async.waitSTM async $> Nothing + Just batch -> do + pure $ Just batch + case next of + Nothing -> pure () + Just batch -> do + let chunks = batch <&> \(hash, bytes) -> EntityChunk {hash, entityCBOR = SyncV2.CBORBytes $ BL.fromStrict bytes} + yieldMany chunks + loop + loop + + emitErr :: SyncV2.DownloadEntitiesError -> SourceIO SyncV2.DownloadEntitiesChunk + emitErr err = SourceT.source [ErrorChunk err] diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncStream/Queries.hs index 8fa99e88..dee0dfb9 100644 --- a/src/Share/Web/UCM/SyncStream/Queries.hs +++ b/src/Share/Web/UCM/SyncStream/Queries.hs @@ -11,6 +11,7 @@ import Share.Postgres.Cursors (PGCursor) import Share.Postgres.Cursors qualified as PGCursor import Share.Postgres.IDs import Share.Prelude +import Unison.Hash32 (Hash32) allHashDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor Text) allHashDependenciesOfCausalCursor cid = do @@ -179,14 +180,14 @@ allHashDependenciesOfCausalCursor cid = do JOIN component_hashes ON tc.component_hash_id = component_hashes.id |] -allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor ByteString) +allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor (Hash32, ByteString)) allSerializedDependenciesOfCausalCursor cid = do ownerUserId <- asks codebaseOwner - PGCursor.newColCursor + PGCursor.newRowCursor "causal_dependencies" [sql| - WITH RECURSIVE transitive_causals(causal_id, causal_namespace_hash_id) AS ( - SELECT causal.id, causal.namespace_hash_id + WITH RECURSIVE transitive_causals(causal_id, causal_hash, causal_namespace_hash_id) AS ( + SELECT causal.id, causal.hash, causal.namespace_hash_id FROM causals causal WHERE causal.id = #{cid} -- AND NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = causal.id) @@ -198,23 +199,24 @@ allSerializedDependenciesOfCausalCursor cid = do SELECT causal_id, causal_namespace_hash_id FROM transitive_causals tc ) - SELECT ancestor_causal.id, ancestor_causal.namespace_hash_id + SELECT ancestor_causal.id, ancestor_causal.hash, ancestor_causal.namespace_hash_id FROM causal_ancestors ca JOIN rec tc ON ca.causal_id = tc.causal_id JOIN causals ancestor_causal ON ca.ancestor_id = ancestor_causal.id -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = ancestor_causal.id) UNION - SELECT child_causal.id, child_causal.namespace_hash_id + SELECT child_causal.id, child_causal.hash, child_causal.namespace_hash_id FROM rec tc JOIN namespace_children nc ON tc.causal_namespace_hash_id = nc.parent_namespace_hash_id JOIN causals child_causal ON nc.child_causal_id = child_causal.id -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = child_causal.id) ) - ), all_namespaces(namespace_hash_id) AS ( - SELECT DISTINCT causal_namespace_hash_id AS namespace_hash_id - FROM transitive_causals - -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = causal_namespace_hash_id) - ), all_patches(patch_id) AS ( + ), all_namespaces(namespace_hash_id, namespace_hash) AS ( + SELECT DISTINCT tc.causal_namespace_hash_id AS namespace_hash_id, bh.base32 as namespace_hash + FROM transitive_causals tc + JOIN branch_hashes bh ON tc.causal_namespace_hash_id = bh.id + -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = tc.causal_namespace_hash_id) + ), all_patches(patch_id, patch_hash) AS ( SELECT DISTINCT patch.id FROM all_namespaces an JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id @@ -308,44 +310,24 @@ allSerializedDependenciesOfCausalCursor cid = do JOIN types typ ON atc.component_hash_id = typ.component_hash_id JOIN type_local_component_references ref ON typ.id = ref.type_id ) - ), copied_causals(causal_id) AS ( - SELECT DISTINCT tc.causal_id - FROM transitive_causals tc - ), copied_namespaces(namespace_hash_id) AS ( - SELECT DISTINCT an.namespace_hash_id - FROM all_namespaces an - ), copied_patches(patch_id) AS ( - SELECT DISTINCT ap.patch_id - FROM all_patches ap - ), copied_term_components AS ( - SELECT DISTINCT term.id, copy.bytes_id - FROM transitive_components tc - JOIN terms term ON tc.component_hash_id = term.component_hash_id - JOIN sandboxed_terms copy ON term.id = copy.term_id - WHERE copy.user_id = #{ownerUserId} - ), copied_type_components AS ( - SELECT DISTINCT typ.id, copy.bytes_id - FROM transitive_components tc - JOIN types typ ON tc.component_hash_id = typ.component_hash_id - JOIN sandboxed_types copy ON typ.id = copy.type_id - WHERE copy.user_id = #{ownerUserId} - ) SELECT bytes.bytes - FROM copied_causals cc - JOIN serialized_causals sc ON cc.causal_id = sc.causal_id + ) SELECT bytes.bytes, tc.causal_hash + FROM transitive_causals tc + JOIN serialized_causals sc ON tc.causal_id = sc.causal_id JOIN bytes ON sc.bytes_id = bytes.id UNION ALL - SELECT bytes.bytes - FROM copied_namespaces cn - JOIN serialized_namespaces sn ON cn.namespace_hash_id = sn.namespace_hash_id + SELECT bytes.bytes, an.namespace_hash + FROM all_namespaces an + JOIN serialized_namespaces sn ON an.namespace_hash_id = sn.namespace_hash_id JOIN bytes ON sn.bytes_id = bytes.id UNION ALL - SELECT bytes.bytes - FROM copied_patches cp - JOIN serialized_patches sp ON cp.patch_id = sp.patch_id + SELECT bytes.bytes, ap.patch_hash + FROM all_patches ap + JOIN serialized_patches sp ON ap.patch_id = sp.patch_id JOIN bytes ON sp.bytes_id = bytes.id UNION ALL - SELECT bytes.bytes + SELECT bytes.bytes, ch.base32 FROM transitive_components tc JOIN serialized_components sc ON tc.component_hash_id = sc.component_hash_id JOIN bytes ON sc.bytes_id = bytes.id + JOIN component_hashes ch ON tc.component_hash_id = ch.id |] From 272cf1c005c2b08aeee89c8ccbca48b6c2dc04d0 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 12 Aug 2024 15:49:57 -0700 Subject: [PATCH 10/20] WIP --- src/Share/Postgres/Sync/Queries.hs | 31 ++++++++++++++-------------- src/Share/Web/UCM/SyncStream/Impl.hs | 3 +-- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/Share/Postgres/Sync/Queries.hs b/src/Share/Postgres/Sync/Queries.hs index 60d071ec..d093ff5f 100644 --- a/src/Share/Postgres/Sync/Queries.hs +++ b/src/Share/Postgres/Sync/Queries.hs @@ -15,8 +15,6 @@ module Share.Postgres.Sync.Queries ) where -import Codec.CBOR.Write qualified as CBOR -import Codec.Serialise.Class qualified as CBOR import Control.Lens hiding (from) import Data.ByteString.Lazy.Char8 qualified as BL import Data.Foldable qualified as Foldable @@ -66,7 +64,8 @@ import Unison.Hash32 import Unison.Hash32 qualified as Hash32 import Unison.Sync.Common qualified as Share import Unison.Sync.Types qualified as Share -import Unison.SyncV2.Types (EntityKind (..)) +import Unison.SyncV2.Types (CBORBytes (..), EntityKind (..)) +import Unison.SyncV2.Types qualified as SyncV2 data SyncQError = InvalidNamespaceBytes @@ -585,7 +584,7 @@ getEntitiesReadyToFlush = do saveSerializedEntities :: (Foldable f) => f (Hash32, TempEntity) -> CodebaseM e () saveSerializedEntities entities = do for_ entities \(hash, entity) -> do - let serialised = CBOR.toStrictByteString (CBOR.encode hash <> CBOR.encode entity) + let serialised = SyncV2.serialiseCBORBytes entity case entity of Entity.TC {} -> saveSerializedComponent hash serialised Entity.DC {} -> saveSerializedComponent hash serialised @@ -593,10 +592,10 @@ saveSerializedEntities entities = do Entity.C {} -> saveSerializedCausal hash serialised Entity.N {} -> saveSerializedNamespace hash serialised -saveSerializedComponent :: Hash32 -> ByteString -> CodebaseM e () -saveSerializedComponent hash serialised = do +saveSerializedComponent :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedComponent hash (CBORBytes bytes) = do codebaseOwnerUserId <- asks Codebase.codebaseOwner - bytesId <- DefnQ.ensureBytesIdsOf id serialised + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) execute_ [sql| INSERT INTO serialized_components (user_id, component_hash_id, bytes_id) @@ -604,9 +603,9 @@ saveSerializedComponent hash serialised = do ON CONFLICT DO NOTHING |] -saveSerializedPatch :: Hash32 -> ByteString -> CodebaseM e () -saveSerializedPatch hash serialised = do - bytesId <- DefnQ.ensureBytesIdsOf id serialised +saveSerializedPatch :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedPatch hash (CBORBytes bytes) = do + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) execute_ [sql| INSERT INTO serialized_patches (patch_id, bytes_id) @@ -614,9 +613,9 @@ saveSerializedPatch hash serialised = do ON CONFLICT DO NOTHING |] -saveSerializedCausal :: Hash32 -> ByteString -> CodebaseM e () -saveSerializedCausal hash serialised = do - bytesId <- DefnQ.ensureBytesIdsOf id serialised +saveSerializedCausal :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedCausal hash (CBORBytes bytes) = do + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) execute_ [sql| INSERT INTO serialized_causals (causal_id, bytes_id) @@ -624,9 +623,9 @@ saveSerializedCausal hash serialised = do ON CONFLICT DO NOTHING |] -saveSerializedNamespace :: Hash32 -> ByteString -> CodebaseM e () -saveSerializedNamespace hash serialised = do - bytesId <- DefnQ.ensureBytesIdsOf id serialised +saveSerializedNamespace :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedNamespace hash (CBORBytes bytes) = do + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) execute_ [sql| INSERT INTO serialized_namespaces (namespace_hash_id, bytes_id) diff --git a/src/Share/Web/UCM/SyncStream/Impl.hs b/src/Share/Web/UCM/SyncStream/Impl.hs index 9ea554af..f90edd9b 100644 --- a/src/Share/Web/UCM/SyncStream/Impl.hs +++ b/src/Share/Web/UCM/SyncStream/Impl.hs @@ -39,8 +39,7 @@ import UnliftIO.Async qualified as Async server :: Maybe UserId -> SyncV2.Routes WebAppServer server mayUserId = SyncV2.Routes - { downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId, - uploadEntitiesStream = undefined + { downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId } parseBranchRef :: SyncV2.BranchRef -> Either Text (Either ProjectReleaseShortHand ProjectBranchShortHand) From f0abad8aaf8d594eea83f9996c47d49fda79bdbe Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 12 Aug 2024 16:12:41 -0700 Subject: [PATCH 11/20] WIP --- src/Share/Web/API.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Share/Web/API.hs b/src/Share/Web/API.hs index 8ed0450d..1fcc9a6f 100644 --- a/src/Share/Web/API.hs +++ b/src/Share/Web/API.hs @@ -37,8 +37,8 @@ type API = :<|> ("sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "projects" :> MaybeAuthenticatedSession :> UCMProjects.ProjectsAPI) + :<|> ("ucm" :> "v2" :> "sync" :> SyncStream.API) :<|> ("admin" :> Admin.API) - :<|> ("sync-stream" :> SyncStream.API) api :: Proxy API api = Proxy @API From a218f499dd6fe5a53d5ccd2013360e7af51d827e Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 12 Aug 2024 23:42:12 -0700 Subject: [PATCH 12/20] Debugging --- src/Share/Web/Impl.hs | 2 +- src/Share/Web/UCM/SyncStream/Impl.hs | 41 ++++++++++++++++--------- src/Share/Web/UCM/SyncStream/Queries.hs | 6 ++-- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/src/Share/Web/Impl.hs b/src/Share/Web/Impl.hs index 31a870fc..aa140091 100644 --- a/src/Share/Web/Impl.hs +++ b/src/Share/Web/Impl.hs @@ -73,5 +73,5 @@ server = :<|> Sync.server -- Deprecated path :<|> Sync.server :<|> UCMProjects.server - :<|> Admin.server :<|> SyncStream.server + :<|> Admin.server diff --git a/src/Share/Web/UCM/SyncStream/Impl.hs b/src/Share/Web/UCM/SyncStream/Impl.hs index f90edd9b..a41e63e3 100644 --- a/src/Share/Web/UCM/SyncStream/Impl.hs +++ b/src/Share/Web/UCM/SyncStream/Impl.hs @@ -5,9 +5,10 @@ module Share.Web.UCM.SyncStream.Impl (server) where import Conduit import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TBMQueue qualified as STM import Control.Monad.Except (ExceptT (ExceptT)) import Control.Monad.Trans.Except (runExceptT) -import Data.ByteString.Lazy qualified as BL +import Data.Conduit.Combinators qualified as Conduit import Servant import Servant.Conduit (ConduitToSourceIO (..)) import Servant.Types.SourceT qualified as SourceT @@ -28,6 +29,8 @@ import Share.Web.Errors import Share.Web.UCM.Sync.HashJWT qualified as HashJWT import Share.Web.UCM.SyncStream.Queries qualified as SSQ import U.Codebase.Sqlite.Orphans () +import U.Codebase.Sqlite.TempEntity (TempEntity) +import Unison.Debug qualified as Debug import Unison.Hash32 (Hash32) import Unison.Share.API.Hash (HashJWTClaims (..)) import Unison.SyncV2.API qualified as SyncV2 @@ -78,34 +81,44 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc - q <- liftIO $ STM.newTBQueueIO 10 + q <- liftIO $ STM.newTBMQueueIO 10 streamResults <- lift $ UnliftIO.toIO do + Debug.debugLogM Debug.Temp "Starting source Stream" Codebase.runCodebaseTransaction codebase $ do (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash) cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId Cursor.foldBatched cursor 1000 \batch -> do - PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch + Debug.debugLogM Debug.Temp "Source stream batch" + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q batch + PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q pure $ conduitToSourceIO do handle <- liftIO $ Async.async streamResults stream q handle + Conduit..| ( Conduit.iterM \case + EntityChunk {hash} -> Debug.debugM Debug.Temp "Chunk " hash + ErrorChunk err -> Debug.debugM Debug.Temp "Error " err + ) where - stream :: STM.TBQueue (NonEmpty (Hash32, ByteString)) -> Async.Async () -> ConduitT () DownloadEntitiesChunk IO () - stream q async = do + stream :: STM.TBMQueue (NonEmpty (SyncV2.CBORBytes TempEntity, Hash32)) -> (Async.Async a) -> ConduitT () DownloadEntitiesChunk IO () + stream q handle = do let loop :: ConduitT () DownloadEntitiesChunk IO () loop = do - next <- liftIO . STM.atomically $ do - STM.tryReadTBQueue q >>= \case - Nothing -> do - Async.waitSTM async $> Nothing - Just batch -> do - pure $ Just batch - case next of - Nothing -> pure () + Debug.debugLogM Debug.Temp "Waiting for batch..." + liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case + -- The queue is closed. + Nothing -> do + Debug.debugLogM Debug.Temp "Queue closed. finishing up!" + pure () Just batch -> do - let chunks = batch <&> \(hash, bytes) -> EntityChunk {hash, entityCBOR = SyncV2.CBORBytes $ BL.fromStrict bytes} + let chunks = batch <&> \(entityCBOR, hash) -> EntityChunk {hash, entityCBOR} + Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length chunks) <> " entities" yieldMany chunks loop loop + Debug.debugLogM Debug.Temp "Waiting for worker thread to finish" + -- It _should_ have terminated by now, but just in case, cancel it. + Async.cancel handle + Debug.debugLogM Debug.Temp "Done!" emitErr :: SyncV2.DownloadEntitiesError -> SourceIO SyncV2.DownloadEntitiesChunk emitErr err = SourceT.source [ErrorChunk err] diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncStream/Queries.hs index dee0dfb9..11278a45 100644 --- a/src/Share/Web/UCM/SyncStream/Queries.hs +++ b/src/Share/Web/UCM/SyncStream/Queries.hs @@ -11,7 +11,9 @@ import Share.Postgres.Cursors (PGCursor) import Share.Postgres.Cursors qualified as PGCursor import Share.Postgres.IDs import Share.Prelude +import U.Codebase.Sqlite.TempEntity (TempEntity) import Unison.Hash32 (Hash32) +import Unison.SyncV2.Types (CBORBytes) allHashDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor Text) allHashDependenciesOfCausalCursor cid = do @@ -180,7 +182,7 @@ allHashDependenciesOfCausalCursor cid = do JOIN component_hashes ON tc.component_hash_id = component_hashes.id |] -allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor (Hash32, ByteString)) +allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor (CBORBytes TempEntity, Hash32)) allSerializedDependenciesOfCausalCursor cid = do ownerUserId <- asks codebaseOwner PGCursor.newRowCursor @@ -217,7 +219,7 @@ allSerializedDependenciesOfCausalCursor cid = do JOIN branch_hashes bh ON tc.causal_namespace_hash_id = bh.id -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = tc.causal_namespace_hash_id) ), all_patches(patch_id, patch_hash) AS ( - SELECT DISTINCT patch.id + SELECT DISTINCT patch.id, patch.hash FROM all_namespaces an JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id JOIN patches patch ON np.patch_id = patch.id From 5862bce9d8d39a5036c85440c85f0f8a0b22216c Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 13 Aug 2024 09:40:50 -0700 Subject: [PATCH 13/20] Try reversing hash ordering --- src/Share/Web/UCM/SyncStream/Queries.hs | 39 ++++++++++++++++--------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncStream/Queries.hs index 11278a45..6cc55735 100644 --- a/src/Share/Web/UCM/SyncStream/Queries.hs +++ b/src/Share/Web/UCM/SyncStream/Queries.hs @@ -312,24 +312,37 @@ allSerializedDependenciesOfCausalCursor cid = do JOIN types typ ON atc.component_hash_id = typ.component_hash_id JOIN type_local_component_references ref ON typ.id = ref.type_id ) - ) SELECT bytes.bytes, tc.causal_hash - FROM transitive_causals tc - JOIN serialized_causals sc ON tc.causal_id = sc.causal_id + ) + (SELECT bytes.bytes, ch.base32 + FROM transitive_components tc + JOIN serialized_components sc ON tc.component_hash_id = sc.component_hash_id JOIN bytes ON sc.bytes_id = bytes.id + JOIN component_hashes ch ON tc.component_hash_id = ch.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) UNION ALL - SELECT bytes.bytes, an.namespace_hash - FROM all_namespaces an - JOIN serialized_namespaces sn ON an.namespace_hash_id = sn.namespace_hash_id - JOIN bytes ON sn.bytes_id = bytes.id - UNION ALL - SELECT bytes.bytes, ap.patch_hash + (SELECT bytes.bytes, ap.patch_hash FROM all_patches ap JOIN serialized_patches sp ON ap.patch_id = sp.patch_id JOIN bytes ON sp.bytes_id = bytes.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) UNION ALL - SELECT bytes.bytes, ch.base32 - FROM transitive_components tc - JOIN serialized_components sc ON tc.component_hash_id = sc.component_hash_id + (SELECT bytes.bytes, an.namespace_hash + FROM all_namespaces an + JOIN serialized_namespaces sn ON an.namespace_hash_id = sn.namespace_hash_id + JOIN bytes ON sn.bytes_id = bytes.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) + UNION ALL + (SELECT bytes.bytes, tc.causal_hash + FROM transitive_causals tc + JOIN serialized_causals sc ON tc.causal_id = sc.causal_id JOIN bytes ON sc.bytes_id = bytes.id - JOIN component_hashes ch ON tc.component_hash_id = ch.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) |] From 19592f7b22957933611c3796e280df83c8269af9 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 3 Dec 2024 16:03:25 -0800 Subject: [PATCH 14/20] Fix compilation --- package.yaml | 1 + share-api.cabal | 2 ++ src/Share/Utils/Servant/CBOR.hs | 3 +-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/package.yaml b/package.yaml index 8f568644..04d0afe7 100644 --- a/package.yaml +++ b/package.yaml @@ -99,6 +99,7 @@ dependencies: - servant-conduit - serialise - stm +- stm-chans - text - these - time diff --git a/share-api.cabal b/share-api.cabal index 45d8289a..5f7338b8 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -267,6 +267,7 @@ library , share-auth , share-utils , stm + , stm-chans , text , these , time @@ -415,6 +416,7 @@ executable share-api , share-auth , share-utils , stm + , stm-chans , text , these , time diff --git a/src/Share/Utils/Servant/CBOR.hs b/src/Share/Utils/Servant/CBOR.hs index 0cf66ee5..85fa5178 100644 --- a/src/Share/Utils/Servant/CBOR.hs +++ b/src/Share/Utils/Servant/CBOR.hs @@ -12,8 +12,7 @@ import Servant -- | Content-type for encoding and decoding objects as their CBOR representations data CBOR --- | Mime-type for CBOR and additional ones using the word "hackage" and the --- name of the package "serialise". +-- | Mime-type for CBOR instance Accept CBOR where contentTypes Proxy = NonEmpty.singleton ("application" MediaType.// "cbor") From 541a87cf222418b800718d77016b4fff0c5e968f Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Fri, 13 Dec 2024 12:26:55 -0800 Subject: [PATCH 15/20] Rename SyncStream -> SyncV2 --- share-api.cabal | 6 +++--- src/Share/Web/API.hs | 4 ++-- src/Share/Web/Impl.hs | 4 ++-- src/Share/Web/UCM/{SyncStream => SyncV2}/API.hs | 2 +- src/Share/Web/UCM/{SyncStream => SyncV2}/Impl.hs | 4 ++-- src/Share/Web/UCM/{SyncStream => SyncV2}/Queries.hs | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) rename src/Share/Web/UCM/{SyncStream => SyncV2}/API.hs (83%) rename src/Share/Web/UCM/{SyncStream => SyncV2}/Impl.hs (98%) rename src/Share/Web/UCM/{SyncStream => SyncV2}/Queries.hs (99%) diff --git a/share-api.cabal b/share-api.cabal index 5f7338b8..9b415478 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -153,9 +153,9 @@ library Share.Web.UCM.Sync.HashJWT Share.Web.UCM.Sync.Impl Share.Web.UCM.Sync.Types - Share.Web.UCM.SyncStream.API - Share.Web.UCM.SyncStream.Impl - Share.Web.UCM.SyncStream.Queries + Share.Web.UCM.SyncV2.API + Share.Web.UCM.SyncV2.Impl + Share.Web.UCM.SyncV2.Queries Unison.PrettyPrintEnvDecl.Postgres Unison.Server.NameSearch.Postgres Unison.Server.Share.Definitions diff --git a/src/Share/Web/API.hs b/src/Share/Web/API.hs index 1fcc9a6f..b1fbacea 100644 --- a/src/Share/Web/API.hs +++ b/src/Share/Web/API.hs @@ -13,7 +13,7 @@ import Share.Web.Share.API qualified as Share import Share.Web.Share.Projects.API qualified as Projects import Share.Web.Support.API qualified as Support import Share.Web.Types -import Share.Web.UCM.SyncStream.API qualified as SyncStream +import Share.Web.UCM.SyncV2.API qualified as SyncV2 import Unison.Share.API.Projects qualified as UCMProjects import Unison.Sync.API qualified as Unison.Sync @@ -37,7 +37,7 @@ type API = :<|> ("sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "projects" :> MaybeAuthenticatedSession :> UCMProjects.ProjectsAPI) - :<|> ("ucm" :> "v2" :> "sync" :> SyncStream.API) + :<|> ("ucm" :> "v2" :> "sync" :> SyncV2.API) :<|> ("admin" :> Admin.API) api :: Proxy API diff --git a/src/Share/Web/Impl.hs b/src/Share/Web/Impl.hs index aa140091..04b576f3 100644 --- a/src/Share/Web/Impl.hs +++ b/src/Share/Web/Impl.hs @@ -24,7 +24,7 @@ import Share.Web.Support.Impl qualified as Support import Share.Web.Types import Share.Web.UCM.Projects.Impl qualified as UCMProjects import Share.Web.UCM.Sync.Impl qualified as Sync -import Share.Web.UCM.SyncStream.Impl qualified as SyncStream +import Share.Web.UCM.SyncV2.Impl qualified as SyncV2 discoveryEndpoint :: WebApp DiscoveryDocument discoveryEndpoint = do @@ -73,5 +73,5 @@ server = :<|> Sync.server -- Deprecated path :<|> Sync.server :<|> UCMProjects.server - :<|> SyncStream.server + :<|> SyncV2.server :<|> Admin.server diff --git a/src/Share/Web/UCM/SyncStream/API.hs b/src/Share/Web/UCM/SyncV2/API.hs similarity index 83% rename from src/Share/Web/UCM/SyncStream/API.hs rename to src/Share/Web/UCM/SyncV2/API.hs index 7ff82c0e..5f689c4b 100644 --- a/src/Share/Web/UCM/SyncStream/API.hs +++ b/src/Share/Web/UCM/SyncV2/API.hs @@ -1,7 +1,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE TypeOperators #-} -module Share.Web.UCM.SyncStream.API (API) where +module Share.Web.UCM.SyncV2.API (API) where import Servant import Share.OAuth.Session (MaybeAuthenticatedUserId) diff --git a/src/Share/Web/UCM/SyncStream/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs similarity index 98% rename from src/Share/Web/UCM/SyncStream/Impl.hs rename to src/Share/Web/UCM/SyncV2/Impl.hs index a41e63e3..36af2ccf 100644 --- a/src/Share/Web/UCM/SyncStream/Impl.hs +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -1,7 +1,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE TypeOperators #-} -module Share.Web.UCM.SyncStream.Impl (server) where +module Share.Web.UCM.SyncV2.Impl (server) where import Conduit import Control.Concurrent.STM qualified as STM @@ -27,7 +27,7 @@ import Share.Web.App import Share.Web.Authorization qualified as AuthZ import Share.Web.Errors import Share.Web.UCM.Sync.HashJWT qualified as HashJWT -import Share.Web.UCM.SyncStream.Queries qualified as SSQ +import Share.Web.UCM.SyncV2.Queries qualified as SSQ import U.Codebase.Sqlite.Orphans () import U.Codebase.Sqlite.TempEntity (TempEntity) import Unison.Debug qualified as Debug diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncV2/Queries.hs similarity index 99% rename from src/Share/Web/UCM/SyncStream/Queries.hs rename to src/Share/Web/UCM/SyncV2/Queries.hs index 6cc55735..60ae871e 100644 --- a/src/Share/Web/UCM/SyncStream/Queries.hs +++ b/src/Share/Web/UCM/SyncV2/Queries.hs @@ -1,4 +1,4 @@ -module Share.Web.UCM.SyncStream.Queries +module Share.Web.UCM.SyncV2.Queries ( allHashDependenciesOfCausalCursor, allSerializedDependenciesOfCausalCursor, ) From b8b9d557c4f663ba02f3046637c130e127a2a47d Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 16 Dec 2024 12:05:07 -0800 Subject: [PATCH 16/20] Bump unison version --- unison | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unison b/unison index 6c435ebd..439360b3 160000 --- a/unison +++ b/unison @@ -1 +1 @@ -Subproject commit 6c435ebd1b49f6075f294bb764a0624e4d511cdb +Subproject commit 439360b37d568afb521c6a497599b5d9c672b773 From cd55056c10a60479c53f0684b8c9fde38e43e93d Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 16 Dec 2024 12:05:12 -0800 Subject: [PATCH 17/20] Update Share for new unison --- src/Share/Web/UCM/SyncV2/Impl.hs | 36 ++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/Share/Web/UCM/SyncV2/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs index 36af2ccf..a89fc4f7 100644 --- a/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -11,6 +11,7 @@ import Control.Monad.Trans.Except (runExceptT) import Data.Conduit.Combinators qualified as Conduit import Servant import Servant.Conduit (ConduitToSourceIO (..)) +import Servant.Types.SourceT (SourceT (..)) import Servant.Types.SourceT qualified as SourceT import Share.Codebase qualified as Codebase import Share.IDs (ProjectBranchShortHand (..), ProjectReleaseShortHand (..), ProjectShortHand (..), UserHandle, UserId) @@ -22,6 +23,7 @@ import Share.Postgres.Queries qualified as PGQ import Share.Prelude import Share.Project (Project (..)) import Share.User (User (..)) +import Share.Utils.Logging qualified as Logging import Share.Utils.Unison (hash32ToCausalHash) import Share.Web.App import Share.Web.Authorization qualified as AuthZ @@ -34,7 +36,7 @@ import Unison.Debug qualified as Debug import Unison.Hash32 (Hash32) import Unison.Share.API.Hash (HashJWTClaims (..)) import Unison.SyncV2.API qualified as SyncV2 -import Unison.SyncV2.Types (DownloadEntitiesChunk (..)) +import Unison.SyncV2.Types (DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..)) import Unison.SyncV2.Types qualified as SyncV2 import UnliftIO qualified import UnliftIO.Async qualified as Async @@ -57,7 +59,7 @@ parseBranchRef (SyncV2.BranchRef branchRef) = parseRelease = fmap Left . eitherToMaybe $ IDs.fromText @ProjectReleaseShortHand branchRef downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2.DownloadEntitiesRequest -> WebApp (SourceIO SyncV2.DownloadEntitiesChunk) -downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef}) = do +downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes= _todo}) = do either emitErr id <$> runExceptT do addRequestTag "branch-ref" (SyncV2.unBranchRef branchRef) HashJWTClaims {hash = causalHash} <- lift (HashJWT.verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure) @@ -83,24 +85,23 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus pure $ Codebase.codebaseEnv authZToken codebaseLoc q <- liftIO $ STM.newTBMQueueIO 10 streamResults <- lift $ UnliftIO.toIO do - Debug.debugLogM Debug.Temp "Starting source Stream" + Logging.logInfoText "Starting download entities stream" Codebase.runCodebaseTransaction codebase $ do (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash) cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId Cursor.foldBatched cursor 1000 \batch -> do - Debug.debugLogM Debug.Temp "Source stream batch" PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q batch PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q - pure $ conduitToSourceIO do - handle <- liftIO $ Async.async streamResults - stream q handle + pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do + stream q Conduit..| ( Conduit.iterM \case - EntityChunk {hash} -> Debug.debugM Debug.Temp "Chunk " hash - ErrorChunk err -> Debug.debugM Debug.Temp "Error " err + InitialC init -> Debug.debugM Debug.Temp "Initial " init + EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec + ErrorC err -> Debug.debugM Debug.Temp "Error " err ) where - stream :: STM.TBMQueue (NonEmpty (SyncV2.CBORBytes TempEntity, Hash32)) -> (Async.Async a) -> ConduitT () DownloadEntitiesChunk IO () - stream q handle = do + stream :: STM.TBMQueue (NonEmpty (SyncV2.CBORBytes TempEntity, Hash32)) -> ConduitT () DownloadEntitiesChunk IO () + stream q = do let loop :: ConduitT () DownloadEntitiesChunk IO () loop = do Debug.debugLogM Debug.Temp "Waiting for batch..." @@ -110,15 +111,18 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus Debug.debugLogM Debug.Temp "Queue closed. finishing up!" pure () Just batch -> do - let chunks = batch <&> \(entityCBOR, hash) -> EntityChunk {hash, entityCBOR} + let chunks = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR}) Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length chunks) <> " entities" yieldMany chunks loop loop - Debug.debugLogM Debug.Temp "Waiting for worker thread to finish" - -- It _should_ have terminated by now, but just in case, cancel it. - Async.cancel handle Debug.debugLogM Debug.Temp "Done!" emitErr :: SyncV2.DownloadEntitiesError -> SourceIO SyncV2.DownloadEntitiesChunk - emitErr err = SourceT.source [ErrorChunk err] + emitErr err = SourceT.source [ErrorC (ErrorChunk err)] + +-- | Run an IO action in the background while streaming the results. +sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r +sourceIOWithAsync action (SourceT k) = + SourceT \k' -> + Async.withAsync action \_ -> k k' From f354a4fc4e8f27341f0cdf177c3a378a41ad6b10 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 16 Dec 2024 16:39:10 -0800 Subject: [PATCH 18/20] Debugging --- src/Share/Postgres.hs | 12 +++++-- src/Share/Web/UCM/SyncV2/Impl.hs | 55 ++++++++++++++++++++------------ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/Share/Postgres.hs b/src/Share/Postgres.hs index cfff4d62..c99b017a 100644 --- a/src/Share/Postgres.hs +++ b/src/Share/Postgres.hs @@ -100,6 +100,7 @@ import Share.Utils.Logging qualified as Logging import Share.Web.App import Share.Web.Errors (ErrorID (..), SomeServerError, ToServerError (..), internalServerError, respondError, someServerError) import System.CPUTime (getCPUTime) +import Unison.Debug qualified as UDebug data TransactionError e = Unrecoverable SomeServerError @@ -333,7 +334,9 @@ instance QueryA (Transaction e) where statement q s = do transactionStatement q s - unrecoverableError e = Transaction (pure (Left (Unrecoverable (someServerError e)))) + unrecoverableError e = do + UDebug.debugM UDebug.Temp "Unrecoverable error in transaction: " e + Transaction (pure (Left (Unrecoverable (someServerError e)))) instance QueryM (Transaction e) where transactionUnsafeIO io = Transaction (Right <$> liftIO io) @@ -342,7 +345,9 @@ instance QueryA (Session e) where statement q s = do lift $ Session.statement q s - unrecoverableError e = throwError (Unrecoverable (someServerError e)) + unrecoverableError e = do + UDebug.debugM UDebug.Temp "Unrecoverable error in transaction: " e + throwError (Unrecoverable (someServerError e)) instance QueryM (Session e) where transactionUnsafeIO io = lift $ liftIO io @@ -355,7 +360,8 @@ instance QueryA (Pipeline e) where instance (QueryM m) => QueryA (ReaderT e m) where statement q s = lift $ statement q s - unrecoverableError e = lift $ unrecoverableError e + unrecoverableError e = do + lift $ unrecoverableError e instance (QueryM m) => QueryM (ReaderT e m) where transactionUnsafeIO io = lift $ transactionUnsafeIO io diff --git a/src/Share/Web/UCM/SyncV2/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs index a89fc4f7..8b89dbfb 100644 --- a/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -8,7 +8,7 @@ import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TBMQueue qualified as STM import Control.Monad.Except (ExceptT (ExceptT)) import Control.Monad.Trans.Except (runExceptT) -import Data.Conduit.Combinators qualified as Conduit +import Data.List.NonEmpty qualified as NEL import Servant import Servant.Conduit (ConduitToSourceIO (..)) import Servant.Types.SourceT (SourceT (..)) @@ -31,16 +31,20 @@ import Share.Web.Errors import Share.Web.UCM.Sync.HashJWT qualified as HashJWT import Share.Web.UCM.SyncV2.Queries qualified as SSQ import U.Codebase.Sqlite.Orphans () -import U.Codebase.Sqlite.TempEntity (TempEntity) import Unison.Debug qualified as Debug -import Unison.Hash32 (Hash32) import Unison.Share.API.Hash (HashJWTClaims (..)) import Unison.SyncV2.API qualified as SyncV2 -import Unison.SyncV2.Types (DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..)) +import Unison.SyncV2.Types (DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..), StreamInitInfo (..)) import Unison.SyncV2.Types qualified as SyncV2 import UnliftIO qualified import UnliftIO.Async qualified as Async +batchSize :: Int32 +batchSize = 1000 + +streamSettings :: StreamInitInfo +streamSettings = StreamInitInfo {version = SyncV2.Version 1, entitySorting = SyncV2.Unsorted, numEntities = Nothing} + server :: Maybe UserId -> SyncV2.Routes WebAppServer server mayUserId = SyncV2.Routes @@ -59,7 +63,7 @@ parseBranchRef (SyncV2.BranchRef branchRef) = parseRelease = fmap Left . eitherToMaybe $ IDs.fromText @ProjectReleaseShortHand branchRef downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2.DownloadEntitiesRequest -> WebApp (SourceIO SyncV2.DownloadEntitiesChunk) -downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes= _todo}) = do +downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes = _todo}) = do either emitErr id <$> runExceptT do addRequestTag "branch-ref" (SyncV2.unBranchRef branchRef) HashJWTClaims {hash = causalHash} <- lift (HashJWT.verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure) @@ -83,24 +87,35 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc - q <- liftIO $ STM.newTBMQueueIO 10 + q <- UnliftIO.atomically $ do + q <- STM.newTBMQueue 10 + STM.writeTBMQueue q (NEL.singleton $ InitialC $ streamSettings) + pure q streamResults <- lift $ UnliftIO.toIO do Logging.logInfoText "Starting download entities stream" Codebase.runCodebaseTransaction codebase $ do + Debug.debugM Debug.Temp "Getting IDs for:" causalHash (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash) + Debug.debugM Debug.Temp "Getting deps of" causalId cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId - Cursor.foldBatched cursor 1000 \batch -> do - PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q batch + Debug.debugLogM Debug.Temp "Got cursor" + Cursor.foldBatched cursor batchSize \batch -> do + Debug.debugLogM Debug.Temp "Emitting batch" + let entityChunkBatch = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR}) + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q entityChunkBatch PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q - pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do + liftIO $ Async.async streamResults + -- pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do + pure $ conduitToSourceIO do stream q - Conduit..| ( Conduit.iterM \case - InitialC init -> Debug.debugM Debug.Temp "Initial " init - EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec - ErrorC err -> Debug.debugM Debug.Temp "Error " err - ) where - stream :: STM.TBMQueue (NonEmpty (SyncV2.CBORBytes TempEntity, Hash32)) -> ConduitT () DownloadEntitiesChunk IO () + -- Conduit..| ( Conduit.iterM \case + -- InitialC init -> Debug.debugM Debug.Temp "Initial " init + -- EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec + -- ErrorC err -> Debug.debugM Debug.Temp "Error " err + -- ) + + stream :: STM.TBMQueue (NonEmpty DownloadEntitiesChunk) -> ConduitT () DownloadEntitiesChunk IO () stream q = do let loop :: ConduitT () DownloadEntitiesChunk IO () loop = do @@ -111,10 +126,10 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus Debug.debugLogM Debug.Temp "Queue closed. finishing up!" pure () Just batch -> do - let chunks = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR}) - Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length chunks) <> " entities" - yieldMany chunks + Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length batch) <> " entities" + yieldMany batch loop + loop Debug.debugLogM Debug.Temp "Done!" @@ -122,7 +137,7 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus emitErr err = SourceT.source [ErrorC (ErrorChunk err)] -- | Run an IO action in the background while streaming the results. -sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r -sourceIOWithAsync action (SourceT k) = +_sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r +_sourceIOWithAsync action (SourceT k) = SourceT \k' -> Async.withAsync action \_ -> k k' From a14b4244f225746b15cdf7b829c8f38ad31c1b8f Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Mon, 16 Dec 2024 16:46:53 -0800 Subject: [PATCH 19/20] Debugging cleanup --- src/Share/Web/UCM/SyncV2/Impl.hs | 36 +++++++++++++++++++------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/Share/Web/UCM/SyncV2/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs index 8b89dbfb..e2c92783 100644 --- a/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -3,11 +3,12 @@ module Share.Web.UCM.SyncV2.Impl (server) where -import Conduit +import Conduit qualified as C import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TBMQueue qualified as STM import Control.Monad.Except (ExceptT (ExceptT)) import Control.Monad.Trans.Except (runExceptT) +import Data.Conduit.Combinators qualified as C import Data.List.NonEmpty qualified as NEL import Servant import Servant.Conduit (ConduitToSourceIO (..)) @@ -104,20 +105,12 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus let entityChunkBatch = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR}) PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q entityChunkBatch PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q - liftIO $ Async.async streamResults - -- pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do - pure $ conduitToSourceIO do + pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do stream q where - -- Conduit..| ( Conduit.iterM \case - -- InitialC init -> Debug.debugM Debug.Temp "Initial " init - -- EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec - -- ErrorC err -> Debug.debugM Debug.Temp "Error " err - -- ) - - stream :: STM.TBMQueue (NonEmpty DownloadEntitiesChunk) -> ConduitT () DownloadEntitiesChunk IO () + stream :: STM.TBMQueue (NonEmpty DownloadEntitiesChunk) -> C.ConduitT () DownloadEntitiesChunk IO () stream q = do - let loop :: ConduitT () DownloadEntitiesChunk IO () + let loop :: C.ConduitT () DownloadEntitiesChunk IO () loop = do Debug.debugLogM Debug.Temp "Waiting for batch..." liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case @@ -127,7 +120,7 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus pure () Just batch -> do Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length batch) <> " entities" - yieldMany batch + C.yieldMany batch loop loop @@ -137,7 +130,20 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus emitErr err = SourceT.source [ErrorC (ErrorChunk err)] -- | Run an IO action in the background while streaming the results. -_sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r -_sourceIOWithAsync action (SourceT k) = +-- +-- Servant doesn't provide any easier way to do bracketing like this, all the IO must be +-- inside the SourceIO somehow. +sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r +sourceIOWithAsync action (SourceT k) = SourceT \k' -> Async.withAsync action \_ -> k k' + +-- debug the output pipe. +_tap :: (Monad m) => (C.ConduitT a DownloadEntitiesChunk m ()) -> (C.ConduitT a DownloadEntitiesChunk m ()) +_tap s = + s + C..| ( C.iterM \case + InitialC init -> Debug.debugM Debug.Temp "Initial " init + EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec + ErrorC err -> Debug.debugM Debug.Temp "Error " err + ) From 7938f89166b1357e72c1d0895fbacb0cb341bce3 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Fri, 24 Jan 2025 10:52:40 -0800 Subject: [PATCH 20/20] Fix up stream init settings --- src/Share/Web/UCM/SyncV2/Impl.hs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Share/Web/UCM/SyncV2/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs index e2c92783..096753a0 100644 --- a/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -33,6 +33,7 @@ import Share.Web.UCM.Sync.HashJWT qualified as HashJWT import Share.Web.UCM.SyncV2.Queries qualified as SSQ import U.Codebase.Sqlite.Orphans () import Unison.Debug qualified as Debug +import Unison.Hash32 (Hash32) import Unison.Share.API.Hash (HashJWTClaims (..)) import Unison.SyncV2.API qualified as SyncV2 import Unison.SyncV2.Types (DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..), StreamInitInfo (..)) @@ -43,8 +44,8 @@ import UnliftIO.Async qualified as Async batchSize :: Int32 batchSize = 1000 -streamSettings :: StreamInitInfo -streamSettings = StreamInitInfo {version = SyncV2.Version 1, entitySorting = SyncV2.Unsorted, numEntities = Nothing} +streamSettings :: Hash32 -> Maybe SyncV2.BranchRef -> StreamInitInfo +streamSettings rootCausalHash rootBranchRef = StreamInitInfo {version = SyncV2.Version 1, entitySorting = SyncV2.Unsorted, numEntities = Nothing, rootCausalHash, rootBranchRef} server :: Maybe UserId -> SyncV2.Routes WebAppServer server mayUserId = @@ -90,7 +91,7 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus pure $ Codebase.codebaseEnv authZToken codebaseLoc q <- UnliftIO.atomically $ do q <- STM.newTBMQueue 10 - STM.writeTBMQueue q (NEL.singleton $ InitialC $ streamSettings) + STM.writeTBMQueue q (NEL.singleton $ InitialC $ streamSettings causalHash (Just branchRef)) pure q streamResults <- lift $ UnliftIO.toIO do Logging.logInfoText "Starting download entities stream"