Skip to content

Commit 7ac80bf

Browse files
agent: store shared message body only once (when it is the same across messages when batching) (#1453)
* agent: store shared message body only once (when it is the same across messages when batching) * rename * refactor * refactor * save bodies and messages in single transaction * comment * comment * comment * box * mapME * box * ValueOrRef * remove instances * refactor * comments * test * refactor * mapAccumLM compatibility with ghc 8.10.7 --------- Co-authored-by: Evgeny Poberezkin <[email protected]>
1 parent 0d8a1a2 commit 7ac80bf

File tree

13 files changed

+294
-95
lines changed

13 files changed

+294
-95
lines changed

simplexmq.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ library
152152
Simplex.Messaging.Agent.Store.Postgres.DB
153153
Simplex.Messaging.Agent.Store.Postgres.Migrations
154154
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
155+
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
155156
if !flag(client_library)
156157
exposed-modules:
157158
Simplex.Messaging.Agent.Store.Postgres.Util

src/Simplex/Messaging/Agent.hs

Lines changed: 81 additions & 40 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ module Simplex.Messaging.Agent.Client
146146
withStore',
147147
withStoreBatch,
148148
withStoreBatch',
149+
unsafeWithStore,
149150
storeError,
150151
userServers,
151152
pickServer,
@@ -2009,6 +2010,11 @@ withStore c action = do
20092010
]
20102011
#endif
20112012

2013+
unsafeWithStore :: AgentClient -> (DB.Connection -> IO a) -> AM' a
2014+
unsafeWithStore c action = do
2015+
st <- asks store
2016+
liftIO $ agentOperationBracket c AODatabase (\_ -> pure ()) $ withTransaction st action
2017+
20122018
withStoreBatch :: Traversable t => AgentClient -> (DB.Connection -> t (IO (Either AgentErrorType a))) -> AM' (t (Either AgentErrorType a))
20132019
withStoreBatch c actions = do
20142020
st <- asks store

src/Simplex/Messaging/Agent/Protocol.hs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ module Simplex.Messaging.Agent.Protocol
140140
serializeQueueStatus,
141141
queueStatusT,
142142
agentMessageType,
143+
aMessageType,
143144
extraSMPServerHosts,
144145
updateSMPServerHosts,
145146
)
@@ -167,7 +168,7 @@ import Data.Time.Clock.System (SystemTime)
167168
import Data.Type.Equality
168169
import Data.Typeable ()
169170
import Data.Word (Word16, Word32)
170-
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..))
171+
import Simplex.Messaging.Agent.Store.DB (Binary (..), FromField (..), ToField (..))
171172
import Simplex.FileTransfer.Description
172173
import Simplex.FileTransfer.Protocol (FileParty (..))
173174
import Simplex.FileTransfer.Transport (XFTPErrorType)
@@ -855,20 +856,7 @@ agentMessageType = \case
855856
AgentConnInfo _ -> AM_CONN_INFO
856857
AgentConnInfoReply {} -> AM_CONN_INFO_REPLY
857858
AgentRatchetInfo _ -> AM_RATCHET_INFO
858-
AgentMessage _ aMsg -> case aMsg of
859-
-- HELLO is used both in v1 and in v2, but differently.
860-
-- - in v1 (and, possibly, in v2 for simplex connections) can be sent multiple times,
861-
-- until the queue is secured - the OK response from the server instead of initial AUTH errors confirms it.
862-
-- - in v2 duplexHandshake it is sent only once, when it is known that the queue was secured.
863-
HELLO -> AM_HELLO_
864-
A_MSG _ -> AM_A_MSG_
865-
A_RCVD {} -> AM_A_RCVD_
866-
A_QCONT _ -> AM_QCONT_
867-
QADD _ -> AM_QADD_
868-
QKEY _ -> AM_QKEY_
869-
QUSE _ -> AM_QUSE_
870-
QTEST _ -> AM_QTEST_
871-
EREADY _ -> AM_EREADY_
859+
AgentMessage _ aMsg -> aMessageType aMsg
872860

