Skip to content

Commit 56ea2fd

Browse files
authored
refactor types for DB entity (#1548)
1 parent ffecd4a commit 56ea2fd

File tree

11 files changed

+171
-116
lines changed

11 files changed

+171
-116
lines changed

simplexmq.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ library
135135
Simplex.Messaging.Server.QueueStore.QueueInfo
136136
Simplex.Messaging.ServiceScheme
137137
Simplex.Messaging.Session
138+
Simplex.Messaging.Agent.Store.Entity
138139
Simplex.Messaging.TMap
139140
Simplex.Messaging.Transport
140141
Simplex.Messaging.Transport.Buffer
@@ -308,6 +309,7 @@ library
308309
, network-transport ==0.5.6
309310
, network-udp ==0.0.*
310311
, random >=1.1 && <1.3
312+
, scientific ==0.3.7.*
311313
, simple-logger ==0.1.*
312314
, socks ==0.6.*
313315
, stm ==2.5.*

src/Simplex/Messaging/Agent.hs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ import Simplex.Messaging.Protocol
216216
)
217217
import qualified Simplex.Messaging.Protocol as SMP
218218
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
219+
import Simplex.Messaging.Agent.Store.Entity
219220
import qualified Simplex.Messaging.TMap as TM
220221
import Simplex.Messaging.Transport (SMPVersion)
221222
import Simplex.Messaging.Util
@@ -833,7 +834,7 @@ newConn c userId enableNtfs cMode userData_ clientData pqInitKeys subMode = do
833834
`catchE` \e -> withStore' c (`deleteConnRecord` connId) >> throwE e
834835

835836
setContactShortLink' :: AgentClient -> ConnId -> ConnInfo -> Maybe CRClientData -> AM (ConnShortLink 'CMContact)
836-
setContactShortLink' c connId userData clientData =
837+
setContactShortLink' c connId userData clientData =
837838
withConnLock c connId "setContactShortLink" $
838839
withStore c (`getConn` connId) >>= \case
839840
SomeConn _ (ContactConnection _ rq) -> do
@@ -934,7 +935,7 @@ newRcvConnSrv c userId connId enableNtfs cMode userData_ clientData pqInitKeys s
934935
createRcvQueue nonce_ qd e2eKeys = do
935936
AgentConfig {smpClientVRange = vr} <- asks config
936937
-- TODO [notifications] send correct NTF credentials here
937-
-- let ntfCreds_ = Nothing
938+
-- let ntfCreds_ = Nothing
938939
(rq, qUri, tSess, sessId) <- newRcvQueue_ c userId connId srvWithAuth vr qd subMode nonce_ e2eKeys `catchAgentError` \e -> liftIO (print e) >> throwE e
939940
atomically $ incSMPServerStat c userId srv connCreated
940941
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq
@@ -1122,7 +1123,7 @@ joinConnSrv c userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup subMo
11221123
Nothing -> throwE $ AGENT A_VERSION
11231124

11241125
delInvSL :: AgentClient -> ConnId -> SMPServerWithAuth -> SMP.LinkId -> AM ()
1125-
delInvSL c connId srv lnkId =
1126+
delInvSL c connId srv lnkId =
11261127
withStore' c (\db -> deleteInvShortLink db (protoServer srv) lnkId) `catchE` \e ->
11271128
liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt SAEConn (ERR $ INTERNAL $ "error deleting short link " <> show e))
11281129

