@@ -183,6 +183,9 @@ CREATE INDEX idx_messages_recipient_id_message_id ON messages (recipient_id, mes
183183CREATE INDEX idx_messages_recipient_id_msg_ts on messages(recipient_id, msg_ts);
184184CREATE INDEX idx_messages_recipient_id_msg_quota on messages(recipient_id, msg_quota);
185185
186+ DROP INDEX idx_msg_queues_updated_at;
187+ CREATE INDEX idx_msg_queues_updated_at_recipient_id ON msg_queues (deleted_at, updated_at, msg_queue_size, recipient_id);
188+
186189CREATE FUNCTION write_message(
187190 p_recipient_id BYTEA,
188191 p_msg_id BYTEA,
@@ -232,18 +235,33 @@ BEGIN
232235 WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
233236 FOR UPDATE;
234237
235- IF FOUND THEN
236- SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg
237- FROM messages
238- WHERE recipient_id = p_recipient_id
239- ORDER BY message_id ASC LIMIT 1;
238+ IF NOT FOUND THEN
239+ RETURN;
240+ END IF;
240241
241- IF FOUND AND msg.msg_id = p_msg_id THEN
242- DELETE FROM messages WHERE message_id = msg.message_id;
243- IF FOUND THEN
244- CALL dec_msg_count(p_recipient_id, q_size, 1);
245- RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
246- END IF;
242+ SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
243+ INTO msg
244+ FROM messages
245+ WHERE recipient_id = p_recipient_id
246+ ORDER BY message_id ASC LIMIT 1;
247+
248+ IF NOT FOUND THEN
249+ IF q_size != 0 THEN
250+ UPDATE msg_queues
251+ SET msg_can_write = TRUE, msg_queue_size = 0
252+ WHERE recipient_id = p_recipient_id;
253+ END IF;
254+ RETURN;
255+ END IF;
256+
257+ IF msg.msg_id = p_msg_id THEN
258+ DELETE FROM messages WHERE message_id = msg.message_id;
259+ IF FOUND THEN
260+ UPDATE msg_queues
261+ SET msg_can_write = msg_can_write OR msg_queue_size <= 1,
262+ msg_queue_size = GREATEST(msg_queue_size - 1, 0)
263+ WHERE recipient_id = p_recipient_id;
264+ RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
247265 END IF;
248266 END IF;
249267END;
@@ -255,37 +273,61 @@ LANGUAGE plpgsql AS $$
255273DECLARE
256274 q_size BIGINT;
257275 msg RECORD;
276+ msg_deleted BOOLEAN;
258277BEGIN
259278 SELECT msg_queue_size INTO q_size
260279 FROM msg_queues
261280 WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
262281 FOR UPDATE;
263282
264- IF FOUND THEN
265- SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg
283+ IF NOT FOUND THEN
284+ RETURN;
285+ END IF;
286+
287+ SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
288+ INTO msg
289+ FROM messages
290+ WHERE recipient_id = p_recipient_id
291+ ORDER BY message_id ASC LIMIT 1;
292+
293+ IF NOT FOUND THEN
294+ IF q_size != 0 THEN
295+ UPDATE msg_queues
296+ SET msg_can_write = TRUE, msg_queue_size = 0
297+ WHERE recipient_id = p_recipient_id;
298+ END IF;
299+ RETURN;
300+ END IF;
301+
302+ IF msg.msg_id = p_msg_id THEN
303+ DELETE FROM messages WHERE message_id = msg.message_id;
304+
305+ msg_deleted := FOUND;
306+ IF msg_deleted THEN
307+ RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
308+ END IF;
309+
310+ SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
311+ INTO msg
266312 FROM messages
267313 WHERE recipient_id = p_recipient_id
268314 ORDER BY message_id ASC LIMIT 1;
269315
270316 IF FOUND THEN
271- IF msg.msg_id = p_msg_id THEN
272- DELETE FROM messages WHERE message_id = msg.message_id;
273-
274- IF FOUND THEN
275- CALL dec_msg_count(p_recipient_id, q_size, 1);
276- RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
277- END IF;
278-
279- RETURN QUERY (
280- SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
281- FROM messages
282- WHERE recipient_id = p_recipient_id
283- ORDER BY message_id ASC LIMIT 1
284- );
285- ELSE
286- RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
317+ RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
318+ IF msg_deleted THEN
319+ UPDATE msg_queues
320+ SET msg_can_write = msg_can_write OR msg_queue_size <= 1,
321+ msg_queue_size = GREATEST(msg_queue_size - 1, 0)
322+ WHERE recipient_id = p_recipient_id;
287323 END IF;
324+ ELSIF msg_deleted OR q_size != 0 THEN
325+ UPDATE msg_queues
326+ SET msg_can_write = TRUE, msg_queue_size = 0
327+ WHERE recipient_id = p_recipient_id;
288328 END IF;
329+ ELSE
330+ RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
289331 END IF;
290332END;
291333$$;
@@ -319,7 +361,10 @@ BEGIN
319361
320362 GET DIAGNOSTICS del_count = ROW_COUNT;
321363 IF del_count > 0 THEN
322- CALL dec_msg_count(p_recipient_id, q_size, del_count);
364+ UPDATE msg_queues
365+ SET msg_can_write = msg_can_write OR msg_queue_size <= del_count,
366+ msg_queue_size = GREATEST(msg_queue_size - del_count, 0)
367+ WHERE recipient_id = p_recipient_id;
323368 END IF;
324369 RETURN del_count;
325370END;
328373CREATE PROCEDURE expire_old_messages(
329374 p_now_ts BIGINT,
330375 p_ttl BIGINT,
376+ batch_size INT,
331377 OUT r_expired_msgs_count BIGINT,
332378 OUT r_stored_msgs_count BIGINT,
333379 OUT r_stored_queues BIGINT
@@ -336,42 +382,46 @@ LANGUAGE plpgsql AS $$
336382DECLARE
337383 old_ts BIGINT := p_now_ts - p_ttl;
338384 very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400;
385+ rids BYTEA[];
339386 rid BYTEA;
340- min_id BIGINT;
341- q_size BIGINT;
387+ last_rid BYTEA := '\x';
342388 del_count BIGINT;
343389 total_deleted BIGINT := 0;
344390BEGIN
345- FOR rid IN
346- SELECT recipient_id
347- FROM msg_queues
348- WHERE deleted_at IS NULL AND updated_at > very_old_ts
349391 LOOP
350- BEGIN -- sub-transaction for each queue
351- del_count := delete_expired_msgs(rid, old_ts);
352- total_deleted := total_deleted + del_count;
353- EXCEPTION WHEN OTHERS THEN
354- ROLLBACK;
355- RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM;
356- CONTINUE;
357- END;
358- COMMIT;
392+ SELECT array_agg(recipient_id)
393+ INTO rids
394+ FROM (
395+ SELECT recipient_id
396+ FROM msg_queues
397+ WHERE deleted_at IS NULL
398+ AND updated_at > very_old_ts
399+ AND msg_queue_size > 0
400+ AND recipient_id > last_rid
401+ ORDER BY recipient_id ASC
402+ LIMIT batch_size
403+ ) qs;
404+
405+ EXIT WHEN rids IS NULL OR cardinality(rids) = 0;
406+
407+ FOREACH rid IN ARRAY rids
408+ LOOP
409+ BEGIN
410+ del_count := delete_expired_msgs(rid, old_ts);
411+ total_deleted := total_deleted + del_count;
412+ EXCEPTION WHEN OTHERS THEN
413+ RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM;
414+ CONTINUE;
415+ END;
416+ COMMIT;
417+ END LOOP;
418+ last_rid := rids[cardinality(rids)];
359419 END LOOP;
360420
361421 r_expired_msgs_count := total_deleted;
362422 r_stored_msgs_count := (SELECT COUNT(1) FROM messages);
363423 r_stored_queues := (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL);
364424END;
365- $$;
366-
367- CREATE PROCEDURE dec_msg_count(p_recipient_id BYTEA, p_size BIGINT, p_change BIGINT)
368- LANGUAGE plpgsql AS $$
369- BEGIN
370- UPDATE msg_queues
371- SET msg_can_write = msg_can_write OR p_size <= p_change,
372- msg_queue_size = GREATEST(p_size - p_change, 0)
373- WHERE recipient_id = p_recipient_id;
374- END;
375425$$;
376426 |]
377427
@@ -384,7 +434,9 @@ DROP FUNCTION try_del_msg;
384434DROP FUNCTION try_del_peek_msg;
385435DROP FUNCTION delete_expired_msgs;
386436DROP PROCEDURE expire_old_messages;
387- DROP PROCEDURE dec_msg_count;
437+
438+ DROP INDEX idx_msg_queues_updated_at_recipient_id;
439+ CREATE INDEX idx_msg_queues_updated_at ON msg_queues (deleted_at, updated_at);
388440
389441DROP INDEX idx_messages_recipient_id_message_id;
390442DROP INDEX idx_messages_recipient_id_msg_ts;
0 commit comments