@@ -43,7 +43,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
4343 getDeletedConn ,
4444 getConns ,
4545 getDeletedConns ,
46- getConnData ,
46+ getConnsData ,
4747 setConnDeleted ,
4848 setConnUserId ,
4949 setConnAgentVersion ,
@@ -257,8 +257,9 @@ import Data.List (foldl', sortBy)
257257import Data.List.NonEmpty (NonEmpty (.. ))
258258import qualified Data.List.NonEmpty as L
259259import qualified Data.Map.Strict as M
260- import Data.Maybe (catMaybes , fromMaybe , isJust , isNothing )
260+ import Data.Maybe (catMaybes , fromMaybe , isJust , isNothing , mapMaybe )
261261import Data.Ord (Down (.. ))
262+ import qualified Data.Set as S
262263import Data.Text.Encoding (decodeLatin1 , encodeUtf8 )
263264import Data.Time.Clock (NominalDiffTime , UTCTime , addUTCTime , getCurrentTime )
264265import Data.Word (Word32 )
@@ -287,12 +288,14 @@ import Simplex.Messaging.Protocol
287288import qualified Simplex.Messaging.Protocol as SMP
288289import Simplex.Messaging.Agent.Store.Entity
289290import Simplex.Messaging.Transport.Client (TransportHost )
290- import Simplex.Messaging.Util ( bshow , catchAllErrors , eitherToMaybe , firstRow , firstRow' , ifM , maybeFirstRow , maybeFirstRow' , tshow , ($>>=) , (<$$>) )
291+ import Simplex.Messaging.Util
291292import Simplex.Messaging.Version.Internal
292293import qualified UnliftIO.Exception as E
293294import UnliftIO.STM
294295#if defined(dbPostgres)
295- import Database.PostgreSQL.Simple (Only (.. ), Query , SqlError , (:.) (.. ))
296+ import Data.List (sortOn )
297+ import Data.Map.Strict (Map )
298+ import Database.PostgreSQL.Simple (In (.. ), Only (.. ), Query , SqlError , (:.) (.. ))
296299import Database.PostgreSQL.Simple.Errors (constraintViolation )
297300import Database.PostgreSQL.Simple.SqlQQ (sql )
298301#else
@@ -427,7 +430,7 @@ deleteConnRecord db connId = DB.execute db "DELETE FROM connections WHERE conn_i
427430
428431checkConfirmedSndQueueExists_ :: DB. Connection -> NewSndQueue -> IO Bool
429432checkConfirmedSndQueueExists_ db SndQueue {server, sndId} =
430- maybeFirstRow' False fromOnly $
433+ maybeFirstRow' False fromOnlyBI $
431434 DB. query
432435 db
433436 " SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1"
@@ -1070,7 +1073,7 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity,
10701073
10711074checkRcvMsgHashExists :: DB. Connection -> ConnId -> ByteString -> IO Bool
10721075checkRcvMsgHashExists db connId hash =
1073- maybeFirstRow' False fromOnly $
1076+ maybeFirstRow' False fromOnlyBI $
10741077 DB. query
10751078 db
10761079 " SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
@@ -1298,21 +1301,26 @@ insertedRowId db = fromOnly . head <$> DB.query_ db q
12981301 q = " SELECT last_insert_rowid()"
12991302#endif
13001303
1301- getPendingCommandServers :: DB. Connection -> ConnId -> IO [Maybe SMPServer ]
1302- getPendingCommandServers db connId = do
1304+ getPendingCommandServers :: DB. Connection -> [ ConnId ] -> IO [( ConnId , NonEmpty ( Maybe SMPServer )) ]
1305+ getPendingCommandServers db connIds =
13031306 -- TODO review whether this can break if, e.g., the server has another key hash.
1304- map smpServer
1305- <$> DB. query
1307+ mapMaybe connServers . groupOn' rowConnId
1308+ <$> DB. query_
13061309 db
13071310 [sql |
1308- SELECT DISTINCT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash)
1311+ SELECT DISTINCT c.conn_id, c. host, c.port, COALESCE(c.server_key_hash, s.key_hash)
13091312 FROM commands c
13101313 LEFT JOIN servers s ON s.host = c.host AND s.port = c.port
1311- WHERE conn_id = ?
1314+ ORDER BY c.conn_id
13121315 |]
1313- (Only connId)
13141316 where
1317+ rowConnId (Only connId :. _) = connId
1318+ connServers rs =
1319+ let connId = rowConnId $ L. head rs
1320+ srvs = L. map (\ (_ :. r) -> smpServer r) rs
1321+ in if connId `S.member` conns then Just (connId, srvs) else Nothing
13151322 smpServer (host, port, keyHash) = SMPServer <$> host <*> port <*> keyHash
1323+ conns = S. fromList connIds
13161324
13171325getPendingServerCommand :: DB. Connection -> ConnId -> Maybe SMPServer -> IO (Either StoreError (Maybe PendingCommand ))
13181326getPendingServerCommand db connId srv_ = getWorkItem " command" getCmdId getCommand markCommandFailed
@@ -2030,21 +2038,19 @@ getDeletedConn = getAnyConn True
20302038{-# INLINE getDeletedConn #-}
20312039
20322040getAnyConn :: Bool -> DB. Connection -> ConnId -> IO (Either StoreError SomeConn )
2033- getAnyConn deleted' dbConn connId =
2034- getConnData dbConn connId >>= \ case
2041+ getAnyConn deleted' db connId =
2042+ getConnData deleted' db connId >>= \ case
2043+ Just (cData, cMode) -> do
2044+ rQ <- getRcvQueuesByConnId_ db connId
2045+ sQ <- getSndQueuesByConnId_ db connId
2046+ pure $ case (rQ, sQ, cMode) of
2047+ (Just rqs, Just sqs, CMInvitation ) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
2048+ (Just (rq :| _), Nothing , CMInvitation ) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
2049+ (Nothing , Just (sq :| _), CMInvitation ) -> Right $ SomeConn SCSnd (SndConnection cData sq)
2050+ (Just (rq :| _), Nothing , CMContact ) -> Right $ SomeConn SCContact (ContactConnection cData rq)
2051+ (Nothing , Nothing , _) -> Right $ SomeConn SCNew (NewConnection cData)
2052+ _ -> Left SEConnNotFound
20352053 Nothing -> pure $ Left SEConnNotFound
2036- Just (cData@ ConnData {deleted}, cMode)
2037- | deleted /= deleted' -> pure $ Left SEConnNotFound
2038- | otherwise -> do
2039- rQ <- getRcvQueuesByConnId_ dbConn connId
2040- sQ <- getSndQueuesByConnId_ dbConn connId
2041- pure $ case (rQ, sQ, cMode) of
2042- (Just rqs, Just sqs, CMInvitation ) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
2043- (Just (rq :| _), Nothing , CMInvitation ) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
2044- (Nothing , Just (sq :| _), CMInvitation ) -> Right $ SomeConn SCSnd (SndConnection cData sq)
2045- (Just (rq :| _), Nothing , CMContact ) -> Right $ SomeConn SCContact (ContactConnection cData rq)
2046- (Nothing , Nothing , _) -> Right $ SomeConn SCNew (NewConnection cData)
2047- _ -> Left SEConnNotFound
20482054
20492055getConns :: DB. Connection -> [ConnId ] -> IO [Either StoreError SomeConn ]
20502056getConns = getAnyConns_ False
@@ -2054,28 +2060,84 @@ getDeletedConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
20542060getDeletedConns = getAnyConns_ True
20552061{-# INLINE getDeletedConns #-}
20562062
2063+ #if defined(dbPostgres)
20572064getAnyConns_ :: Bool -> DB. Connection -> [ConnId ] -> IO [Either StoreError SomeConn ]
2058- getAnyConns_ deleted' db connIds = forM connIds $ E. handle handleDBError . getAnyConn deleted' db
2065+ getAnyConns_ deleted' db connIds = do
2066+ cs <- getConnsData_ deleted' db connIds
2067+ let connIds' = M. keys cs
2068+ rQs :: Map ConnId (NonEmpty RcvQueue ) <- getRcvQueuesByConnIds_ connIds'
2069+ sQs :: Map ConnId (NonEmpty SndQueue ) <- getSndQueuesByConnIds_ connIds'
2070+ pure $ map (result cs rQs sQs) connIds
20592071 where
2060- handleDBError :: E. SomeException -> IO (Either StoreError SomeConn )
2061- handleDBError = pure . Left . SEInternal . bshow
2072+ getRcvQueuesByConnIds_ connIds' =
2073+ toQueueMap primaryFirst toRcvQueue
2074+ <$> DB. query db (rcvQueueQuery <> " WHERE q.conn_id IN ? AND q.deleted = 0" ) (Only (In connIds'))
2075+ where
2076+ primaryFirst RcvQueue {primary = p, dbReplaceQueueId = i} RcvQueue {primary = p', dbReplaceQueueId = i'} =
2077+ compare (Down p) (Down p') <> compare i i'
2078+ getSndQueuesByConnIds_ connIds' =
2079+ toQueueMap primaryFirst toSndQueue
2080+ <$> DB. query db (sndQueueQuery <> " WHERE q.conn_id IN ?" ) (Only (In connIds'))
2081+ where
2082+ primaryFirst SndQueue {primary = p, dbReplaceQueueId = i} SndQueue {primary = p', dbReplaceQueueId = i'} =
2083+ compare (Down p) (Down p') <> compare i i'
2084+ toQueueMap primaryFst toQueue =
2085+ M. fromList . map (\ qs@ (q :| _) -> (qConnId q, L. sortBy primaryFst qs)) . groupOn' qConnId . sortOn qConnId . map toQueue
2086+ result cs rQs sQs connId = case M. lookup connId cs of
2087+ Just (cData, cMode) -> case (M. lookup connId rQs, M. lookup connId sQs, cMode) of
2088+ (Just rqs, Just sqs, CMInvitation ) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
2089+ (Just (rq :| _), Nothing , CMInvitation ) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
2090+ (Nothing , Just (sq :| _), CMInvitation ) -> Right $ SomeConn SCSnd (SndConnection cData sq)
2091+ (Just (rq :| _), Nothing , CMContact ) -> Right $ SomeConn SCContact (ContactConnection cData rq)
2092+ (Nothing , Nothing , _) -> Right $ SomeConn SCNew (NewConnection cData)
2093+ _ -> Left SEConnNotFound
2094+ Nothing -> Left SEConnNotFound
2095+
2096+ getConnsData :: DB. Connection -> [ConnId ] -> IO [Either StoreError (Maybe (ConnData , ConnectionMode ))]
2097+ getConnsData db connIds = do
2098+ cs <- getConnsData_ False db connIds
2099+ pure $ map (Right . (`M.lookup` cs)) connIds
2100+
2101+ getConnsData_ :: Bool -> DB. Connection -> [ConnId ] -> IO (Map ConnId (ConnData , ConnectionMode ))
2102+ getConnsData_ deleted' db connIds =
2103+ M. fromList . map ((\ c@ (ConnData {connId}, _) -> (connId, c)) . rowToConnData) <$>
2104+ DB. query
2105+ db
2106+ [sql |
2107+ SELECT user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs,
2108+ last_external_snd_msg_id, deleted, ratchet_sync_state, pq_support
2109+ FROM connections
2110+ WHERE conn_id IN ? AND deleted = ?
2111+ |]
2112+ (In connIds, BI deleted')
2113+
2114+ #else
2115+ getAnyConns_ :: Bool -> DB. Connection -> [ConnId ] -> IO [Either StoreError SomeConn ]
2116+ getAnyConns_ deleted' db connIds = forM connIds $ E. handle handleDBError . getAnyConn deleted' db
2117+
2118+ getConnsData :: DB. Connection -> [ConnId ] -> IO [Either StoreError (Maybe (ConnData , ConnectionMode ))]
2119+ getConnsData db connIds = forM connIds $ E. handle handleDBError . fmap Right . getConnData False db
20622120
2063- getConnData :: DB. Connection -> ConnId -> IO (Maybe (ConnData , ConnectionMode ))
2064- getConnData db connId' =
2065- maybeFirstRow cData $
2121+ handleDBError :: E. SomeException -> IO (Either StoreError a )
2122+ handleDBError = pure . Left . SEInternal . bshow
2123+ #endif
2124+
2125+ getConnData :: Bool -> DB. Connection -> ConnId -> IO (Maybe (ConnData , ConnectionMode ))
2126+ getConnData deleted' db connId' =
2127+ maybeFirstRow rowToConnData $
20662128 DB. query
20672129 db
20682130 [sql |
2069- SELECT
2070- user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs,
2131+ SELECT user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs,
20712132 last_external_snd_msg_id, deleted, ratchet_sync_state, pq_support
20722133 FROM connections
2073- WHERE conn_id = ?
2134+ WHERE conn_id = ? AND deleted = ?
20742135 |]
2075- (Only connId')
2076- where
2077- cData (userId, connId, cMode, connAgentVersion, enableNtfs_, lastExternalSndId, BI deleted, ratchetSyncState, pqSupport) =
2078- (ConnData {userId, connId, connAgentVersion, enableNtfs = maybe True unBI enableNtfs_, lastExternalSndId, deleted, ratchetSyncState, pqSupport}, cMode)
2136+ (connId', BI deleted')
2137+
2138+ rowToConnData :: (UserId , ConnId , ConnectionMode , VersionSMPA , Maybe BoolInt , PrevExternalSndId , BoolInt , RatchetSyncState , PQSupport ) -> (ConnData , ConnectionMode )
2139+ rowToConnData (userId, connId, cMode, connAgentVersion, enableNtfs_, lastExternalSndId, BI deleted, ratchetSyncState, pqSupport) =
2140+ (ConnData {userId, connId, connAgentVersion, enableNtfs = maybe True unBI enableNtfs_, lastExternalSndId, deleted, ratchetSyncState, pqSupport}, cMode)
20792141
20802142setConnDeleted :: DB. Connection -> Bool -> ConnId -> IO ()
20812143setConnDeleted db waitDelivery connId
@@ -2114,7 +2176,7 @@ addProcessedRatchetKeyHash db connId hash =
21142176
21152177checkRatchetKeyHashExists :: DB. Connection -> ConnId -> ByteString -> IO Bool
21162178checkRatchetKeyHashExists db connId hash =
2117- maybeFirstRow' False fromOnly $
2179+ maybeFirstRow' False fromOnlyBI $
21182180 DB. query
21192181 db
21202182 " SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
0 commit comments