Skip to content

Commit 9be9a76

Browse files
authored
smp server: improve message expiration (#1634)
* smp server: limit by time the queues to export journal messages for * pass queue/msg thresholds separately * reset db connection on errors * Revert "smp server: limit by time the queues to export journal messages for" This reverts commit d3bc0cb. * fix test compilation * flag to expire messages * improve test * expire messages newer than quota
1 parent 80f7be6 commit 9be9a76

File tree

4 files changed

+85
-65
lines changed

4 files changed

+85
-65
lines changed

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,12 @@ instance MsgStoreClass PostgresMsgStore where
109109
expireOldMessages :: Bool -> PostgresMsgStore -> Int64 -> Int64 -> IO MessageStats
110110
expireOldMessages _tty ms now ttl =
111111
maybeFirstRow' newMessageStats toMessageStats $ withConnection st $ \db ->
112-
DB.query db "CALL expire_old_messages(?,?,?,0,0,0)" (now, ttl, 10000 :: Int)
112+
DB.query db "CALL expire_old_messages(?,?,?,0,0,0)" (oldQueue, oldMsg, batchSize)
113113
where
114114
st = dbStore $ queueStore_ ms
115+
oldQueue = 0 :: Int64 -- expire all queues
116+
oldMsg = now - ttl
117+
batchSize = 10000 :: Int
115118
toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) =
116119
MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues}
117120

@@ -298,8 +301,8 @@ deleteAllMessages ms =
298301
db
299302
[sql|
300303
UPDATE msg_queues
301-
SET msg_queue_size = 0, msg_can_write = TRUE
302-
WHERE msg_queue_size != 0 OR msg_can_write = FALSE
304+
SET msg_queue_size = 0, msg_can_write = TRUE, msg_queue_expire = FALSE
305+
WHERE msg_queue_size != 0 OR msg_can_write = FALSE OR msg_queue_expire = TRUE
303306
|]
304307

305308
updateQueueCounts :: PostgresMsgStore -> IO ()
@@ -311,23 +314,24 @@ updateQueueCounts ms =
311314
CREATE TEMP TABLE queue_stats AS
312315
SELECT recipient_id,
313316
COUNT(*) AS size,
314-
BOOL_OR(msg_quota) AS has_quota
317+
SUM(CASE WHEN msg_quota THEN 1 ELSE 0 END) AS quota_count
315318
FROM messages
316319
GROUP BY recipient_id
317320
|]
318321
void $ DB.execute_
319322
db
320323
[sql|
321324
UPDATE msg_queues
322-
SET msg_queue_size = 0, msg_can_write = TRUE
323-
WHERE msg_queue_size != 0 OR msg_can_write = FALSE
325+
SET msg_queue_size = 0, msg_can_write = TRUE, msg_queue_expire = FALSE
326+
WHERE msg_queue_size != 0 OR msg_can_write = FALSE OR msg_queue_expire = TRUE
324327
|]
325328
void $ DB.execute_
326329
db
327330
[sql|
328331
UPDATE msg_queues q
329332
SET msg_queue_size = s.size,
330-
msg_can_write = NOT s.has_quota
333+
msg_can_write = s.quota_count = 0,
334+
msg_queue_expire = s.size > s.quota_count
331335
FROM queue_stats s
332336
WHERE q.recipient_id = s.recipient_id
333337
|]

