Skip to content

Commit 756cb5b

Browse files
smp server: batch processing of subscription messages
1 parent 3134d62 commit 756cb5b

File tree

7 files changed

+291
-14
lines changed

7 files changed

+291
-14
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# Server: batched SUB command processing
2+
3+
Implementation plan for Part 1 of [RFC 2026-03-28-subscription-performance](../rfcs/2026-03-28-subscription-performance.md).
4+
5+
## Current state
6+
7+
When a batch of ~135 SUB commands arrives, the server already batches:
8+
- Queue record lookups (`getQueueRecs` in `receive`, Server.hs:1151)
9+
- Command verification (`verifyLoadedQueue`, Server.hs:1152)
10+
11+
But command processing is per-command (`foldrM process` in `client`, Server.hs:1372-1375). Each SUB calls `subscribeQueueAndDeliver` which calls `tryPeekMsg` - one DB query per queue. For Postgres, that's ~135 individual `SELECT ... FROM messages WHERE recipient_id = ? ORDER BY message_id ASC LIMIT 1` queries per batch.
12+
13+
## Goal
14+
15+
Replace ~135 individual message peek queries with 1 batched query per batch. No protocol changes.
16+
17+
## Implementation
18+
19+
### Step 1: Add `tryPeekMsgs` to MsgStoreClass
20+
21+
File: `src/Simplex/Messaging/Server/MsgStore/Types.hs`
22+
23+
Add to `MsgStoreClass`:
24+
25+
```haskell
26+
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
27+
```
28+
29+
Returns a map from recipient ID to earliest pending message for each queue that has one. Queues with no messages are absent from the map.
30+
31+
### Step 2: Parameterize `deliver` to accept pre-fetched message
32+
33+
File: `src/Simplex/Messaging/Server.hs`
34+
35+
Currently `deliver` (inside `subscribeQueueAndDeliver`, line 1641) calls `tryPeekMsg ms q`. Add a parameter for an optional pre-fetched message:
36+
37+
```haskell
38+
deliver :: Maybe Message -> (Bool, Maybe Sub) -> M s ResponseAndMessage
39+
deliver prefetchedMsg (hasSub, sub_) = do
40+
stats <- asks serverStats
41+
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
42+
msg_ <- maybe (tryPeekMsg ms q) (pure . Just) prefetchedMsg
43+
...
44+
```
45+
46+
When `Nothing` is passed, falls back to individual `tryPeekMsg` (existing behavior). When `Just msg` is passed, uses it directly (batched path).
47+
48+
### Step 3: Pre-fetch messages before the processing loop
49+
50+
File: `src/Simplex/Messaging/Server.hs`
51+
52+
Currently (lines 1372-1375):
53+
54+
```haskell
55+
forever $
56+
atomically (readTBQueue rcvQ)
57+
>>= foldrM process ([], [])
58+
>>= \(rs_, msgs) -> ...
59+
```
60+
61+
Add a pre-fetch step before the existing loop:
62+
63+
```haskell
64+
forever $ do
65+
batch <- atomically (readTBQueue rcvQ)
66+
msgMap <- prefetchMsgs batch
67+
foldrM (process msgMap) ([], []) batch
68+
>>= \(rs_, msgs) -> ...
69+
```
70+
71+
`prefetchMsgs` scans the batch, collects queues from SUB commands that have a verified queue (`q_ = Just (q, _)`), calls `tryPeekMsgs` once, returns the map. For batches with no SUBs it returns an empty map (no DB call).
72+
73+
`process` passes the looked-up message (or Nothing) through to `processCommand` and down to `deliver`.
74+
75+
The `foldrM process` loop, `processCommand`, `subscribeQueueAndDeliver`, and all other command handlers stay structurally the same. Only `deliver` gains one parameter, and the `client` loop gains one pre-fetch call.
76+
77+
### Step 4: Review
78+
79+
Review the typeclass signature and server usage. Confirm the interface has the right shape before implementing store backends.
80+
81+
### Step 5: Implement for each store backend
82+
83+
#### Postgres
84+
85+
File: `src/Simplex/Messaging/Server/MsgStore/Postgres.hs`
86+
87+
Single query using `DISTINCT ON`:
88+
89+
```sql
90+
SELECT DISTINCT ON (recipient_id)
91+
recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
92+
FROM messages
93+
WHERE recipient_id IN ?
94+
ORDER BY recipient_id, message_id ASC
95+
```
96+
97+
Build `Map RecipientId Message` from results.
98+
99+
#### STM
100+
101+
File: `src/Simplex/Messaging/Server/MsgStore/STM.hs`
102+
103+
Loop over queues, call `tryPeekMsg` for each, collect into map.
104+
105+
#### Journal
106+
107+
File: `src/Simplex/Messaging/Server/MsgStore/Journal.hs`
108+
109+
Loop over queues, call `tryPeekMsg` for each, collect into map.
110+
111+
### Step 6: Handle edge cases
112+
113+
1. **Mixed batches**: `prefetchMsgs` collects only SUB queues. Non-SUB commands get Nothing for the pre-fetched message and process unchanged.
114+
115+
2. **Already-subscribed queues**: Include in pre-fetch - `deliver` is called for re-SUBs too (delivers pending message).
116+
117+
3. **Service subscriptions**: The pre-fetch doesn't care about service state. `sharedSubscribeQueue` handles service association in STM; message peek is the same.
118+
119+
4. **Error queues**: Verification errors from `receive` are Left values in the batch. `prefetchMsgs` only looks at Right values with SUB commands.
120+
121+
5. **Empty pre-fetch**: If batch has no SUBs (e.g., all ACKs), `prefetchMsgs` returns empty map, no DB call made.
122+
123+
### Step 7: Batch other commands (future, not in scope)
124+
125+
The same pattern (pre-fetch before loop, parameterize handler) can extend to:
126+
- `ACK` with `tryDelPeekMsg` - batch delete+peek
127+
- `GET` with `tryPeekMsg` - same map lookup
128+
129+
Lower priority since these don't have the N-at-once pattern of subscriptions.
130+
131+
## File changes summary
132+
133+
| File | Change |
134+
|---|---|
135+
| `src/Simplex/Messaging/Server/MsgStore/Types.hs` | Add `tryPeekMsgs` to typeclass |
136+
| `src/Simplex/Messaging/Server/MsgStore/Postgres.hs` | Implement `tryPeekMsgs` with batch SQL |
137+
| `src/Simplex/Messaging/Server/MsgStore/STM.hs` | Implement `tryPeekMsgs` as loop |
138+
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Implement `tryPeekMsgs` as loop |
139+
| `src/Simplex/Messaging/Server.hs` | Add `prefetchMsgs`, parameterize `deliver` |
140+
141+
## Testing
142+
143+
1. Existing server tests must pass unchanged (correctness preserved).
144+
2. Add a test that subscribes a batch of queues (some with pending messages, some without) and verifies all get correct SOK + MSG responses.
145+
3. Prometheus metrics: existing `qSub` stat should still increment correctly.
146+
147+
## Performance expectation
148+
149+
For 300K queues across ~2200 batches:
150+
- Before: ~300K individual DB queries
151+
- After: ~2200 batched DB queries (one per batch of ~135)
152+
- ~136x reduction in DB round-trips
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Subscription performance
2+
3+
No protocol changes. This is an implementation RFC addressing subscription performance bottlenecks in both the SMP router and the agent.
4+
5+
## Problem
6+
7+
Subscribing large numbers of queues is slow. A messaging client with ~300K queues per router across 3 routers takes over 1 hour to subscribe. For comparison, the NTF server with ~1M queues per router across 12 routers took 20-30 minutes (prior to NTF client services, now in master).
8+
9+
Even on fast networks (cloud VMs), a client with 1.1M active subscriptions needed ~1.5M attempts (commands sent) to fully subscribe - ~36% retry rate caused by the timeout cascade described below.
10+
11+
### Root causes
12+
13+
#### 1. Router: per-command processing in batches
14+
15+
Batch verification and queue lookups are already done efficiently for the whole batch in `Server.hs`. But `processCommand` is called per-command in a loop - each SUB does its own individual DB query for message peek/delivery. With ~135 SUBs per batch (current SMP version), that's 135 individual DB queries per batch instead of 1 batched query.
16+
17+
For 300K queues, that's ~2200 batches x 135 queries = ~300K individual DB queries on the router, which is the dominant bottleneck when using PostgreSQL storage.
18+
19+
NSUB is cheaper because it just registers for notifications without message delivery - no per-queue DB query.
20+
21+
#### 2. Agent: all queues read and sent at once
22+
23+
`getUserServerRcvQueueSubs` reads all queues for a `(userId, server)` pair in one query with no LIMIT. For 300K queues, the entire result set is loaded into memory, then all ~2200 batches are queued to send without waiting for responses.
24+
25+
The NTF server agent uses cursor-style reading with configurable batch sizes (900 subs per chunk, 90K per DB fetch) and waits for each chunk to be processed before fetching the next.
26+
27+
#### 3. No backpressure on sends
28+
29+
`nonBlockingWriteTBQueue` bypasses the `sndQ` bound by forking a thread when the queue is full. All batches are queued immediately, and all their response timers start simultaneously. A 30-second per-response timeout means later batches time out not because the router is slow to respond to them specifically, but because they're waiting in the router's receive queue behind thousands of earlier commands.
30+
31+
This causes cascading timeouts: timed-out responses trigger `resubscribeSMPSession`, which retries all pending subs. Three consecutive timeouts can trigger connection drop via the monitor thread, causing a full reconnection and retry of everything.
32+
33+
## Solution
34+
35+
### Part 1: Router - batched command processing
36+
37+
Move the per-command processing loop inside command handlers so that commands of the same type within a batch can be processed together.
38+
39+
Current flow:
40+
```
41+
receive batch -> verify all -> lookup queues all -> for each command: processCommand (individual DB query)
42+
```
43+
44+
Proposed flow:
45+
```
46+
receive batch -> verify all -> lookup queues all -> group by command type -> process group:
47+
SUB group: one batched message peek query for all queues
48+
NSUB group: batch registration (already cheap, but can batch DB writes)
49+
other commands: process individually as before
50+
```
51+
52+
For SUB, the batched processing would:
53+
1. Collect all queue IDs from the SUB group
54+
2. Perform a single DB query to peek messages for all queues
55+
3. Distribute results back to individual responses
56+
57+
This reduces ~135 DB queries per batch to 1, cutting router-side DB load by ~100x for subscriptions.
58+
59+
Commands where batching doesn't matter (SEND, ACK, KEY, etc.) continue to be processed individually.
60+
61+
### Part 2: Agent - cursor-based subscription with backpressure
62+
63+
Replace the all-at-once fetch-and-send pattern with cursor-style batching, similar to what the NTF server agent does.
64+
65+
Changes to `subscribeUserServer`:
66+
1. Fetch queues in fixed-size batches (e.g., configurable, default ~1000) using LIMIT/OFFSET or cursor-based pagination.
67+
2. Send each batch and wait for responses before sending the next.
68+
3. Remove the use of `nonBlockingWriteTBQueue` for subscription batches - use blocking writes or structured backpressure so response timers don't start until the batch is actually sent.
69+
70+
This ensures:
71+
- Memory usage is bounded (not 300K queue records in memory at once)
72+
- Response timeouts are meaningful (timer starts when the router receives the batch, not when it's queued locally)
73+
- Retries are scoped to the failed batch, not all pending subs
74+
- Works on slow/lossy networks by naturally pacing sends
75+
76+
### Part 3: Response timeout for batches
77+
78+
The current per-response 30-second timeout doesn't account for batch processing time. Options:
79+
80+
1. **Stagger deadlines**: later responses in a batch get proportionally more time. The `rcvConcurrency` field was designed for this but is never used.
81+
2. **Per-batch timeout**: instead of timing individual responses, timeout the entire batch with a budget proportional to batch size.
82+
3. **No timeout for subscription responses**: since subscriptions are sent as batches with backpressure (Part 2), and the connection is monitored by pings, individual response timeouts may not be needed. A subscription that doesn't get a response will be retried on reconnect.
83+
84+
## Priority and ordering
85+
86+
Part 1 (router batching) gives the biggest improvement and is independent of Parts 2/3.
87+
88+
Part 2 (agent cursor + backpressure) eliminates the retry cascade and is critical for slow networks.
89+
90+
Part 3 (timeout handling) is a refinement that can be addressed after Parts 1 and 2.

src/Simplex/Messaging/Server.hs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,14 +1366,20 @@ client
13661366
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
13671367
let THandleParams {thVersion} = thParams'
13681368
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
1369-
process t acc@(rs, msgs) =
1369+
process msgMap t acc@(rs, msgs) =
13701370
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
1371-
<$> processCommand clntServiceId thVersion t
1372-
forever $
1373-
atomically (readTBQueue rcvQ)
1374-
>>= foldrM process ([], [])
1371+
<$> processCommand clntServiceId thVersion msgMap t
1372+
forever $ do
1373+
batch <- atomically (readTBQueue rcvQ)
1374+
msgMap <- prefetchMsgs batch
1375+
foldrM (process msgMap) ([], []) batch
13751376
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
13761377
where
1378+
prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message))
1379+
prefetchMsgs batch =
1380+
let subQueues = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch]
1381+
in liftIO $ runExceptT $ tryPeekMsgs ms subQueues
1382+
13771383
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
13781384
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
13791385
PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH)
@@ -1454,8 +1460,8 @@ client
14541460
mkIncProxyStats ps psOwn own sel = do
14551461
incStat $ sel ps
14561462
when own $ incStat $ sel psOwn
1457-
processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
1458-
processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of
1463+
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
1464+
processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of
14591465
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
14601466
Cmd SSender command -> case command of
14611467
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1479,7 +1485,7 @@ client
14791485
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
14801486
Cmd SRecipient command ->
14811487
case command of
1482-
SUB -> withQueue' subscribeQueueAndDeliver
1488+
SUB -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId <$> msgMap)
14831489
GET -> withQueue getMessage
14841490
ACK msgId -> withQueue $ acknowledgeMsg msgId
14851491
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1620,8 +1626,8 @@ client
16201626
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
16211627
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
16221628

