Skip to content

Commit 488c708

Browse files
authored
agent: store interface (#1436)
1 parent fdde986 commit 488c708

File tree

15 files changed

+97
-85
lines changed

15 files changed

+97
-85
lines changed

simplexmq.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ library
9898
Simplex.Messaging.Agent.Store.AgentStore
9999
Simplex.Messaging.Agent.Store.Common
100100
Simplex.Messaging.Agent.Store.DB
101+
Simplex.Messaging.Agent.Store.Interface
101102
Simplex.Messaging.Agent.Store.Migrations
102103
Simplex.Messaging.Agent.Store.Shared
103104
Simplex.Messaging.Agent.TRcvQueues

src/Simplex/Messaging/Agent.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ import Simplex.Messaging.Agent.Store
170170
import Simplex.Messaging.Agent.Store.AgentStore
171171
import Simplex.Messaging.Agent.Store.Common (DBStore)
172172
import qualified Simplex.Messaging.Agent.Store.DB as DB
173+
import Simplex.Messaging.Agent.Store.Interface (closeDBStore, execSQL)
173174
import qualified Simplex.Messaging.Agent.Store.Migrations as Migrations
174175
import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration)
175176
import Simplex.Messaging.Client (SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse)
@@ -279,7 +280,7 @@ disposeAgentClient c@AgentClient {acThread, agentEnv = Env {store}} = do
279280
t_ <- atomically (swapTVar acThread Nothing) $>>= (liftIO . deRefWeak)
280281
disconnectAgentClient c
281282
mapM_ killThread t_
282-
liftIO $ closeStore store
283+
liftIO $ closeDBStore store
283284

284285
resumeAgentClient :: AgentClient -> IO ()
285286
resumeAgentClient c = atomically $ writeTVar (active c) True