873861
data APrivHeader = APrivHeader
874862
{ -- | sequential ID assigned by the sending agent
@@ -946,6 +934,22 @@ data AMessage
946934
EREADY AgentMsgId
947935
deriving (Show)
948936

937+
aMessageType :: AMessage -> AgentMessageType
938+
aMessageType = \case
939+
-- HELLO is used both in v1 and in v2, but differently.
940+
-- - in v1 (and, possibly, in v2 for simplex connections) can be sent multiple times,
941+
-- until the queue is secured - the OK response from the server instead of initial AUTH errors confirms it.
942+
-- - in v2 duplexHandshake it is sent only once, when it is known that the queue was secured.
943+
HELLO -> AM_HELLO_
944+
A_MSG _ -> AM_A_MSG_
945+
A_RCVD {} -> AM_A_RCVD_
946+
A_QCONT _ -> AM_QCONT_
947+
QADD _ -> AM_QADD_
948+
QKEY _ -> AM_QKEY_
949+
QUSE _ -> AM_QUSE_
950+
QTEST _ -> AM_QTEST_
951+
EREADY _ -> AM_EREADY_
952+
949953
-- | this type is used to send as part of the protocol between different clients
950954
-- TODO possibly, rename fields and types referring to external and internal IDs to make them different
951955
data AMessageReceipt = AMessageReceipt
@@ -1010,6 +1014,10 @@ instance Encoding AMessage where
10101014
QTEST_ -> QTEST <$> smpP
10111015
EREADY_ -> EREADY <$> smpP
10121016

1017+
instance ToField AMessage where toField = toField . Binary . smpEncode
1018+
1019+
instance FromField AMessage where fromField = blobFieldParser smpP
1020+
10131021
instance Encoding AMessageReceipt where
10141022
smpEncode AMessageReceipt {agentMsgId, msgHash, rcptInfo} =
10151023
smpEncode (agentMsgId, msgHash, Large rcptInfo)

src/Simplex/Messaging/Agent/Store.hs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -543,10 +543,16 @@ data SndMsgData = SndMsgData
543543
pqEncryption :: PQEncryption,
544544
internalHash :: MsgHash,
545545
prevMsgHash :: MsgHash,
546-
encryptKey_ :: Maybe MsgEncryptKeyX448,
547-
paddedLen_ :: Maybe Int
546+
sndMsgPrepData_ :: Maybe SndMsgPrepData
548547
}
549548

