Skip to content

Commit 7c21945

Browse files
authored
smp server: option to skip expiring messages on start, read queue state file end only (#1400)
1 parent ffecf20 commit 7c21945

File tree

6 files changed

+29
-11
lines changed

6 files changed

+29
-11
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,20 +1761,22 @@ exportMessages tty ms f drainMsgs = do
17611761
processServerMessages :: M MessageStats
17621762
processServerMessages = do
17631763
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
1764-
asks msgStore >>= liftIO . processMessages old_
1764+
expire <- asks $ expireMessagesOnStart . config
1765+
asks msgStore >>= liftIO . processMessages old_ expire
17651766
where
1766-
processMessages :: Maybe Int64 -> AMsgStore -> IO MessageStats
1767-
processMessages old_ = \case
1767+
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO MessageStats
1768+
processMessages old_ expire = \case
17681769
AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
17691770
Just f -> ifM (doesFileExist f) (importMessages False ms f old_) (pure newMessageStats)
17701771
Nothing -> pure newMessageStats
1771-
AMS SMSJournal ms -> case old_ of
1772+
AMS SMSJournal ms | expire -> case old_ of
17721773
Just old -> do
17731774
logInfo "expiring journal store messages..."
17741775
withAllMsgQueues False ms $ \_ -> processExpireQueue old
17751776
Nothing -> do
17761777
logInfo "validating journal store messages..."
17771778
withAllMsgQueues False ms $ \_ -> processValidateQueue
1779+
AMS SMSJournal _ -> logWarn "skipping message expiration" $> newMessageStats
17781780
where
17791781
processExpireQueue old q =
17801782
runExceptT expireQueue >>= \case

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ data ServerConfig = ServerConfig
7979
controlPortAdminAuth :: Maybe BasicAuth,
8080
-- | time after which the messages can be removed from the queues and check interval, seconds
8181
messageExpiration :: Maybe ExpirationConfig,
82+
expireMessagesOnStart :: Bool,
8283
-- | notification expiration interval (seconds)
8384
notificationExpiration :: ExpirationConfig,
8485
-- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
@@ -155,6 +156,9 @@ defaultMaxJournalMsgCount = 256
155156
defaultMsgQueueQuota :: Int
156157
defaultMsgQueueQuota = 128
157158

159+
defaultStateTailSize :: Int
160+
defaultStateTailSize = 512
161+
158162
data Env = Env
159163
{ config :: ServerConfig,
160164
serverActive :: TVar Bool,
@@ -284,7 +288,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
284288
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
285289
AMSType SMSJournal -> case storeMsgsFile of
286290
Just storePath ->
287-
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines}
291+
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize}
288292
in AMS SMSJournal <$> newMsgStore cfg
289293
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
290294
ntfStore <- NtfStore <$> TM.emptyIO

src/Simplex/Messaging/Server/Main.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
137137
doesFileExist iniFile >>= \case
138138
True -> readIniFile iniFile >>= either exitError a
139139
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
140-
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines}
140+
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize}
141141
iniFile = combine cfgPath "smp-server.ini"
142142
serverVersion = "SMP server v" <> simplexMQVersion
143143
defaultServerPorts = "5223,443"
@@ -238,6 +238,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
238238
<> ("restore_messages: " <> onOff enableStoreLog <> "\n\n")
239239
<> "# Messages and notifications expiration periods.\n"
240240
<> ("expire_messages_days: " <> tshow defMsgExpirationDays <> "\n")
241+
<> "expire_messages_on_start: on\n"
241242
<> ("expire_ntfs_hours: " <> tshow defNtfExpirationHours <> "\n\n")
242243
<> "# Log daily server statistics to CSV file\n"
243244
<> ("log_stats: " <> onOff logStats <> "\n\n")
@@ -403,6 +404,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
403404
defaultMessageExpiration
404405
{ ttl = 86400 * readIniDefault defMsgExpirationDays "STORE_LOG" "expire_messages_days" ini
405406
},
407+
expireMessagesOnStart = fromMaybe True $ iniOnOff "STORE_LOG" "expire_messages_on_start" ini,
406408
notificationExpiration =
407409
defaultNtfExpiration
408410
{ ttl = 3600 * readIniDefault defNtfExpirationHours "STORE_LOG" "expire_ntfs_hours" ini

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
4545
import Data.Bitraversable (bimapM)
4646
import Data.ByteString.Char8 (ByteString)
4747
import qualified Data.ByteString.Char8 as B
48-
import qualified Data.ByteString.Lazy.Char8 as LB
4948
import Data.Functor (($>))
5049
import Data.Int (Int64)
5150
import Data.List (intercalate)
@@ -85,7 +84,8 @@ data JournalStoreConfig = JournalStoreConfig
8584
-- When this limit is reached, the file will be changed.
8685
-- This number should be set bigger than queue quota.
8786
maxMsgCount :: Int,
88-
maxStateLines :: Int
87+
maxStateLines :: Int,
88+
stateTailSize :: Int
8989
}
9090

9191
data JMQueue = JMQueue
@@ -530,7 +530,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
530530
where
531531
tempBackup = statePath <> ".bak"
532532
readQueueState = do
533-
ls <- LB.lines <$> LB.readFile statePath
533+
ls <- B.lines <$> readFileTail
534534
case ls of
535535
[] -> writeNewQueueState
536536
_ -> do
@@ -541,7 +541,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
541541
logWarn $ "STORE: readWriteQueueState, empty queue state - initialized, " <> T.pack statePath
542542
st <- newMsgQueueState <$> newJournalId random
543543
writeQueueState st
544-
useLastLine len isLastLine ls = case strDecode $ LB.toStrict $ last ls of
544+
useLastLine len isLastLine ls = case strDecode $ last ls of
545545
Right st
546546
| len > maxStateLines config || not isLastLine ->
547547
backupWriteQueueState st
@@ -571,6 +571,14 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
571571
sh <- openFile statePath AppendMode
572572
closeOnException sh $ appendState sh st
573573
pure (st, sh)
574+
readFileTail =
575+
IO.withFile statePath ReadMode $ \h -> do
576+
size <- IO.hFileSize h
577+
let sz = stateTailSize config
578+
sz' = fromIntegral sz
579+
if size > sz'
580+
then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz
581+
else B.hGet h (fromIntegral size)
574582

575583
validQueueState :: MsgQueueState -> Bool
576584
validQueueState MsgQueueState {readState = rs, writeState = ws, size}

tests/CoreTests/MsgStoreTests.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ testJournalStoreCfg =
6363
pathParts = journalMsgStoreDepth,
6464
quota = 3,
6565
maxMsgCount = 4,
66-
maxStateLines = 2
66+
maxStateLines = 2,
67+
stateTailSize = 256
6768
}
6869

6970
mkMessage :: MonadIO m => ByteString -> m Message

tests/SMPClient.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ cfgMS msType =
132132
controlPortUserAuth = Nothing,
133133
controlPortAdminAuth = Nothing,
134134
messageExpiration = Just defaultMessageExpiration,
135+
expireMessagesOnStart = True,
135136
notificationExpiration = defaultNtfExpiration,
136137
inactiveClientExpiration = Just defaultInactiveClientExpiration,
137138
logStatsInterval = Nothing,

0 commit comments

Comments
 (0)