Skip to content

Commit 6c66cf3

Browse files
authored
smp server: set message counts correctly after import (#1632)
1 parent a137d01 commit 6c66cf3

File tree

2 files changed

+61
-6
lines changed

2 files changed

+61
-6
lines changed

src/Simplex/Messaging/Server/Main.hs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists)
7474
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
7575
import Simplex.Messaging.Server.MsgStore.Types (QSType (..))
7676
import Simplex.Messaging.Server.MsgStore.Journal (postgresQueueStore)
77-
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), batchInsertMessages, exportDbMessages)
77+
import Simplex.Messaging.Server.MsgStore.Postgres
7878
import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchInsertServices, foldQueueRecs, foldServiceRecs)
7979
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
8080
import Simplex.Messaging.Server.QueueStore.Types
@@ -653,9 +653,16 @@ importMessagesToDatabase :: FilePath -> DBOpts -> IO Int64
653653
importMessagesToDatabase msgsLogFile dbOpts = do
654654
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
655655
ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota
656-
mCnt <- batchInsertMessages True msgsLogFile $ queueStore ms
656+
mCnt <- getDbMessageCount ms
657+
when (mCnt > 0) $ do
658+
confirmOrExit ("WARNING: the database contains messages, they will be deleted.") "Message records not imported"
659+
deleteAllMessages ms
660+
inserted <- batchInsertMessages True msgsLogFile $ queueStore ms
661+
mCnt' <- getDbMessageCount ms
662+
unless (inserted == mCnt') $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt' <> " messages."
663+
updateQueueCounts ms
657664
renameFile msgsLogFile $ msgsLogFile <> ".bak"
658-
pure mCnt
665+
pure mCnt'
659666

660667
exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO (Int, Int)
661668
exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ module Simplex.Messaging.Server.MsgStore.Postgres
1919
PostgresQueue,
2020
exportDbMessages,
2121
getDbMessageStats,
22+
getDbMessageCount,
23+
deleteAllMessages,
2224
batchInsertMessages,
25+
updateQueueCounts,
2326
)
2427
where
2528

@@ -282,7 +285,54 @@ getDbMessageStats ms =
282285
toMessageStats (storedQueues, storedMsgsCount) =
283286
MessageStats {storedQueues, storedMsgsCount, expiredMsgsCount = 0}
284287

285-
-- TODO [messages] update counts
288+
getDbMessageCount :: PostgresMsgStore -> IO Int64
289+
getDbMessageCount ms =
290+
maybeFirstRow' 0 fromOnly $
291+
withConnection (dbStore $ queueStore_ ms) (`DB.query_` "SELECT COUNT(*) FROM messages")
292+
293+
deleteAllMessages :: PostgresMsgStore -> IO ()
294+
deleteAllMessages ms =
295+
withConnection (dbStore $ queueStore_ ms) $ \db -> do
296+
void $ DB.execute_ db "TRUNCATE messages"
297+
void $ DB.execute_
298+
db
299+
[sql|
300+
UPDATE msg_queues
301+
SET msg_queue_size = 0, msg_can_write = TRUE
302+
WHERE msg_queue_size != 0 OR msg_can_write = FALSE
303+
|]
304+
305+
updateQueueCounts :: PostgresMsgStore -> IO ()
306+
updateQueueCounts ms =
307+
withConnection (dbStore $ queueStore_ ms) $ \db -> do
308+
void $ DB.execute_
309+
db
310+
[sql|
311+
CREATE TEMP TABLE queue_stats AS
312+
SELECT recipient_id,
313+
COUNT(*) AS size,
314+
BOOL_OR(msg_quota) AS has_quota
315+
FROM messages
316+
GROUP BY recipient_id
317+
|]
318+
void $ DB.execute_
319+
db
320+
[sql|
321+
UPDATE msg_queues
322+
SET msg_queue_size = 0, msg_can_write = TRUE
323+
WHERE msg_queue_size != 0 OR msg_can_write = FALSE
324+
|]
325+
void $ DB.execute_
326+
db
327+
[sql|
328+
UPDATE msg_queues q
329+
SET msg_queue_size = s.size,
330+
msg_can_write = NOT s.has_quota
331+
FROM queue_stats s
332+
WHERE q.recipient_id = s.recipient_id
333+
|]
334+
void $ DB.execute_ db "DROP TABLE queue_stats"
335+
286336
batchInsertMessages :: StoreQueueClass q => Bool -> FilePath -> PostgresQueueStore q -> IO Int64
287337
batchInsertMessages tty f toStore = do
288338
putStrLn "Importing messages..."
@@ -296,8 +346,6 @@ batchInsertMessages tty f toStore = do
296346
FROM STDIN WITH (FORMAT CSV)
297347
|]
298348
foldLogLines tty f (putMessage db) (0 :: Int, 0) >>= (DB.putCopyEnd db $>)
299-
Only mCnt : _ <- withTransaction st (`DB.query_` "SELECT count(*) FROM messages")
300-
unless (inserted == mCnt) $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt <> " messages."
301349
pure inserted
302350
where
303351
putMessage db (!i, !cnt) _eof s = do

0 commit comments

Comments
 (0)