@@ -1293,7 +1294,7 @@ getConnectionMessages' c = mapM $ tryAgentError' . getConnectionMessage
12931294
msg_ <- getQueueMessage c rq `catchAgentError` \e -> atomically (releaseGetLock c rq) >> throwError e
12941295
when (isNothing msg_) $ do
12951296
atomically $ releaseGetLock c rq
1296-
forM_ msgTs_ $ \msgTs -> withStore' c $ \db -> setLastBrokerTs db connId (DBQueueId dbQueueId) msgTs
1297+
forM_ msgTs_ $ \msgTs -> withStore' c $ \db -> setLastBrokerTs db connId (DBEntityId dbQueueId) msgTs
12971298
pure msg_
12981299
{-# INLINE getConnectionMessages' #-}
12991300

@@ -1910,7 +1911,7 @@ switchConnection' c connId =
19101911
_ -> throwE $ CMD PROHIBITED "switchConnection: not duplex"
19111912

19121913
switchDuplexConnection :: AgentClient -> Connection 'CDuplex -> RcvQueue -> AM ConnectionStats
1913-
switchDuplexConnection c (DuplexConnection cData@ConnData {connId, userId} rqs sqs) rq@RcvQueue {server, dbQueueId = DBQueueId dbQueueId, sndId} = do
1914+
switchDuplexConnection c (DuplexConnection cData@ConnData {connId, userId} rqs sqs) rq@RcvQueue {server, dbQueueId = DBEntityId dbQueueId, sndId} = do
19141915
checkRQSwchStatus rq RSSwitchStarted
19151916
clientVRange <- asks $ smpClientVRange . config
19161917
-- try to get the server that is different from all queues, or at least from the primary rcv queue
@@ -2940,7 +2941,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
29402941
Just qInfo@(Compatible sqInfo@SMPQueueInfo {queueAddress}) ->
29412942
case (findQ (qAddress sqInfo) sqs, findQ addr sqs) of
29422943
(Just _, _) -> qError "QADD: queue address is already used in connection"
2943-
(_, Just sq@SndQueue {dbQueueId = DBQueueId dbQueueId}) -> do
2944+
(_, Just sq@SndQueue {dbQueueId = DBEntityId dbQueueId}) -> do
29442945
let (delSqs, keepSqs) = L.partition ((Just dbQueueId ==) . dbReplaceQId) sqs
29452946
case L.nonEmpty keepSqs of
29462947
Just sqs' -> do
@@ -3278,7 +3279,7 @@ newSndQueue userId connId (Compatible (SMPQueueInfo smpClientVersion SMPQueueAdd
32783279
e2ePubKey = Just e2ePubKey,
32793280
-- setting status to Secured prevents SKEY when queue was already secured with LKEY
32803281
status = if isJust sndKeys_ then Secured else New,
3281-
dbQueueId = DBNewQueue,
3282+
dbQueueId = DBNewEntity,
32823283
primary = True,
32833284
dbReplaceQueueId = Nothing,
32843285
sndSwchStatus = Nothing,

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ import Simplex.Messaging.Protocol
278278
import qualified Simplex.Messaging.Protocol as SMP
279279
import Simplex.Messaging.Server.QueueStore.QueueInfo
280280
import Simplex.Messaging.Session
281+
import Simplex.Messaging.Agent.Store.Entity
281282
import Simplex.Messaging.TMap (TMap)
282283
import qualified Simplex.Messaging.TMap as TM
283284
import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion)
@@ -1083,7 +1084,7 @@ sendOrProxySMPCommand ::
10831084
UserId ->
10841085
SMPServer ->
10851086
ConnId -> -- session entity ID, for short links LinkId is used
1086-
ByteString ->
1087+
ByteString ->
10871088
SMP.EntityId -> -- sender or link ID
10881089
(SMPClient -> ProxiedRelay -> ExceptT SMPClientError IO (Either ProxyClientError a)) ->
10891090
(SMPClient -> ExceptT SMPClientError IO a) ->
@@ -1395,7 +1396,7 @@ newRcvQueue_ c userId connId (ProtoServerWithAuth srv auth) vRange cqrd subMode
13951396
queueMode,
13961397
shortLink,
13971398
status = New,
1398-
dbQueueId = DBNewQueue,
1399+
dbQueueId = DBNewEntity,
13991400
primary = True,
14001401
dbReplaceQueueId = Nothing,
14011402
rcvSwchStatus = Nothing,
@@ -1408,7 +1409,7 @@ newRcvQueue_ c userId connId (ProtoServerWithAuth srv auth) vRange cqrd subMode
14081409
where
14091410
mkShortLinkCreds :: (THandleParams SMPVersion 'TClient, QueueIdsKeys) -> AM (Maybe ShortLinkCreds)
14101411
mkShortLinkCreds (thParams', QIK {sndId, queueMode, linkId}) = case (cqrd, queueMode) of
1411-
(CQRMessaging ld, Just QMMessaging) ->
1412+
(CQRMessaging ld, Just QMMessaging) ->
14121413
withLinkData ld $ \lnkId CQRData {linkKey, privSigKey, srvReq = (sndId', d)} ->
14131414
if sndId == sndId'
14141415
then pure $ Just $ ShortLinkCreds lnkId linkKey privSigKey (fst d)

src/Simplex/Messaging/Agent/Store.hs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,30 +52,19 @@ import Simplex.Messaging.Protocol
5252
VersionSMPC,
5353
)
5454
import qualified Simplex.Messaging.Protocol as SMP
55+
import Simplex.Messaging.Agent.Store.Entity
5556

5657
createStore :: DBOpts -> MigrationConfirmation -> IO (Either MigrationError DBStore)
5758
createStore dbOpts = createDBStore dbOpts appMigrations
5859

5960
-- * Queue types
6061

61-
data QueueStored = QSStored | QSNew
62+
type RcvQueue = StoredRcvQueue 'DBStored
6263

63-
data SQueueStored (q :: QueueStored) where
64-
SQSStored :: SQueueStored 'QSStored
65-
SQSNew :: SQueueStored 'QSNew
66-
67-
data DBQueueId (q :: QueueStored) where
68-
DBQueueId :: Int64 -> DBQueueId 'QSStored
69-
DBNewQueue :: DBQueueId 'QSNew
70-
71-
deriving instance Show (DBQueueId q)
72-
73-
type RcvQueue = StoredRcvQueue 'QSStored
74-
75-
type NewRcvQueue = StoredRcvQueue 'QSNew
64+
type NewRcvQueue = StoredRcvQueue 'DBNew
7665

7766
-- | A receive queue. SMP queue through which the agent receives messages from a sender.
78-
data StoredRcvQueue (q :: QueueStored) = RcvQueue
67+
data StoredRcvQueue (q :: DBStored) = RcvQueue
7968
{ userId :: UserId,
8069
connId :: ConnId,
8170
server :: SMPServer,
@@ -98,7 +87,7 @@ data StoredRcvQueue (q :: QueueStored) = RcvQueue
9887
-- | queue status
9988
status :: QueueStatus,
10089
-- | database queue ID (within connection)
101-
dbQueueId :: DBQueueId q,
90+
dbQueueId :: DBEntityId' q,
10291
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
10392
primary :: Bool,
10493
-- | database queue ID to replace, Nothing if this queue is not replacing another, `Just Nothing` is used for replacing old queues
@@ -160,12 +149,12 @@ data InvShortLink = InvShortLink
160149
}
161150
deriving (Show)
162151

163-
type SndQueue = StoredSndQueue 'QSStored
152+
type SndQueue = StoredSndQueue 'DBStored
164153

165-
type NewSndQueue = StoredSndQueue 'QSNew
154+
type NewSndQueue = StoredSndQueue 'DBNew
166155

167156
-- | A send queue. SMP queue through which the agent sends messages to a recipient.
168-
data StoredSndQueue (q :: QueueStored) = SndQueue
157+
data StoredSndQueue (q :: DBStored) = SndQueue
169158
{ userId :: UserId,
170159
connId :: ConnId,
171160
server :: SMPServer,
@@ -184,7 +173,7 @@ data StoredSndQueue (q :: QueueStored) = SndQueue
184173
-- | queue status
185174
status :: QueueStatus,
186175
-- | database queue ID (within connection)
187-
dbQueueId :: DBQueueId q,
176+
dbQueueId :: DBEntityId' q,
188177
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
189178
primary :: Bool,
190179
-- | ID of the queue this one is replacing
@@ -257,7 +246,7 @@ instance SMPQueueRec RcvQueue where
257246
{-# INLINE qUserId #-}
258247
qConnId RcvQueue {connId} = connId
259248
{-# INLINE qConnId #-}
260-
dbQId RcvQueue {dbQueueId = DBQueueId qId} = qId
249+
dbQId RcvQueue {dbQueueId = DBEntityId qId} = qId
261250
{-# INLINE dbQId #-}
262251
dbReplaceQId RcvQueue {dbReplaceQueueId} = dbReplaceQueueId
263252
{-# INLINE dbReplaceQId #-}
@@ -267,7 +256,7 @@ instance SMPQueueRec SndQueue where
267256
{-# INLINE qUserId #-}
268257
qConnId SndQueue {connId} = connId
269258
{-# INLINE qConnId #-}
270-
dbQId SndQueue {dbQueueId = DBQueueId qId} = qId
259+
dbQId SndQueue {dbQueueId = DBEntityId qId} = qId
271260
{-# INLINE dbQId #-}
272261
dbReplaceQId SndQueue {dbReplaceQueueId} = dbReplaceQueueId
273262
{-# INLINE dbReplaceQId #-}

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ import Simplex.Messaging.Notifications.Types
283283
import Simplex.Messaging.Parsers (parseAll)
284284
import Simplex.Messaging.Protocol
285285
import qualified Simplex.Messaging.Protocol as SMP
286+
import Simplex.Messaging.Agent.Store.Entity
286287
import Simplex.Messaging.Transport.Client (TransportHost)
287288
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, tshow, ($>>=), (<$$>))
288289
import Simplex.Messaging.Version.Internal
@@ -858,7 +859,7 @@ createRcvMsg db connId rq@RcvQueue {dbQueueId} rcvMsgData@RcvMsgData {msgMeta =
858859
updateRcvMsgHash db connId sndMsgId internalRcvId internalHash
859860
setLastBrokerTs db connId dbQueueId brokerTs
860861

861-
setLastBrokerTs :: DB.Connection -> ConnId -> DBQueueId 'QSStored -> UTCTime -> IO ()
862+
setLastBrokerTs :: DB.Connection -> ConnId -> DBEntityId -> UTCTime -> IO ()
862863
setLastBrokerTs db connId dbQueueId brokerTs =
863864
DB.execute db "UPDATE rcv_queues SET last_broker_ts = ? WHERE conn_id = ? AND rcv_queue_id = ? AND (last_broker_ts IS NULL OR last_broker_ts < ?)" (brokerTs, connId, dbQueueId, brokerTs)
864865

@@ -1212,7 +1213,7 @@ getSndRatchet db connId v =
12121213
DB.query db "SELECT ratchet_state, x3dh_pub_key_1, x3dh_pub_key_2, pq_pub_kem FROM ratchets WHERE conn_id = ?" (Only connId)
12131214
where
12141215
result = \case
1215-
(Just ratchetState, Just k1, Just k2, pKem_) ->
1216+
(Just ratchetState, Just k1, Just k2, pKem_) ->
12161217
let params = case pKem_ of
12171218
Nothing -> CR.AE2ERatchetParams CR.SRKSProposed (CR.E2ERatchetParams v k1 k2 Nothing)
12181219
Just (CR.ARKP s pKem) -> CR.AE2ERatchetParams s (CR.E2ERatchetParams v k1 k2 (Just pKem))
@@ -1811,15 +1812,6 @@ instance ToField QueueStatus where toField = toField . serializeQueueStatus
18111812

18121813
instance FromField QueueStatus where fromField = fromTextField_ queueStatusT
18131814

1814-
instance ToField (DBQueueId 'QSStored) where toField (DBQueueId qId) = toField qId
1815-
1816-
instance FromField (DBQueueId 'QSStored) where
1817-
#if defined(dbPostgres)
1818-
fromField x dat = DBQueueId <$> fromField x dat
1819-
#else
1820-
fromField x = DBQueueId <$> fromField x
1821-
#endif
1822-
18231815
instance ToField InternalRcvId where toField (InternalRcvId x) = toField x
18241816

18251817
deriving newtype instance FromField InternalRcvId
@@ -2018,13 +2010,13 @@ insertSndQueue_ db connId' sq@SndQueue {..} serverKeyHash_ = do
20182010
smp_client_version=EXCLUDED.smp_client_version,
20192011
server_key_hash=EXCLUDED.server_key_hash
20202012
|]
2021-
((host server, port server, sndId, queueMode, connId', sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret)
2013+
((host server, port server, sndId, queueMode, connId', sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret)
20222014
:. (status, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_))
20232015
pure (sq :: NewSndQueue) {connId = connId', dbQueueId = qId}
20242016

2025-
newQueueId_ :: [Only Int64] -> DBQueueId 'QSStored
2026-
newQueueId_ [] = DBQueueId 1
2027-
newQueueId_ (Only maxId : _) = DBQueueId (maxId + 1)
2017+
newQueueId_ :: [Only Int64] -> DBEntityId
2018+
newQueueId_ [] = DBEntityId 1
2019+
newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1)
20282020

20292021
-- * getConn helpers
20302022

@@ -2160,7 +2152,7 @@ rcvQueueQuery =
21602152

21612153
toRcvQueue ::
21622154
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
2163-
:. (QueueStatus, DBQueueId 'QSStored, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
2155+
:. (QueueStatus, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
21642156
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
21652157
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
21662158
RcvQueue
@@ -2210,7 +2202,7 @@ sndQueueQuery =
22102202
toSndQueue ::
22112203
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SenderId, Maybe QueueMode)
22122204
:. (Maybe SndPublicAuthKey, SndPrivateAuthKey, Maybe C.PublicKeyX25519, C.DhSecretX25519, QueueStatus)
2213-
:. (DBQueueId 'QSStored, BoolInt, Maybe Int64, Maybe SndSwitchStatus, VersionSMPC) ->
2205+
:. (DBEntityId, BoolInt, Maybe Int64, Maybe SndSwitchStatus, VersionSMPC) ->
22142206
SndQueue
22152207
toSndQueue
22162208
( (userId, keyHash, connId, host, port, sndId, queueMode)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
{-# LANGUAGE CPP #-}
2+
{-# LANGUAGE DataKinds #-}
3+
{-# LANGUAGE FlexibleInstances #-}
4+
{-# LANGUAGE GADTs #-}
5+
{-# LANGUAGE KindSignatures #-}
6+
{-# LANGUAGE LambdaCase #-}
7+
{-# LANGUAGE ScopedTypeVariables #-}
8+
{-# LANGUAGE StandaloneDeriving #-}
9+
{-# LANGUAGE TypeApplications #-}
10+
11+
module Simplex.Messaging.Agent.Store.Entity where
12+
13+
import Data.Aeson (FromJSON (..), ToJSON (..))
14+
import qualified Data.Aeson as J
15+
import qualified Data.Aeson.Encoding as JE
16+
import Data.Int (Int64)
17+
import Data.Scientific (floatingOrInteger)
18+
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..))
19+
20+
data DBStored = DBStored | DBNew
21+
22+
data SDBStored (s :: DBStored) where
23+
SDBStored :: SDBStored 'DBStored
24+
SDBNew :: SDBStored 'DBNew
25+
26+
deriving instance Show (SDBStored s)
27+
28+
class DBStoredI s where sdbStored :: SDBStored s
29+
30+
instance DBStoredI 'DBStored where sdbStored = SDBStored
31+
32+
instance DBStoredI 'DBNew where sdbStored = SDBNew
33+
34+
data DBEntityId' (s :: DBStored) where
35+
DBEntityId :: Int64 -> DBEntityId' 'DBStored
36+
DBNewEntity :: DBEntityId' 'DBNew
37+
38+
deriving instance Show (DBEntityId' s)
39+
40+
deriving instance Eq (DBEntityId' s)
41+
42+
type DBEntityId = DBEntityId' 'DBStored
43+
44+
type DBNewEntity = DBEntityId' 'DBNew
45+
46+
instance ToJSON (DBEntityId' s) where
47+
toEncoding = \case
48+
DBEntityId i -> toEncoding i
49+
DBNewEntity -> JE.null_
50+
toJSON = \case
51+
DBEntityId i -> toJSON i
52+
DBNewEntity -> J.Null
53+
54+
instance DBStoredI s => FromJSON (DBEntityId' s) where
55+
parseJSON v = case (v, sdbStored @s) of
56+
(J.Null, SDBNew) -> pure DBNewEntity
57+
(J.Number n, SDBStored) -> case floatingOrInteger n of
58+
Left (_ :: Double) -> fail "bad DBEntityId"
59+
Right i -> pure $ DBEntityId (fromInteger i)
60+
_ -> fail "bad DBEntityId"
61+
omittedField = case sdbStored @s of
62+
SDBStored -> Nothing
63+
SDBNew -> Just DBNewEntity
64+
65+
instance FromField DBEntityId where
66+
#if defined(dbPostgres)
67+
fromField x dat = DBEntityId <$> fromField x dat
68+
#else
69+
fromField x = DBEntityId <$> fromField x
70+
#endif
71+
72+
instance ToField DBEntityId where toField (DBEntityId i) = toField i

0 commit comments

Comments
 (0)