549+
data SndMsgPrepData = SndMsgPrepData
550+
{ encryptKey :: MsgEncryptKeyX448,
551+
paddedLen :: Int,
552+
sndMsgBodyId :: Int64
553+
}
554+
deriving (Show)
555+
550556
data SndMsg = SndMsg
551557
{ internalId :: InternalId,
552558
internalSndId :: InternalSndId,
@@ -563,8 +569,16 @@ data PendingMsgData = PendingMsgData
563569
pqEncryption :: PQEncryption,
564570
msgRetryState :: Maybe RI2State,
565571
internalTs :: InternalTs,
566-
encryptKey_ :: Maybe MsgEncryptKeyX448,
567-
paddedLen_ :: Maybe Int
572+
internalSndId :: InternalSndId,
573+
prevMsgHash :: PrevSndMsgHash,
574+
pendingMsgPrepData_ :: Maybe PendingMsgPrepData
575+
}
576+
deriving (Show)
577+
578+
data PendingMsgPrepData = PendingMsgPrepData
579+
{ encryptKey :: MsgEncryptKeyX448,
580+
paddedLen :: Int,
581+
sndMsgBody :: AMessage
568582
}
569583
deriving (Show)
570584

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
9292
updateRcvIds,
9393
createRcvMsg,
9494
updateRcvMsgHash,
95+
createSndMsgBody,
9596
updateSndIds,
9697
createSndMsg,
9798
updateSndMsgHash,
@@ -770,6 +771,14 @@ createRcvMsg db connId rq@RcvQueue {dbQueueId} rcvMsgData@RcvMsgData {msgMeta =
770771
updateRcvMsgHash db connId sndMsgId internalRcvId internalHash
771772
DB.execute db "UPDATE rcv_queues SET last_broker_ts = ? WHERE conn_id = ? AND rcv_queue_id = ?" (brokerTs, connId, dbQueueId)
772773

774+
createSndMsgBody :: DB.Connection -> AMessage -> IO Int64
775+
createSndMsgBody db aMessage =
776+
fromOnly . head <$>
777+
DB.query
778+
db
779+
"INSERT INTO snd_message_bodies (agent_msg) VALUES (?) RETURNING snd_message_body_id"
780+
(Only aMessage)
781+
773782
updateSndIds :: DB.Connection -> ConnId -> IO (Either StoreError (InternalId, InternalSndId, PrevSndMsgHash))
774783
updateSndIds db connId = runExceptT $ do
775784
(lastInternalId, lastInternalSndId, prevSndHash) <- ExceptT $ retrieveLastIdsAndHashSnd_ db connId
@@ -836,26 +845,33 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} =
836845
(connId, dbQueueId)
837846
getMsgData :: InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData))
838847
getMsgData msgId = runExceptT $ do
839-
msg <- ExceptT $ firstRow pendingMsgData err getMsgData_
848+
msg <- ExceptT $ firstRow' pendingMsgData err getMsgData_
840849
rq_ <- liftIO $ L.head <$$> getRcvQueuesByConnId_ db connId
841850
pure (rq_, msg)
842851
where
843852
getMsgData_ =
844853
DB.query
845854
db
846855
[sql|
847-
SELECT m.msg_type, m.msg_flags, m.msg_body, m.pq_encryption, m.internal_ts, s.retry_int_slow, s.retry_int_fast, s.msg_encrypt_key, s.padded_msg_len
856+
SELECT
857+
m.msg_type, m.msg_flags, m.msg_body, m.pq_encryption, m.internal_ts, m.internal_snd_id, s.previous_msg_hash,
858+
s.retry_int_slow, s.retry_int_fast, s.msg_encrypt_key, s.padded_msg_len, sb.agent_msg
848859
FROM messages m
849860
JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id
861+
LEFT JOIN snd_message_bodies sb ON sb.snd_message_body_id = s.snd_message_body_id
850862
WHERE m.conn_id = ? AND m.internal_id = ?
851863
|]
852864
(connId, msgId)
853865
err = SEInternal $ "msg delivery " <> bshow msgId <> " returned []"
854-
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, PQEncryption, InternalTs, Maybe Int64, Maybe Int64, Maybe CR.MsgEncryptKeyX448, Maybe Int) -> PendingMsgData
855-
pendingMsgData (msgType, msgFlags_, msgBody, pqEncryption, internalTs, riSlow_, riFast_, encryptKey_, paddedLen_) =
866+
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, PQEncryption, InternalTs, InternalSndId, PrevSndMsgHash, Maybe Int64, Maybe Int64, Maybe CR.MsgEncryptKeyX448, Maybe Int, Maybe AMessage) -> Either StoreError PendingMsgData
867+
pendingMsgData (msgType, msgFlags_, msgBody, pqEncryption, internalTs, internalSndId, prevMsgHash, riSlow_, riFast_, encryptKey_, paddedLen_, sndMsgBody_) = do
856868
let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_
857869
msgRetryState = RI2State <$> riSlow_ <*> riFast_
858-
in PendingMsgData {msgId, msgType, msgFlags, msgBody, pqEncryption, msgRetryState, internalTs, encryptKey_, paddedLen_}
870+
result pendingMsgPrepData_ = PendingMsgData {msgId, msgType, msgFlags, msgBody, pqEncryption, msgRetryState, internalTs, internalSndId, prevMsgHash, pendingMsgPrepData_}
871+
in result <$> case (encryptKey_, paddedLen_, sndMsgBody_) of
872+
(Nothing, Nothing, Nothing) -> Right Nothing
873+
(Just encryptKey, Just paddedLen, Just sndMsgBody) -> Right $ Just PendingMsgPrepData {encryptKey, paddedLen, sndMsgBody}
874+
_ -> Left $ SEInternal "unexpected snd msg data"
859875
markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
860876

861877
getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a))
@@ -997,7 +1013,6 @@ deleteDeliveredSndMsg db connId msgId = do
9971013
cnt <- countPendingSndDeliveries_ db connId msgId
9981014
when (cnt == 0) $ deleteMsg db connId msgId
9991015

