Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3400139
ntf server: maintain xor-hash of all associated queue IDs via Postgre…
epoberezkin Aug 29, 2025
5c5ee4a
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Aug 29, 2025
bd4e2a1
smp server: xor hash with triggers
epoberezkin Aug 29, 2025
2444f7a
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Aug 29, 2025
967d22c
fix sql and using pgcrypto extension in tests
epoberezkin Aug 30, 2025
b9a07b5
track counts and hashes in smp/ntf servers via triggers, smp server s…
epoberezkin Aug 31, 2025
a7ec4b9
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Sep 2, 2025
f7126a3
agent migrations with functions/triggers
epoberezkin Sep 2, 2025
7fba70a
remove agent triggers
epoberezkin Sep 2, 2025
59e9081
try tracking service subs in the agent (WIP, does not compile)
epoberezkin Sep 3, 2025
58a0fc8
Revert "try tracking service subs in the agent (WIP, does not compile)"
epoberezkin Sep 15, 2025
145e340
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Sep 15, 2025
0a26e17
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Sep 16, 2025
48e8cb9
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Sep 30, 2025
b197c4f
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Oct 23, 2025
f63b170
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Nov 6, 2025
65cbf2f
comment
epoberezkin Nov 7, 2025
63efcfd
Merge branch 'ep/smp-rcv-service' into ep/smp-rcv-service-diff
epoberezkin Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -223,6 +224,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Common
Simplex.Messaging.Agent.Store.Postgres.DB
Simplex.Messaging.Agent.Store.Postgres.Migrations
Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
Simplex.Messaging.Agent.Store.Postgres.Util
if !flag(client_library)
exposed-modules:
Expand Down
10 changes: 5 additions & 5 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ import Simplex.Messaging.Protocol
ErrorType (AUTH),
MsgBody,
MsgFlags (..),
IdsHash,
NtfServer,
ProtoServerWithAuth (..),
ProtocolServer (..),
Expand All @@ -222,6 +221,7 @@ import Simplex.Messaging.Protocol
SMPMsgMeta,
SParty (..),
SProtocolType (..),
ServiceSub (..),
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
Expand Down Expand Up @@ -500,7 +500,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
{-# INLINE resubscribeConnections #-}

subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
{-# INLINE subscribeClientServices #-}

Expand Down Expand Up @@ -1507,15 +1507,15 @@ resubscribeConnections' c connIds = do
[] -> pure True
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'

-- TODO [certs rcv] compare hash with lock
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices' c userId =
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
where
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
subscribe = do
srvs <- withStore' c (`getClientServiceServers` userId)
lift $ M.fromList . zip srvs <$> mapConcurrently (tryAllErrors' . subscribeClientService c userId) srvs
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c userId srv n idsHash) srvs

-- requesting messages sequentially, to reduce memory usage
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
Expand Down
9 changes: 5 additions & 4 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ import Simplex.Messaging.Protocol
RcvNtfPublicDhKey,
SMPMsgMeta (..),
SProtocolType (..),
ServiceSub,
SndPublicAuthKey,
SubscriptionMode (..),
NewNtfCreds (..),
Expand Down Expand Up @@ -1689,10 +1690,10 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
logError $ "processClientNotices error: " <> tshow e
notifySub' c "" $ ERR e

subscribeClientService :: AgentClient -> UserId -> SMPServer -> AM (Int64, IdsHash)
subscribeClientService c userId srv =
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $
(`subscribeService` SMP.SRecipientService) . connectedClient
subscribeClientService :: AgentClient -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSub
subscribeClientService c userId srv n idsHash =
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
subscribeService smp SMP.SRecipientService n idsHash

activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent/NtfSubSupervisor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ runNtfWorker c srv Worker {doWork} =
_ -> ((ntfSubConnId sub, INTERNAL "NSACheck - no subscription ID") : errs, subs, subIds)
updateSub :: DB.Connection -> NtfServer -> UTCTime -> UTCTime -> (NtfSubscription, NtfSubStatus) -> IO (Maybe SMPServer)
updateSub db ntfServer ts nextCheckTs (sub, status)
| ntfShouldSubscribe status =
| status `elem` subscribeNtfStatuses =
let sub' = sub {ntfSubStatus = NASCreated status}
in Nothing <$ updateNtfSubscription db sub' (NSANtf NSACheck) nextCheckTs
-- ntf server stopped subscribing to this queue
Expand Down
13 changes: 7 additions & 6 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -431,19 +431,20 @@ getClientService db userId srv =
where
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)

getClientServiceServers :: DB.Connection -> UserId -> IO [SMPServer]
getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)]
getClientServiceServers db userId =
map toServer
<$> DB.query
db
[sql|
SELECT c.host, c.port, s.key_hash
SELECT c.host, c.port, s.key_hash, c.rcv_service_id, c.rcv_service_queue_count, c.rcv_service_queue_ids_hash
FROM client_services c
JOIN servers s ON s.host = c.host AND s.port = c.port
|]
(Only userId)
where
toServer (host, port, kh) = SMPServer host port kh
toServer (host, port, kh, serviceId, n, Binary idsHash) =
(SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash))

setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO ()
setClientServiceId db userId srv serviceId =
Expand Down Expand Up @@ -2099,7 +2100,7 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|]
( (host server, port server, rcvId, rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
( (host server, port server, rcvId, BI rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
:. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
:. ntfCredsFields
Expand Down Expand Up @@ -2468,13 +2469,13 @@ rcvQueueQuery =

toRcvQueue ::
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, ServiceAssoc)
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, BoolInt)
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
RcvQueue
toRcvQueue
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, rcvServiceAssoc)
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, BI rcvServiceAssoc)
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
) =
Expand Down
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
import Simplex.Messaging.Agent.Store.Shared (Migration (..))

schemaMigrations :: [(String, Text, Maybe Text)]
Expand All @@ -19,7 +20,8 @@ schemaMigrations =
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices)
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("20251020_service_certs", m20251020_service_certs, Just down_m20251020_service_certs)
]

-- | The list of migrations in ascending order by date
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{-# LANGUAGE QuasiQuotes #-}

module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs where

import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)

m20251020_service_certs :: Text
m20251020_service_certs =
T.pack
[r|
CREATE TABLE client_services(
user_id BIGINT NOT NULL REFERENCES users ON UPDATE RESTRICT ON DELETE CASCADE,
host TEXT NOT NULL,
port TEXT NOT NULL,
service_cert BYTEA NOT NULL,
service_cert_hash BYTEA NOT NULL,
service_priv_key BYTEA NOT NULL,
service_id BYTEA,
service_queue_count BIGINT NOT NULL DEFAULT 0,
service_queue_ids_hash BYTEA NOT NULL DEFAULT '\x00000000000000000000000000000000',
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT
);

CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_id, host, port);
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);

ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc SMALLINT NOT NULL DEFAULT 0;
|]

down_m20251020_service_certs :: Text
down_m20251020_service_certs =
T.pack
[r|
ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;

DROP INDEX idx_server_certs_host_port;
DROP INDEX idx_server_certs_user_id_host_port;
DROP TABLE client_services;
|]
46 changes: 46 additions & 0 deletions src/Simplex/Messaging/Agent/Store/Postgres/Migrations/Util.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{-# LANGUAGE QuasiQuotes #-}

module Simplex.Messaging.Agent.Store.Postgres.Migrations.Util where

import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)

-- xor_combine is only applied to locally computed md5 hashes (128 bits/16 bytes),
-- so it is safe to require that all values are of the same length.
createXorHashFuncs :: Text
createXorHashFuncs =
T.pack
[r|
CREATE OR REPLACE FUNCTION xor_combine(state BYTEA, value BYTEA) RETURNS BYTEA
LANGUAGE plpgsql IMMUTABLE STRICT
AS $$
DECLARE
result BYTEA := state;
i INTEGER;
len INTEGER := octet_length(value);
BEGIN
IF octet_length(state) != len THEN
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
END IF;
FOR i IN 0..len-1 LOOP
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
END LOOP;
RETURN result;
END;
$$;

CREATE OR REPLACE AGGREGATE xor_aggregate(BYTEA) (
SFUNC = xor_combine,
STYPE = BYTEA,
INITCOND = '\x00000000000000000000000000000000' -- 16 bytes
);
|]

dropXorHashFuncs :: Text
dropXorHashFuncs =
T.pack
[r|
DROP AGGREGATE xor_aggregate(BYTEA);
DROP FUNCTION xor_combine;
|]
Loading
Loading