src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,15 @@ CREATE TABLE messages(
177177

178178
ALTER TABLE msg_queues
179179
ADD COLUMN msg_can_write BOOLEAN NOT NULL DEFAULT TRUE,
180+
ADD COLUMN msg_queue_expire BOOLEAN NOT NULL DEFAULT FALSE,
180181
ADD COLUMN msg_queue_size BIGINT NOT NULL DEFAULT 0;
181182

182183
CREATE INDEX idx_messages_recipient_id_message_id ON messages (recipient_id, message_id);
183184
CREATE INDEX idx_messages_recipient_id_msg_ts on messages(recipient_id, msg_ts);
184185
CREATE INDEX idx_messages_recipient_id_msg_quota on messages(recipient_id, msg_quota);
185186

186187
DROP INDEX idx_msg_queues_updated_at;
187-
CREATE INDEX idx_msg_queues_updated_at_recipient_id ON msg_queues (deleted_at, updated_at, msg_queue_size, recipient_id);
188+
CREATE INDEX idx_msg_queues_updated_at_recipient_id ON msg_queues (deleted_at, updated_at, msg_queue_expire, recipient_id);
188189

189190
CREATE FUNCTION write_message(
190191
p_recipient_id BYTEA,
@@ -215,6 +216,7 @@ BEGIN
215216

216217
UPDATE msg_queues
217218
SET msg_can_write = NOT quota_written,
219+
msg_queue_expire = TRUE,
218220
msg_queue_size = msg_queue_size + 1
219221
WHERE recipient_id = p_recipient_id;
220222

@@ -248,7 +250,9 @@ BEGIN
248250
IF NOT FOUND THEN
249251
IF q_size != 0 THEN
250252
UPDATE msg_queues
251-
SET msg_can_write = TRUE, msg_queue_size = 0
253+
SET msg_can_write = TRUE,
254+
msg_queue_expire = FALSE,
255+
msg_queue_size = 0
252256
WHERE recipient_id = p_recipient_id;
253257
END IF;
254258
RETURN;
@@ -259,6 +263,7 @@ BEGIN
259263
IF FOUND THEN
260264
UPDATE msg_queues
261265
SET msg_can_write = msg_can_write OR msg_queue_size <= 1,
266+
msg_queue_expire = msg_queue_size > 1,
262267
msg_queue_size = GREATEST(msg_queue_size - 1, 0)
263268
WHERE recipient_id = p_recipient_id;
264269
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
@@ -293,7 +298,9 @@ BEGIN
293298
IF NOT FOUND THEN
294299
IF q_size != 0 THEN
295300
UPDATE msg_queues
296-
SET msg_can_write = TRUE, msg_queue_size = 0
301+
SET msg_can_write = TRUE,
302+
msg_queue_expire = FALSE,
303+
msg_queue_size = 0
297304
WHERE recipient_id = p_recipient_id;
298305
END IF;
299306
RETURN;
@@ -318,12 +325,15 @@ BEGIN
318325
IF msg_deleted THEN
319326
UPDATE msg_queues
320327
SET msg_can_write = msg_can_write OR msg_queue_size <= 1,
328+
msg_queue_expire = msg_queue_size > 1,
321329
msg_queue_size = GREATEST(msg_queue_size - 1, 0)
322330
WHERE recipient_id = p_recipient_id;
323331
END IF;
324332
ELSIF msg_deleted OR q_size != 0 THEN
325333
UPDATE msg_queues
326-
SET msg_can_write = TRUE, msg_queue_size = 0
334+
SET msg_can_write = TRUE,
335+
msg_queue_expire = FALSE,
336+
msg_queue_size = 0
327337
WHERE recipient_id = p_recipient_id;
328338
END IF;
329339
ELSE
@@ -336,7 +346,7 @@ CREATE FUNCTION delete_expired_msgs(p_recipient_id BYTEA, p_old_ts BIGINT) RETUR
336346
LANGUAGE plpgsql AS $$
337347
DECLARE
338348
q_size BIGINT;
339-
min_id BIGINT;
349+
keep_min_id BIGINT;
340350
del_count BIGINT;
341351
BEGIN
342352
SELECT msg_queue_size INTO q_size
@@ -348,21 +358,20 @@ BEGIN
348358
RETURN 0;
349359
END IF;
350360

351-
SELECT LEAST( -- ignores NULLs
352-
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts),
353-
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = TRUE)
354-
) INTO min_id;
361+
SELECT MIN(message_id) INTO keep_min_id
362+
FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts AND msg_quota = FALSE;
355363

356-
IF min_id IS NULL THEN
357-
DELETE FROM messages WHERE recipient_id = p_recipient_id;
364+
IF keep_min_id IS NULL THEN
365+
DELETE FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = FALSE;
358366
ELSE
359-
DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < min_id;
367+
DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < keep_min_id AND msg_quota = FALSE;
360368
END IF;
361369

362370
GET DIAGNOSTICS del_count = ROW_COUNT;
363371
IF del_count > 0 THEN
364372
UPDATE msg_queues
365373
SET msg_can_write = msg_can_write OR msg_queue_size <= del_count,
374+
msg_queue_expire = msg_queue_size > del_count AND keep_min_id IS NOT NULL,
366375
msg_queue_size = GREATEST(msg_queue_size - del_count, 0)
367376
WHERE recipient_id = p_recipient_id;
368377
END IF;
@@ -371,17 +380,15 @@ END;
371380
$$;
372381