src/Simplex/Messaging/Agent/Env/SQLite.hs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{-# LANGUAGE CPP #-}
21
{-# LANGUAGE ConstraintKinds #-}
32
{-# LANGUAGE DataKinds #-}
43
{-# LANGUAGE DuplicateRecordFields #-}
@@ -70,6 +69,7 @@ import Simplex.Messaging.Agent.Protocol
7069
import Simplex.Messaging.Agent.RetryInterval
7170
import Simplex.Messaging.Agent.Store (createStore)
7271
import Simplex.Messaging.Agent.Store.Common (DBStore)
72+
import Simplex.Messaging.Agent.Store.Interface (DBOpts)
7373
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
7474
import Simplex.Messaging.Client
7575
import qualified Simplex.Messaging.Crypto as C
@@ -87,11 +87,6 @@ import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryA
8787
import System.Mem.Weak (Weak)
8888
import System.Random (StdGen, newStdGen)
8989
import UnliftIO.STM
90-
#if defined(dbPostgres)
91-
import Database.PostgreSQL.Simple (ConnectInfo (..))
92-
#else
93-
import Data.ByteArray (ScrubbedBytes)
94-
#endif
9590

9691
type AM' a = ReaderT Env IO a
9792

@@ -277,13 +272,8 @@ newSMPAgentEnv config store = do
277272
multicastSubscribers <- newTMVarIO 0
278273
pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers}
279274

280-
#if defined(dbPostgres)
281-
createAgentStore :: ConnectInfo -> String -> MigrationConfirmation -> IO (Either MigrationError DBStore)
275+
createAgentStore :: DBOpts -> MigrationConfirmation -> IO (Either MigrationError DBStore)
282276
createAgentStore = createStore
283-
#else
284-
createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> Bool -> IO (Either MigrationError DBStore)
285-
createAgentStore = createStore
286-
#endif
287277

288278
data NtfSupervisor = NtfSupervisor
289279
{ ntfTkn :: TVar (Maybe NtfToken),

src/Simplex/Messaging/Agent/Store.hs

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{-# LANGUAGE CPP #-}
21
{-# LANGUAGE ConstraintKinds #-}
32
{-# LANGUAGE DataKinds #-}
43
{-# LANGUAGE DeriveAnyClass #-}
@@ -26,13 +25,12 @@ import Data.List (find)
2625
import Data.List.NonEmpty (NonEmpty)
2726
import qualified Data.List.NonEmpty as L
2827
import Data.Maybe (isJust)
29-
import Data.Text (Text)
3028
import Data.Time (UTCTime)
3129
import Data.Type.Equality
3230
import Simplex.Messaging.Agent.Protocol
3331
import Simplex.Messaging.Agent.RetryInterval (RI2State)
3432
import Simplex.Messaging.Agent.Store.Common
35-
import qualified Simplex.Messaging.Agent.Store.DB as DB
33+
import Simplex.Messaging.Agent.Store.Interface (DBOpts, createDBStore)
3634
import qualified Simplex.Messaging.Agent.Store.Migrations as Migrations
3735
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
3836
import qualified Simplex.Messaging.Crypto as C
@@ -54,30 +52,9 @@ import Simplex.Messaging.Protocol
5452
VersionSMPC,
5553
)
5654
import qualified Simplex.Messaging.Protocol as SMP
57-
#if defined(dbPostgres)
58-
import Database.PostgreSQL.Simple (ConnectInfo (..))
59-
import qualified Simplex.Messaging.Agent.Store.Postgres as Store
60-
#else
61-
import Data.ByteArray (ScrubbedBytes)
62-
import qualified Simplex.Messaging.Agent.Store.SQLite as Store
63-
#endif
64-
65-
#if defined(dbPostgres)
66-
createStore :: ConnectInfo -> String -> MigrationConfirmation -> IO (Either MigrationError DBStore)
67-
createStore connectInfo schema = Store.createDBStore connectInfo schema Migrations.app
68-
#else
69-
createStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> Bool -> IO (Either MigrationError DBStore)
70-
createStore dbFilePath dbKey keepKey = Store.createDBStore dbFilePath dbKey keepKey Migrations.app
71-
#endif
72-
73-
closeStore :: DBStore -> IO ()
74-
closeStore = Store.closeDBStore
75-
76-
reopenStore :: DBStore -> IO ()
77-
reopenStore = Store.reopenDBStore
78-
79-
execSQL :: DB.Connection -> Text -> IO [Text]
80-
execSQL = Store.execSQL
55+
56+
createStore :: DBOpts -> MigrationConfirmation -> IO (Either MigrationError DBStore)
57+
createStore dbOpts = createDBStore dbOpts Migrations.app
8158

8259
-- * Queue types
8360

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{-# LANGUAGE CPP #-}
2+
3+
module Simplex.Messaging.Agent.Store.Interface
4+
#if defined(dbPostgres)
5+
( module Simplex.Messaging.Agent.Store.Postgres,
6+
)
7+
where
8+
import Simplex.Messaging.Agent.Store.Postgres
9+
#else
10+
( module Simplex.Messaging.Agent.Store.SQLite,
11+
)
12+
where
13+
import Simplex.Messaging.Agent.Store.SQLite
14+
#endif

src/Simplex/Messaging/Agent/Store/Postgres.hs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
{-# LANGUAGE ScopedTypeVariables #-}
66

77
module Simplex.Messaging.Agent.Store.Postgres
8-
( createDBStore,
8+
( DBOpts (..),
9+
createDBStore,
910
closeDBStore,
1011
reopenDBStore,
1112
execSQL,
@@ -14,47 +15,49 @@ where
1415

1516
import Control.Exception (throwIO)
1617
import Control.Monad (unless, void)
18+
import Data.ByteString (ByteString)
1719
import Data.Functor (($>))
1820
import Data.String (fromString)
1921
import Data.Text (Text)
20-
import Database.PostgreSQL.Simple (ConnectInfo (..), Only (..))
22+
import Database.PostgreSQL.Simple (Only (..))
2123
import qualified Database.PostgreSQL.Simple as PSQL
2224
import Database.PostgreSQL.Simple.SqlQQ (sql)
2325
import Simplex.Messaging.Agent.Store.Migrations (migrateSchema)
2426
import Simplex.Messaging.Agent.Store.Postgres.Common
2527
import qualified Simplex.Messaging.Agent.Store.Postgres.DB as DB
26-
import Simplex.Messaging.Agent.Store.Postgres.Util (createDBAndUserIfNotExists)
2728
import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfirmation (..), MigrationError (..))
2829
import Simplex.Messaging.Util (ifM)
2930
import UnliftIO.Exception (bracketOnError, onException)
3031
import UnliftIO.MVar
3132
import UnliftIO.STM
3233

33-
-- | Create a new Postgres DBStore with the given connection info, schema name and migrations.
34-
-- This function creates the user and/or database passed in connectInfo if they do not exist
35-
-- (expects the default 'postgres' user and 'postgres' db to exist).
34+
data DBOpts = DBOpts
35+
{ connstr :: ByteString,
36+
schema :: String
37+
}
38+
39+
-- | Create a new Postgres DBStore with the given connection string, schema name and migrations.
3640
-- If passed schema does not exist in connectInfo database, it will be created.
3741
-- Applies necessary migrations to schema.
3842
-- TODO [postgres] authentication / user password, db encryption (?)
39-
createDBStore :: ConnectInfo -> String -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
40-
createDBStore connectInfo schema migrations confirmMigrations = do
41-
createDBAndUserIfNotExists connectInfo
42-
st <- connectPostgresStore connectInfo schema
43+
createDBStore :: DBOpts -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
44+
createDBStore DBOpts {connstr, schema} migrations confirmMigrations = do
45+
st <- connectPostgresStore connstr schema
4346
r <- migrateSchema st migrations confirmMigrations True `onException` closeDBStore st
4447
case r of
4548
Right () -> pure $ Right st
4649
Left e -> closeDBStore st $> Left e
4750

48-
connectPostgresStore :: ConnectInfo -> String -> IO DBStore
49-
connectPostgresStore dbConnectInfo dbSchema = do
50-
(dbConn, dbNew) <- connectDB dbConnectInfo dbSchema -- TODO [postgres] analogue for dbBusyLoop?
51+
connectPostgresStore :: ByteString -> String -> IO DBStore
52+
connectPostgresStore dbConnstr dbSchema = do
53+
(dbConn, dbNew) <- connectDB dbConnstr dbSchema -- TODO [postgres] analogue for dbBusyLoop?
5154
dbConnection <- newMVar dbConn
5255
dbClosed <- newTVarIO False
53-
pure DBStore {dbConnectInfo, dbSchema, dbConnection, dbNew, dbClosed}
56+
pure DBStore {dbConnstr, dbSchema, dbConnection, dbNew, dbClosed}
5457

55-
connectDB :: ConnectInfo -> String -> IO (DB.Connection, Bool)
56-
connectDB dbConnectInfo schema = do
57-
db <- PSQL.connect dbConnectInfo
58+
connectDB :: ByteString -> String -> IO (DB.Connection, Bool)
59+
connectDB connstr schema = do
60+
db <- PSQL.connectPostgreSQL connstr
5861
schemaExists <- prepare db `onException` PSQL.close db
5962
let dbNew = not schemaExists
6063
pure (db, dbNew)
@@ -84,12 +87,12 @@ closeDBStore st@DBStore {dbClosed} =
8487
atomically $ writeTVar dbClosed True
8588

8689
openPostgresStore_ :: DBStore -> IO ()
87-
openPostgresStore_ DBStore {dbConnectInfo, dbSchema, dbConnection, dbClosed} =
90+
openPostgresStore_ DBStore {dbConnstr, dbSchema, dbConnection, dbClosed} =
8891
bracketOnError
8992
(takeMVar dbConnection)
9093
(tryPutMVar dbConnection)
9194
$ \_dbConn -> do
92-
(dbConn, _dbNew) <- connectDB dbConnectInfo dbSchema
95+
(dbConn, _dbNew) <- connectDB dbConnstr dbSchema
9396
atomically $ writeTVar dbClosed False
9497
putMVar dbConnection dbConn
9598

src/Simplex/Messaging/Agent/Store/Postgres/Common.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ module Simplex.Messaging.Agent.Store.Postgres.Common
1010
)
1111
where
1212

13+
import Data.ByteString (ByteString)
1314
import qualified Database.PostgreSQL.Simple as PSQL
1415
import UnliftIO.MVar
1516
import UnliftIO.STM
1617

1718
-- TODO [postgres] use log_min_duration_statement instead of custom slow queries (SQLite's Connection type)
1819
data DBStore = DBStore
19-
{ dbConnectInfo :: PSQL.ConnectInfo,
20+
{ dbConnstr :: ByteString,
2021
dbSchema :: String,
2122
dbConnection :: MVar PSQL.Connection,
2223
dbClosed :: TVar Bool,

src/Simplex/Messaging/Agent/Store/Postgres/Util.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
module Simplex.Messaging.Agent.Store.Postgres.Util
77
( createDBAndUserIfNotExists,
8-
-- for tests
98
dropSchema,
109
dropAllSchemasExceptSystem,
1110
dropDatabaseAndUser,

src/Simplex/Messaging/Agent/Store/SQLite.hs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
{-# OPTIONS_GHC -fno-warn-orphans #-}
2626

2727
module Simplex.Messaging.Agent.Store.SQLite
28-
( createDBStore,
28+
( DBOpts (..),
29+
createDBStore,
2930
closeDBStore,
3031
reopenDBStore,
3132
execSQL,
@@ -64,8 +65,15 @@ import UnliftIO.STM
6465

6566
-- * SQLite Store implementation
6667

67-
createDBStore :: FilePath -> ScrubbedBytes -> Bool -> [Migration] -> MigrationConfirmation -> Bool -> IO (Either MigrationError DBStore)
68-
createDBStore dbFilePath dbKey keepKey migrations confirmMigrations vacuum = do
68+
data DBOpts = DBOpts
69+
{ dbFilePath :: FilePath,
70+
dbKey :: ScrubbedBytes,
71+
keepKey :: Bool,
72+
vacuum :: Bool
73+
}
74+
75+
createDBStore :: DBOpts -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
76+
createDBStore DBOpts {dbFilePath, dbKey, keepKey, vacuum} migrations confirmMigrations = do
6977
let dbDir = takeDirectory dbFilePath
7078
createDirectoryIfMissing True dbDir
7179
st <- connectSQLiteStore dbFilePath dbKey keepKey

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers
8585
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ, SENT)
8686
import qualified Simplex.Messaging.Agent.Protocol as A
8787
import Simplex.Messaging.Agent.Store.Common (DBStore (..), withTransaction)
88+
import Simplex.Messaging.Agent.Store.Interface
8889
import qualified Simplex.Messaging.Agent.Store.DB as DB
8990
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
9091
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), SMPProxyFallback (..), SMPProxyMode (..), TransportSessionMode (..), defaultClientConfig)
@@ -3107,13 +3108,13 @@ getSMPAgentClient' clientId cfg' initServers dbPath = do
31073108

31083109
#if defined(dbPostgres)
31093110
createStore :: String -> IO (Either MigrationError DBStore)
3110-
createStore schema = createAgentStore testDBConnectInfo schema MCError
3111+
createStore schema = createAgentStore (DBOpts testDBConnstr schema) MCError
31113112

31123113
insertUser :: DBStore -> IO ()
31133114
insertUser st = withTransaction st (`DB.execute_` "INSERT INTO users DEFAULT VALUES")
31143115
#else
31153116
createStore :: String -> IO (Either MigrationError DBStore)
3116-
createStore dbPath = createAgentStore dbPath "" False MCError True
3117+
createStore dbPath = createAgentStore (DBOpts dbPath "" False True) MCError
31173118

31183119
insertUser :: DBStore -> IO ()
31193120
insertUser st = withTransaction st (`DB.execute_` "INSERT INTO users (user_id) VALUES (1)")

0 commit comments

Comments
 (0)