Skip to content

Commit 77bf76e

Browse files
authored
smp server: fix in-memory server not restoring queue/service associations after 2+ restarts (#1618)
* smp server: fix in-memory server not restoring queue/service associations after 2+ restarts * fix test, do not reuse database
1 parent beafac1 commit 77bf76e

File tree

3 files changed

+55
-7
lines changed

3 files changed

+55
-7
lines changed

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,19 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
114114
serviceQueuesCount serviceSel = foldM (\n s -> (n +) . S.size <$> readTVarIO (serviceSel s)) 0
115115

116116
addQueue_ :: STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
117-
addQueue_ st mkQ rId qr@QueueRec {senderId = sId, notifier, queueData} = do
117+
addQueue_ st mkQ rId qr@QueueRec {senderId = sId, notifier, queueData, rcvServiceId} = do
118118
sq <- mkQ rId qr
119119
add sq $>> withLog "addStoreQueue" st (\s -> logCreateQueue s rId qr) $> Right sq
120120
where
121121
STMQueueStore {queues, senders, notifiers, links} = st
122122
add q = atomically $ ifM hasId (pure $ Left DUPLICATE_) $ Right () <$ do
123123
TM.insert rId q queues
124124
TM.insert sId rId senders
125-
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId notifiers
125+
forM_ notifier $ \NtfCreds {notifierId = nId, ntfServiceId} -> do
126+
TM.insert nId rId notifiers
127+
mapM_ (addServiceQueue st serviceNtfQueues nId) ntfServiceId
126128
forM_ queueData $ \(lnkId, _) -> TM.insert lnkId rId links
129+
mapM_ (addServiceQueue st serviceRcvQueues rId) rcvServiceId
127130
hasId = anyM [TM.member rId queues, TM.member sId senders, hasNotifier, hasLink]
128131
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier
129132
hasLink = maybe (pure False) (\(lnkId, _) -> TM.member lnkId links) queueData

tests/ServerTests.hs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ serverTests = do
8686
describe "Message notifications" $ do
8787
testMessageNotifications
8888
testMessageServiceNotifications
89+
testServiceNotificationsTwoRestarts
8990
describe "Message expiration" $ do
9091
testMsgExpireOnSend
9192
testMsgExpireOnInterval
@@ -1091,7 +1092,6 @@ testMessageServiceNotifications =
10911092
(rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g
10921093
Resp "1" _ (NID nId _) <- signSendRecv rh rKey ("1", rId, NKEY nPub rcvNtfPubDhKey)
10931094
serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g
1094-
-- TODO [certs] we need to get certificate fingerprint and include it into signed over for NSUB commands
10951095
testNtfServiceClient t serviceKeys $ \nh1 -> do
10961096
-- can't subscribe without service signature in service connection
10971097
Resp "2a" _ (ERR SERVICE) <- signSendRecv nh1 nKey ("2a", nId, NSUB)
@@ -1155,6 +1155,51 @@ testMessageServiceNotifications =
11551155
Resp "" _ (NMSG _ _) <- tGet1 nh
11561156
pure ()
11571157

1158+
testServiceNotificationsTwoRestarts :: SpecWith (ASrvTransport, AStoreType)
1159+
testServiceNotificationsTwoRestarts =
1160+
it "subscribe notifier as service and deliver notifications after two restarts" $ \ps@(ATransport t, _) -> do
1161+
g <- C.newRandom
1162+
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
1163+
(nPub, nKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
1164+
serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g
1165+
(rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g
1166+
(rId, rKey, sId, dec, serviceId) <- withSmpServerStoreLogOn ps testPort $ runTest2 t $ \sh rh -> do
1167+
(sId, rId, rKey, dhShared) <- createAndSecureQueue rh sPub
1168+
let dec = decryptMsgV3 dhShared
1169+
Resp "0" _ (NID nId _) <- signSendRecv rh rKey ("0", rId, NKEY nPub rcvNtfPubDhKey)
1170+
testNtfServiceClient t serviceKeys $ \nh -> do
1171+
Resp "1" _ (SOK (Just serviceId)) <- serviceSignSendRecv nh nKey servicePK ("1", nId, NSUB)
1172+
deliverMessage rh rId rKey sh sId sKey nh "hello" dec
1173+
pure (rId, rKey, sId, dec, serviceId)
1174+
threadDelay 250000
1175+
withSmpServerStoreLogOn ps testPort $ runTest2 t $ \sh rh ->
1176+
testNtfServiceClient t serviceKeys $ \nh -> do
1177+
Resp "2.1" serviceId' (SOKS n) <- signSendRecv nh (C.APrivateAuthKey C.SEd25519 servicePK) ("2.1", serviceId, NSUBS)
1178+
n `shouldBe` 1
1179+
Resp "2.2" _ (SOK Nothing) <- signSendRecv rh rKey ("2.2", rId, SUB)
1180+
serviceId' `shouldBe` serviceId
1181+
deliverMessage rh rId rKey sh sId sKey nh "hello 2" dec
1182+
threadDelay 250000
1183+
withSmpServerStoreLogOn ps testPort $ runTest2 t $ \sh rh ->
1184+
testNtfServiceClient t serviceKeys $ \nh -> do
1185+
Resp "3.1" _ (SOKS n) <- signSendRecv nh (C.APrivateAuthKey C.SEd25519 servicePK) ("3.1", serviceId, NSUBS)
1186+
n `shouldBe` 1
1187+
Resp "3.2" _ (SOK Nothing) <- signSendRecv rh rKey ("3.2", rId, SUB)
1188+
deliverMessage rh rId rKey sh sId sKey nh "hello 3" dec
1189+
where
1190+
runTest2 :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO a) -> ThreadId -> IO a
1191+
runTest2 _ test' server = do
1192+
a <- testSMPClient $ \h1 -> testSMPClient $ \h2 -> test' h1 h2
1193+
killThread server
1194+
pure a
1195+
deliverMessage rh rId rKey sh sId sKey nh msgText dec = do
1196+
Resp "msg-1" _ OK <- signSendRecv sh sKey ("msg-1", sId, _SEND' msgText)
1197+
Resp "" _ (Msg mId msg) <- tGet1 rh
1198+
Resp "msg-2" _ OK <- signSendRecv rh rKey ("msg-2", rId, ACK mId)
1199+
(dec mId msg, Right msgText) #== "delivered from queue"
1200+
Resp "" _ (NMSG _ _) <- tGet1 nh
1201+
pure ()
1202+
11581203
testMsgExpireOnSend :: SpecWith (ASrvTransport, AStoreType)
11591204
testMsgExpireOnSend =
11601205
it "should expire messages that are not received before messageTTL on SEND" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do

tests/Test.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ main = do
102102
] -- skipComparisonForDownMigrations
103103
testStoreDBOpts
104104
"src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql"
105-
aroundAll_ (postgressBracket testServerDBConnectInfo) $
105+
around_ (postgressBracket testServerDBConnectInfo) $
106106
describe "SMP server via TLS, postgres+jornal message store" $
107107
before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests
108108
#endif
@@ -122,13 +122,13 @@ main = do
122122
[] -- skipComparisonForDownMigrations
123123
ntfTestStoreDBOpts
124124
"src/Simplex/Messaging/Notifications/Server/Store/ntf_server_schema.sql"
125-
aroundAll_ (postgressBracket ntfTestServerDBConnectInfo) $ do
125+
around_ (postgressBracket ntfTestServerDBConnectInfo) $ do
126126
describe "Notifications server (SMP server: jornal store)" $
127127
ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal)
128-
aroundAll_ (postgressBracket testServerDBConnectInfo) $
128+
around_ (postgressBracket testServerDBConnectInfo) $
129129
describe "Notifications server (SMP server: postgres+jornal store)" $
130130
ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal)
131-
aroundAll_ (postgressBracket testServerDBConnectInfo) $ do
131+
around_ (postgressBracket testServerDBConnectInfo) $ do
132132
describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
133133
describe "SMP proxy, postgres+jornal message store" $
134134
before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests

0 commit comments

Comments
 (0)