373382
CREATE PROCEDURE expire_old_messages(
374-
p_now_ts BIGINT,
375-
p_ttl BIGINT,
383+
p_old_queue BIGINT,
384+
p_old_ts BIGINT,
376385
batch_size INT,
377386
OUT r_expired_msgs_count BIGINT,
378387
OUT r_stored_msgs_count BIGINT,
379388
OUT r_stored_queues BIGINT
380389
)
381390
LANGUAGE plpgsql AS $$
382391
DECLARE
383-
old_ts BIGINT := p_now_ts - p_ttl;
384-
very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400;
385392
rids BYTEA[];
386393
rid BYTEA;
387394
last_rid BYTEA := '\x';
@@ -395,8 +402,8 @@ BEGIN
395402
SELECT recipient_id
396403
FROM msg_queues
397404
WHERE deleted_at IS NULL
398-
AND updated_at > very_old_ts
399-
AND msg_queue_size > 0
405+
AND updated_at > p_old_queue
406+
AND msg_queue_expire = TRUE
400407
AND recipient_id > last_rid
401408
ORDER BY recipient_id ASC
402409
LIMIT batch_size
@@ -407,7 +414,7 @@ BEGIN
407414
FOREACH rid IN ARRAY rids
408415
LOOP
409416
BEGIN
410-
del_count := delete_expired_msgs(rid, old_ts);
417+
del_count := delete_expired_msgs(rid, p_old_ts);
411418
total_deleted := total_deleted + del_count;
412419
EXCEPTION WHEN OTHERS THEN
413420
RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM;
@@ -444,6 +451,7 @@ DROP INDEX idx_messages_recipient_id_msg_quota;
444451

445452
ALTER TABLE msg_queues
446453
DROP COLUMN msg_can_write,
454+
DROP COLUMN msg_queue_expire,
447455
DROP COLUMN msg_queue_size;
448456

449457
DROP TABLE messages;

src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ CREATE FUNCTION smp_server.delete_expired_msgs(p_recipient_id bytea, p_old_ts bi
2121
AS $$
2222
DECLARE
2323
q_size BIGINT;
24-
min_id BIGINT;
24+
keep_min_id BIGINT;
2525
del_count BIGINT;
2626
BEGIN
2727
SELECT msg_queue_size INTO q_size
@@ -33,21 +33,20 @@ BEGIN
3333
RETURN 0;
3434
END IF;
3535

36-
SELECT LEAST( -- ignores NULLs
37-
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts),
38-
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = TRUE)
39-
) INTO min_id;
36+
SELECT MIN(message_id) INTO keep_min_id
37+
FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts AND msg_quota = FALSE;
4038

41-
IF min_id IS NULL THEN
42-
DELETE FROM messages WHERE recipient_id = p_recipient_id;
39+
IF keep_min_id IS NULL THEN
40+
DELETE FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = FALSE;
4341
ELSE
44-
DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < min_id;
42+
DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < keep_min_id AND msg_quota = FALSE;
4543
END IF;
4644

4745
GET DIAGNOSTICS del_count = ROW_COUNT;
4846
IF del_count > 0 THEN
4947
UPDATE msg_queues
5048
SET msg_can_write = msg_can_write OR msg_queue_size <= del_count,
49+
msg_queue_expire = msg_queue_size > del_count AND keep_min_id IS NOT NULL,
5150
msg_queue_size = GREATEST(msg_queue_size - del_count, 0)
5251
WHERE recipient_id = p_recipient_id;
5352
END IF;
@@ -57,12 +56,10 @@ $$;
5756

5857

5958