1000-
-- TODO [save once] Delete from shared message bodies if no deliveries reference it. (`when (cnt == 0)`)
10011016
deleteSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> Bool -> IO ()
10021017
deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId keepForReceipt = do
10031018
DB.execute
@@ -1006,11 +1021,19 @@ deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId keepForReceipt = do
10061021
(connId, dbQueueId, msgId)
10071022
cnt <- countPendingSndDeliveries_ db connId msgId
10081023
when (cnt == 0) $ do
1009-
del <-
1010-
maybeFirstRow id (DB.query db "SELECT rcpt_internal_id, rcpt_status FROM snd_messages WHERE conn_id = ? AND internal_id = ?" (connId, msgId)) >>= \case
1011-
Just (Just (_ :: Int64), Just MROk) -> pure deleteMsg
1012-
_ -> pure $ if keepForReceipt then deleteMsgContent else deleteMsg
1013-
del db connId msgId
1024+
maybeFirstRow id (DB.query db "SELECT rcpt_internal_id, rcpt_status, snd_message_body_id FROM snd_messages WHERE conn_id = ? AND internal_id = ?" (connId, msgId)) >>= \case
1025+
Just (Just (_ :: Int64), Just MROk, sndMsgBodyId_) -> do
1026+
forM_ sndMsgBodyId_ deleteSndMsgBody
1027+
deleteMsg db connId msgId
1028+
Just (_, _, Just (sndMsgBodyId :: Int64)) -> do
1029+
deleteSndMsgBody sndMsgBodyId
1030+
delKeepForReceipt
1031+
_ ->
1032+
delKeepForReceipt
1033+
where
1034+
delKeepForReceipt = if keepForReceipt then deleteMsgContent db connId msgId else deleteMsg db connId msgId
1035+
deleteSndMsgBody sndMsgBodyId =
1036+
DB.execute db "DELETE FROM snd_message_bodies WHERE snd_message_body_id = ?" (Only sndMsgBodyId)
10141037

10151038
countPendingSndDeliveries_ :: DB.Connection -> ConnId -> InternalId -> IO Int
10161039
countPendingSndDeliveries_ db connId msgId = do
@@ -2207,11 +2230,15 @@ insertSndMsgDetails_ dbConn connId SndMsgData {..} =
22072230
dbConn
22082231
[sql|
22092232
INSERT INTO snd_messages
2210-
( conn_id, internal_snd_id, internal_id, internal_hash, previous_msg_hash, msg_encrypt_key, padded_msg_len)
2233+
( conn_id, internal_snd_id, internal_id, internal_hash, previous_msg_hash, msg_encrypt_key, padded_msg_len, snd_message_body_id)
22112234
VALUES
2212-
(?,?,?,?,?,?,?)
2235+
(?,?,?,?,?,?,?,?)
22132236
|]
2214-
(connId, internalSndId, internalId, Binary internalHash, Binary prevMsgHash, encryptKey_, paddedLen_)
2237+
(connId, internalSndId, internalId, Binary internalHash, Binary prevMsgHash, encryptKey_, paddedLen_, sndMsgBodyId_)
2238+
where
2239+
(encryptKey_, paddedLen_, sndMsgBodyId_) = case sndMsgPrepData_ of
2240+
Nothing -> (Nothing, Nothing, Nothing)
2241+
Just SndMsgPrepData {encryptKey, paddedLen, sndMsgBodyId} -> (Just encryptKey, Just paddedLen, Just sndMsgBodyId)
22152242

22162243
updateSndMsgHash :: DB.Connection -> ConnId -> InternalSndId -> MsgHash -> IO ()
22172244
updateSndMsgHash db connId internalSndId internalHash =

src/Simplex/Messaging/Agent/Store/Postgres/Migrations.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ import Database.PostgreSQL.Simple.Internal (Connection (..))
2525
import Database.PostgreSQL.Simple.SqlQQ (sql)
2626
import Simplex.Messaging.Agent.Store.Postgres.Common
2727
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
28+
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
2829
import Simplex.Messaging.Agent.Store.Shared
2930
import UnliftIO.MVar
3031