1623-
subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
1624-
subscribeQueueAndDeliver q qr@QueueRec {rcvServiceId} =
1629+
subscribeQueueAndDeliver :: Either ErrorType (Maybe Message) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
1630+
subscribeQueueAndDeliver prefetchedMsg q qr@QueueRec {rcvServiceId} =
16251631
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
16261632
Nothing ->
16271633
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
@@ -1642,7 +1648,7 @@ client
16421648
deliver (hasSub, sub_) = do
16431649
stats <- asks serverStats
16441650
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
1645-
msg_ <- tryPeekMsg ms q
1651+
msg_ <- liftEither prefetchedMsg
16461652
msg' <- forM msg_ $ \msg -> liftIO $ do
16471653
ts <- getSystemSeconds
16481654
sub <- maybe (atomically getSub) pure sub_
@@ -2087,7 +2093,7 @@ client
20872093
-- rejectOrVerify filters allowed commands, no need to repeat it here.
20882094
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
20892095
-- `fst` removes empty message that is only returned for `SUB` command
2090-
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'')
2096+
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
20912097
-- encode response
20922098
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
20932099
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ import Data.Functor (($>))
6464
import Data.Int (Int64)
6565
import Data.List (sort)
6666
import qualified Data.Map.Strict as M
67-
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
67+
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
6868
import Data.Text (Text)
6969
import qualified Data.Text as T
7070
import Data.Text.Encoding (decodeLatin1)
@@ -672,6 +672,9 @@ instance MsgStoreClass (JournalMsgStore s) where
672672
atomically $ writeTVar tipMsg $ Just (Just ml)
673673
pure $ Just msg
674674