60-
CREATE PROCEDURE smp_server.expire_old_messages(IN p_now_ts bigint, IN p_ttl bigint, IN batch_size integer, OUT r_expired_msgs_count bigint, OUT r_stored_msgs_count bigint, OUT r_stored_queues bigint)
59+
CREATE PROCEDURE smp_server.expire_old_messages(IN p_old_queue bigint, IN p_old_ts bigint, IN batch_size integer, OUT r_expired_msgs_count bigint, OUT r_stored_msgs_count bigint, OUT r_stored_queues bigint)
6160
LANGUAGE plpgsql
6261
AS $$
6362
DECLARE
64-
old_ts BIGINT := p_now_ts - p_ttl;
65-
very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400;
6663
rids BYTEA[];
6764
rid BYTEA;
6865
last_rid BYTEA := '\x';
@@ -76,8 +73,8 @@ BEGIN
7673
SELECT recipient_id
7774
FROM msg_queues
7875
WHERE deleted_at IS NULL
79-
AND updated_at > very_old_ts
80-
AND msg_queue_size > 0
76+
AND updated_at > p_old_queue
77+
AND msg_queue_expire = TRUE
8178
AND recipient_id > last_rid
8279
ORDER BY recipient_id ASC
8380
LIMIT batch_size
@@ -88,7 +85,7 @@ BEGIN
8885
FOREACH rid IN ARRAY rids
8986
LOOP
9087
BEGIN
91-
del_count := delete_expired_msgs(rid, old_ts);
88+
del_count := delete_expired_msgs(rid, p_old_ts);
9289
total_deleted := total_deleted + del_count;
9390
EXCEPTION WHEN OTHERS THEN
9491
RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM;
@@ -132,7 +129,9 @@ BEGIN
132129
IF NOT FOUND THEN
133130
IF q_size != 0 THEN
134131
UPDATE msg_queues
135-
SET msg_can_write = TRUE, msg_queue_size = 0
132+
SET msg_can_write = TRUE,
133+
msg_queue_expire = FALSE,
134+
msg_queue_size = 0
136135
WHERE recipient_id = p_recipient_id;
137136
END IF;
138137
RETURN;
@@ -143,6 +142,7 @@ BEGIN
143142
IF FOUND THEN
144143
UPDATE msg_queues
145144
SET msg_can_write = msg_can_write OR msg_queue_size <= 1,
145+
msg_queue_expire = msg_queue_size > 1,
146146
msg_queue_size = GREATEST(msg_queue_size - 1, 0)
147147
WHERE recipient_id = p_recipient_id;
148148
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
@@ -179,7 +179,9 @@ BEGIN
179179
IF NOT FOUND THEN
180180
IF q_size != 0 THEN
181181
UPDATE msg_queues
182-
SET msg_can_write = TRUE, msg_queue_size = 0
182+
SET msg_can_write = TRUE,
183+
msg_queue_expire = FALSE,
184+
msg_queue_size = 0
183185
WHERE recipient_id = p_recipient_id;
184186
END IF;
185187
RETURN;
@@ -204,12 +206,15 @@ BEGIN
204206
IF msg_deleted THEN
205207
UPDATE msg_queues
206208
SET msg_can_write = msg_can_write OR msg_queue_size <= 1,
209+
msg_queue_expire = msg_queue_size > 1,
207210
msg_queue_size = GREATEST(msg_queue_size - 1, 0)
208211
WHERE recipient_id = p_recipient_id;
209212
END IF;
210213
ELSIF msg_deleted OR q_size != 0 THEN
211214
UPDATE msg_queues
212-
SET msg_can_write = TRUE, msg_queue_size = 0
215+
SET msg_can_write = TRUE,
216+
msg_queue_expire = FALSE,
217+
msg_queue_size = 0
213218
WHERE recipient_id = p_recipient_id;
214219
END IF;
215220
ELSE
@@ -241,6 +246,7 @@ BEGIN
241246

242247
UPDATE msg_queues
243248
SET msg_can_write = NOT quota_written,
249+
msg_queue_expire = TRUE,
244250
msg_queue_size = msg_queue_size + 1
245251
WHERE recipient_id = p_recipient_id;
246252

@@ -303,6 +309,7 @@ CREATE TABLE smp_server.msg_queues (
303309
rcv_service_id bytea,
304310
ntf_service_id bytea,
305311
msg_can_write boolean DEFAULT true NOT NULL,
312+
msg_queue_expire boolean DEFAULT false NOT NULL,
306313
msg_queue_size bigint DEFAULT 0 NOT NULL
307314
);
308315

@@ -375,7 +382,7 @@ CREATE UNIQUE INDEX idx_msg_queues_sender_id ON smp_server.msg_queues USING btre
375382

376383

377384

378-
CREATE INDEX idx_msg_queues_updated_at_recipient_id ON smp_server.msg_queues USING btree (deleted_at, updated_at, msg_queue_size, recipient_id);
385+
CREATE INDEX idx_msg_queues_updated_at_recipient_id ON smp_server.msg_queues USING btree (deleted_at, updated_at, msg_queue_expire, recipient_id);
379386

380387

381388

0 commit comments

Comments
 (0)