3132
schemaMigrations :: [(String, Text, Maybe Text)]
3233
schemaMigrations =
33-
[ ("20241210_initial", m20241210_initial, Nothing)
34+
[ ("20241210_initial", m20241210_initial, Nothing),
35+
("20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies)
3436
]
3537

3638
-- | The list of migrations in ascending order by date
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies where
4+
5+
import Data.Text (Text)
6+
import qualified Data.Text as T
7+
import Text.RawString.QQ (r)
8+
9+
m20250203_msg_bodies :: Text
10+
m20250203_msg_bodies =
11+
T.pack
12+
[r|
13+
ALTER TABLE snd_messages ADD COLUMN msg_encrypt_key BYTEA;
14+
ALTER TABLE snd_messages ADD COLUMN padded_msg_len BIGINT;
15+
16+
17+
CREATE TABLE snd_message_bodies (
18+
snd_message_body_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
19+
agent_msg BYTEA NOT NULL DEFAULT ''::BYTEA
20+
);
21+
ALTER TABLE snd_messages ADD COLUMN snd_message_body_id BIGINT REFERENCES snd_message_bodies ON DELETE SET NULL;
22+
CREATE INDEX idx_snd_messages_snd_message_body_id ON snd_messages(snd_message_body_id);
23+
|]
24+
25+
26+
down_m20250203_msg_bodies :: Text
27+
down_m20250203_msg_bodies =
28+
T.pack
29+
[r|
30+
DROP INDEX idx_snd_messages_snd_message_body_id;
31+
ALTER TABLE snd_messages DROP COLUMN snd_message_body_id;
32+
DROP TABLE snd_message_bodies;
33+
34+
35+
ALTER TABLE snd_messages DROP COLUMN msg_encrypt_key;
36+
ALTER TABLE snd_messages DROP COLUMN padded_msg_len;
37+
|]

src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20250203_msg_bodies.hs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,22 @@ ALTER TABLE snd_messages ADD COLUMN msg_encrypt_key BLOB;
1212
ALTER TABLE snd_messages ADD COLUMN padded_msg_len INTEGER;
1313

1414

15-
-- CREATE TABLE msg_bodies (
16-
-- msg_body_id INTEGER PRIMARY KEY,
17-
-- msg_body BLOB NOT NULL DEFAULT x''
18-
-- )
19-
20-
-- ALTER TABLE snd_messages ADD COLUMN msg_body_id INTEGER REFERENCES msg_bodies ON DELETE CASCADE;
21-
22-
-- fkey to msg_bodies
23-
-- on each delivery check if other deliveries reference the same msg_body_id, if not delete it
15+
CREATE TABLE snd_message_bodies (
16+
snd_message_body_id INTEGER PRIMARY KEY,
17+
agent_msg BLOB NOT NULL DEFAULT x''
18+
);
19+
ALTER TABLE snd_messages ADD COLUMN snd_message_body_id INTEGER REFERENCES snd_message_bodies ON DELETE SET NULL;
20+
CREATE INDEX idx_snd_messages_snd_message_body_id ON snd_messages(snd_message_body_id);
2421
|]
2522

2623
down_m20250203_msg_bodies :: Query
2724
down_m20250203_msg_bodies =
2825
[sql|
29-
ALTER TABLE snd_messages DROP COLUMN msg_encrypt_key;
30-
ALTER TABLE snd_messages DROP COLUMN padded_msg_len;
26+
DROP INDEX idx_snd_messages_snd_message_body_id;
27+
ALTER TABLE snd_messages DROP COLUMN snd_message_body_id;
28+
DROP TABLE snd_message_bodies;
3129

3230

33-
-- ALTER TABLE snd_messages DROP COLUMN msg_body_id;
34-
35-
-- DROP TABLE msg_bodies;
31+
ALTER TABLE snd_messages DROP COLUMN msg_encrypt_key;
32+
ALTER TABLE snd_messages DROP COLUMN padded_msg_len;
3633
|]

src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ CREATE TABLE snd_messages(
129129
rcpt_status TEXT,
130130
msg_encrypt_key BLOB,
131131
padded_msg_len INTEGER,
132+
snd_message_body_id INTEGER REFERENCES snd_message_bodies ON DELETE SET NULL,
132133
PRIMARY KEY(conn_id, internal_snd_id),
133134
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
134135
ON DELETE CASCADE
@@ -417,6 +418,10 @@ CREATE TABLE ntf_tokens_to_delete(
417418
del_failed INTEGER DEFAULT 0,
418419
created_at TEXT NOT NULL DEFAULT(datetime('now'))
419420
);
421+
CREATE TABLE snd_message_bodies(
422+
snd_message_body_id INTEGER PRIMARY KEY,
423+
agent_msg BLOB NOT NULL DEFAULT x''
424+
);
420425
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
421426
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
422427
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);
@@ -543,3 +548,6 @@ CREATE INDEX idx_snd_message_deliveries_expired ON snd_message_deliveries(
543548
internal_id
544549
);
545550
CREATE INDEX idx_rcv_files_redirect_id on rcv_files(redirect_id);
551+
CREATE INDEX idx_snd_messages_snd_message_body_id ON snd_messages(
552+
snd_message_body_id
553+
);

0 commit comments

Comments
 (0)