675+
tryPeekMsgs st qs =
676+
M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs
677+
675678
tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
676679
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
677680
void $

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import Data.List (intersperse)
4141
import qualified Data.Map.Strict as M
4242
import Data.Text (Text)
4343
import Data.Time.Clock.System (SystemTime (..))
44-
import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..))
44+
import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), (:.) (..))
4545
import qualified Database.PostgreSQL.Simple as DB
4646
import qualified Database.PostgreSQL.Simple.Copy as DB
4747
import Database.PostgreSQL.Simple.SqlQQ (sql)
@@ -246,6 +246,25 @@ instance MsgStoreClass PostgresMsgStore where
246246
tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q ()
247247
{-# INLINE tryPeekMsg #-}
248248

249+
tryPeekMsgs :: PostgresMsgStore -> [PostgresQueue] -> ExceptT ErrorType IO (Map RecipientId Message)
250+
tryPeekMsgs _ms [] = pure M.empty
251+
tryPeekMsgs ms qs =
252+
uninterruptibleMask_ $
253+
withDB' "tryPeekMsgs" (queueStore_ ms) $ \db ->
254+
M.fromList . map toRcvMsg <$>
255+
DB.query
256+
db
257+
[sql|
258+
SELECT DISTINCT ON (recipient_id)
259+
recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
260+
FROM messages
261+
WHERE recipient_id IN ?
262+
ORDER BY recipient_id, message_id ASC
263+
|]
264+
(Only (In (map recipientId' qs)))
265+
where
266+
toRcvMsg (rId :. msg) = (rId, toMessage msg)
267+
249268
tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message)
250269
tryDelMsg ms q msgId =
251270
uninterruptibleMask_ $

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Control.Monad.Trans.Except
2424
import Data.Functor (($>))
2525
import Data.Int (Int64)
2626
import qualified Data.Map.Strict as M
27+
import Data.Maybe (catMaybes)
2728
import Data.Text (Text)
2829
import Simplex.Messaging.Protocol
2930
import Simplex.Messaging.Server.MsgStore.Types
@@ -176,6 +177,9 @@ instance MsgStoreClass STMMsgStore where
176177
tryPeekMsg_ _ = tryPeekTQueue . msgTQueue
177178
{-# INLINE tryPeekMsg_ #-}
178179

180+
tryPeekMsgs st qs =
181+
M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs
182+
179183
tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
180184
tryDeleteMsg_ _ STMMsgQueue {msgTQueue = q, size} _logState =
181185
tryReadTQueue q >>= \case

0 commit comments

Comments
 (0)