Skip to content

Commit 2286726

Browse files
authored
smp server: start options maintenance and skip-warnings (#1465)
* smp server: start options `maintenance` and `skip-warnings` * ignore invalid parsing of the last lines * parsingErr * fix
1 parent 1b8110a commit 2286726

File tree

7 files changed

+118
-51
lines changed

7 files changed

+118
-51
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import qualified Data.List.NonEmpty as L
6969
import qualified Data.Map.Strict as M
7070
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
7171
import Data.Semigroup (Sum (..))
72+
import Data.Text (Text)
7273
import qualified Data.Text as T
7374
import Data.Text.Encoding (decodeLatin1)
7475
import qualified Data.Text.IO as T
@@ -104,14 +105,15 @@ import Simplex.Messaging.Server.QueueStore
104105
import Simplex.Messaging.Server.QueueStore.QueueInfo
105106
import Simplex.Messaging.Server.QueueStore.STM
106107
import Simplex.Messaging.Server.Stats
108+
import Simplex.Messaging.Server.StoreLog (foldLogLines)
107109
import Simplex.Messaging.TMap (TMap)
108110
import qualified Simplex.Messaging.TMap as TM
109111
import Simplex.Messaging.Transport
110112
import Simplex.Messaging.Transport.Buffer (trimCR)
111113
import Simplex.Messaging.Transport.Server
112114
import Simplex.Messaging.Util
113115
import Simplex.Messaging.Version
114-
import System.Exit (exitFailure)
116+
import System.Exit (exitFailure, exitSuccess)
115117
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
116118
import System.Mem.Weak (deRefWeak)
117119
import UnliftIO (timeout)
@@ -162,14 +164,18 @@ newMessageStats :: MessageStats
162164
newMessageStats = MessageStats 0 0 0
163165

164166
smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M ()
165-
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do
167+
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOptions} attachHTTP_ = do
166168
s <- asks server
167169
pa <- asks proxyAgent
168-
msgStats_ <- processServerMessages
170+
msgStats_ <- processServerMessages startOptions
169171
ntfStats <- restoreServerNtfs
170172
liftIO $ mapM_ (printMessageStats "messages") msgStats_
171173
liftIO $ printMessageStats "notifications" ntfStats
172174
restoreServerStats msgStats_ ntfStats
175+
when (maintenance startOptions) $ do
176+
liftIO $ putStrLn "Server started in 'maintenance' mode, exiting"
177+
stopServer s
178+
liftIO $ exitSuccess
173179
raceAny_
174180
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
175181
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
@@ -1816,16 +1822,16 @@ exportMessages tty ms f drainMsgs = do
18161822
exitFailure
18171823
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
18181824

1819-
processServerMessages :: M (Maybe MessageStats)
1820-
processServerMessages = do
1825+
processServerMessages :: StartOptions -> M (Maybe MessageStats)
1826+
processServerMessages StartOptions {skipWarnings} = do
18211827
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
18221828
expire <- asks $ expireMessagesOnStart . config
18231829
asks msgStore >>= liftIO . processMessages old_ expire
18241830
where
18251831
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO (Maybe MessageStats)
18261832
processMessages old_ expire = \case
18271833
AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
1828-
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_) (pure Nothing)
1834+
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing)
18291835
Nothing -> pure Nothing
18301836
AMS SMSJournal ms
18311837
| expire -> Just <$> case old_ of
@@ -1858,44 +1864,56 @@ processServerMessages = do
18581864
logError $ "STORE: processValidateQueue, failed opening message queue, " <> tshow e
18591865
exitFailure
18601866

1861-
-- TODO this function should be called after importing queues from store log
1862-
importMessages :: forall s. STMStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
1863-
importMessages tty ms f old_ = do
1867+
importMessages :: forall s. STMStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> Bool -> IO MessageStats
1868+
importMessages tty ms f old_ skipWarnings = do
18641869
logInfo $ "restoring messages from file " <> T.pack f
1865-
LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case
1866-
Left e -> do
1867-
when tty $ putStrLn ""
1868-
logError . T.pack $ "error restoring messages: " <> e
1869-
liftIO exitFailure
1870-
Right (lineCount, _, (storedMsgsCount, expiredMsgsCount, overQuota)) -> do
1871-
putStrLn $ progress lineCount
1872-
renameFile f $ f <> ".bak"
1873-
mapM_ setOverQuota_ overQuota
1874-
logQueueStates ms
1875-
storedQueues <- M.size <$> readTVarIO (queues $ stmQueueStore ms)
1876-
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues}
1870+
(lineCount, _, (storedMsgsCount, expiredMsgsCount, overQuota)) <-
1871+
foldLogLines tty f restoreMsg (0, Nothing, (0, 0, M.empty))
1872+
putStrLn $ progress lineCount
1873+
renameFile f $ f <> ".bak"
1874+
mapM_ setOverQuota_ overQuota
1875+
logQueueStates ms
1876+
storedQueues <- M.size <$> readTVarIO (queues $ stmQueueStore ms)
1877+
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues}
18771878
where
18781879
progress i = "Processed " <> show i <> " lines"
1879-
restoreMsg :: (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) -> LB.ByteString -> ExceptT String IO (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s)))
1880-
restoreMsg (!i, q_, (!stored, !expired, !overQuota)) s' = do
1881-
when (tty && i `mod` 1000 == 0) $ liftIO $ putStr (progress i <> "\r") >> hFlush stdout
1882-
MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s
1883-
liftError show $ addToMsgQueue rId msg
1880+
restoreMsg :: (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) -> Bool -> ByteString -> IO (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s)))
1881+
restoreMsg (!i, q_, counts@(!stored, !expired, !overQuota)) eof s = do
1882+
when (tty && i `mod` 1000 == 0) $ putStr (progress i <> "\r") >> hFlush stdout
1883+
case strDecode s of
1884+
Right (MLRv3 rId msg) -> runExceptT (addToMsgQueue rId msg) >>= either (exitErr . tshow) pure
1885+
Left e
1886+
| eof -> warnOrExit (parsingErr e) $> (i + 1, q_, counts)
1887+
| otherwise -> exitErr $ parsingErr e
18841888
where
1885-
s = LB.toStrict s'
1889+
exitErr e = do
1890+
when tty $ putStrLn ""
1891+
logError $ "error restoring messages: " <> e
1892+
liftIO exitFailure
1893+
parsingErr :: String -> Text
1894+
parsingErr e = "parsing error (" <> T.pack e <> "): " <> safeDecodeUtf8 (B.take 100 s)
18861895
addToMsgQueue rId msg = do
1887-
q <- case q_ of
1896+
qOrErr <- case q_ of
18881897
-- to avoid lookup when restoring the next message to the same queue
1889-
Just (rId', q') | rId' == rId -> pure q'
1890-
_ -> ExceptT $ getQueue ms SRecipient rId
1898+
Just (rId', q') | rId' == rId -> pure $ Right q'
1899+
_ -> liftIO $ getQueue ms SRecipient rId
1900+
case qOrErr of
1901+
Right q -> addToQueue_ q rId msg
1902+
Left AUTH -> liftIO $ do
1903+
when tty $ putStrLn ""
1904+
warnOrExit $ "queue " <> safeDecodeUtf8 (encode $ unEntityId rId) <> " does not exist"
1905+
pure (i + 1, Nothing, counts)
1906+
Left e -> throwE e
1907+
addToQueue_ q rId msg =
18911908
(i + 1,Just (rId, q),) <$> case msg of
18921909
Message {msgTs}
18931910
| maybe True (systemSeconds msgTs >=) old_ -> do
18941911
writeMsg ms q False msg >>= \case
18951912
Just _ -> pure (stored + 1, expired, overQuota)
1896-
Nothing -> do
1913+
Nothing -> liftIO $ do
1914+
when tty $ putStrLn ""
18971915
logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
1898-
pure (stored, expired, overQuota)
1916+
pure counts
18991917
| otherwise -> pure (stored, expired + 1, overQuota)
19001918
MessageQuota {} ->
19011919
-- queue was over quota at some point,
@@ -1907,8 +1925,13 @@ importMessages tty ms f old_ = do
19071925
withPeekMsgQueue ms q "mergeQuotaMsgs" $ maybe (pure ()) $ \case
19081926
(mq, MessageQuota {}) -> tryDeleteMsg_ q mq False
19091927
_ -> pure ()
1910-
msgErr :: Show e => String -> e -> String
1911-
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)
1928+
warnOrExit e
1929+
| skipWarnings = logWarn e'
1930+
| otherwise = do
1931+
logWarn $ e' <> ", start with --skip-warnings option to ignore this error"
1932+
exitFailure
1933+
where
1934+
e' = "warning restoring messages: " <> e
19121935

19131936
printMessageStats :: T.Text -> MessageStats -> IO ()
19141937
printMessageStats name MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} =

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,13 @@ data ServerConfig = ServerConfig
116116
allowSMPProxy :: Bool, -- auth is the same with `newQueueBasicAuth`
117117
serverClientConcurrency :: Int,
118118
-- | server public information
119-
information :: Maybe ServerPublicInfo
119+
information :: Maybe ServerPublicInfo,
120+
startOptions :: StartOptions
121+
}
122+
123+
data StartOptions = StartOptions
124+
{ maintenance :: Bool,
125+
skipWarnings :: Bool
120126
}
121127

122128
defMsgExpirationDays :: Int64

src/Simplex/Messaging/Server/Main.hs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
7373
True -> exitError $ "Error: server is already initialized (" <> iniFile <> " exists).\nRun `" <> executableName <> " start`."
7474
_ -> initializeServer opts
7575
OnlineCert certOpts -> withIniFile $ \_ -> genOnline cfgPath certOpts
76-
Start -> withIniFile runServer
76+
Start opts -> withIniFile $ runServer opts
7777
Delete -> do
7878
confirmOrExit
7979
"WARNING: deleting the server will make all queues inaccessible, because the server identity (certificate fingerprint) will change.\nTHIS CANNOT BE UNDONE!"
@@ -107,7 +107,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
107107
"Messages not imported"
108108
ms <- newJournalMsgStore
109109
readQueueStore storeLogFile ms
110-
msgStats <- importMessages True ms storeMsgsFilePath Nothing -- no expiration
110+
msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration
111111
putStrLn "Import completed"
112112
printMessageStats "Messages" msgStats
113113
putStrLn $ case readMsgStoreType ini of
@@ -322,7 +322,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
322322
<> (webDisabled <> "key: " <> T.pack httpsKeyFile <> "\n")
323323
where
324324
webDisabled = if disableWeb then "# " else ""
325-
runServer ini = do
325+
runServer startOptions ini = do
326326
hSetBuffering stdout LineBuffering
327327
hSetBuffering stderr LineBuffering
328328
fp <- checkSavedFingerprint cfgPath defaultX509Config
@@ -463,7 +463,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
463463
},
464464
allowSMPProxy = True,
465465
serverClientConcurrency = readIniDefault defaultProxyClientConcurrency "PROXY" "client_concurrency" ini,
466-
information = serverPublicInfo ini
466+
information = serverPublicInfo ini,
467+
startOptions
467468
}
468469
textToOwnServers :: Text -> [ByteString]
469470
textToOwnServers = map encodeUtf8 . T.words
@@ -635,7 +636,7 @@ printSourceCode = \case
635636
data CliCommand
636637
= Init InitOptions
637638
| OnlineCert CertOptions
638-
| Start
639+
| Start StartOptions
639640
| Delete
640641
| Journal JournalCmd
641642

@@ -669,7 +670,7 @@ cliCommandP cfgPath logPath iniFile =
669670
hsubparser
670671
( command "init" (info (Init <$> initP) (progDesc $ "Initialize server - creates " <> cfgPath <> " and " <> logPath <> " directories and configuration files"))
671672
<> command "cert" (info (OnlineCert <$> certOptionsP) (progDesc $ "Generate new online TLS server credentials (configuration: " <> iniFile <> ")"))
672-
<> command "start" (info (pure Start) (progDesc $ "Start server (configuration: " <> iniFile <> ")"))
673+
<> command "start" (info (Start <$> startOptionsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")"))
673674
<> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files"))
674675
<> command "journal" (info (Journal <$> journalCmdP) (progDesc "Import/export messages to/from journal storage"))
675676
)
@@ -811,6 +812,18 @@ cliCommandP cfgPath logPath iniFile =
811812
disableWeb,
812813
scripted
813814
}
815+
startOptionsP = do
816+
maintenance <-
817+
switch
818+
( long "maintenance"
819+
<> help "Do not start the server, only perform start and stop tasks"
820+
)
821+
skipWarnings <-
822+
switch
823+
( long "skip-warnings"
824+
<> help "Start the server with non-critical start warnings"
825+
)
826+
pure StartOptions {maintenance, skipWarnings}
814827
journalCmdP =
815828
hsubparser
816829
( command "import" (info (pure JCImport) (progDesc "Import message log file into a new journal storage"))

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import Control.Monad.IO.Class
3838
import Control.Monad.Trans.Except
3939
import Data.Bitraversable (bimapM)
4040
import qualified Data.ByteString.Char8 as B
41-
import qualified Data.ByteString.Lazy.Char8 as LB
4241
import Data.Functor (($>))
4342
import qualified Data.Text as T
4443
import Data.Text.Encoding (decodeLatin1)
@@ -48,7 +47,8 @@ import Simplex.Messaging.Server.MsgStore.Types
4847
import Simplex.Messaging.Server.QueueStore
4948
import Simplex.Messaging.Server.StoreLog
5049
import qualified Simplex.Messaging.TMap as TM
51-
import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$))
50+
import Simplex.Messaging.Util (ifM, safeDecodeUtf8, tshow, ($>>=), (<$$))
51+
import System.Exit (exitFailure)
5252
import System.IO
5353
import UnliftIO.STM
5454

@@ -196,12 +196,11 @@ withLog :: STMStoreClass s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> I
196196
withLog name = withLog' name . storeLog . stmQueueStore
197197

198198
readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO ()
199-
readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines
199+
readQueueStore f st = readLogLines False f processLine
200200
where
201-
processLine :: LB.ByteString -> IO ()
202-
processLine s' = either printError procLogRecord (strDecode s)
201+
processLine :: Bool -> B.ByteString -> IO ()
202+
processLine eof s = either printError procLogRecord (strDecode s)
203203
where
204-
s = LB.toStrict s'
205204
procLogRecord :: StoreLogRecord -> IO ()
206205
procLogRecord = \case
207206
CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue"
@@ -214,7 +213,11 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin
214213
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
215214
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
216215
printError :: String -> IO ()
217-
printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
216+
printError e
217+
| eof = logWarn err
218+
| otherwise = logError err >> exitFailure
219+
where
220+
err = "Error parsing log: " <> T.pack e <> " - " <> safeDecodeUtf8 s
218221
withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO ()
219222
withQueue qId op a = runExceptT go >>= qError qId op
220223
where

src/Simplex/Messaging/Server/StoreLog.hs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ module Simplex.Messaging.Server.StoreLog
2828
logUpdateQueueTime,
2929
readWriteStoreLog,
3030
writeQueueStore,
31+
readLogLines,
32+
foldLogLines,
3133
)
3234
where
3335

3436
import Control.Applicative (optional, (<|>))
3537
import Control.Concurrent.STM
3638
import qualified Control.Exception as E
3739
import Control.Logger.Simple
40+
import Control.Monad (when)
3841
import qualified Data.Attoparsec.ByteString.Char8 as A
3942
import qualified Data.ByteString.Char8 as B
4043
import Data.Functor (($>))
@@ -254,3 +257,21 @@ writeQueueStore s st = readTVarIO qs >>= mapM_ writeQueue . M.assocs
254257
readTVarIO (queueRec' q) >>= \case
255258
Just q' -> logCreateQueue s rId q'
256259
Nothing -> atomically $ TM.delete rId qs
260+
261+
-- | Run an action on every line of the log file @f@, in order, for its effects only.
--
-- The action receives a flag that is 'True' only for the last line of the file,
-- followed by the line itself. This is 'foldLogLines' with a unit accumulator,
-- so progress reporting follows the same @tty@ rules.
readLogLines :: Bool -> FilePath -> (Bool -> B.ByteString -> IO ()) -> IO ()
readLogLines tty f action = foldLogLines tty f step ()
  where
    -- the unit accumulator carries no information, so it is ignored
    step _ isLast line = action isLast line
263+
264+
foldLogLines :: Bool -> FilePath -> (a -> Bool -> B.ByteString -> IO a) -> a -> IO a
265+
foldLogLines tty f action initValue = do
266+
(count :: Int, acc) <- withFile f ReadMode $ \h -> ifM (hIsEOF h) (pure (0, initValue)) (loop h 0 initValue)
267+
putStrLn $ progress count
268+
pure acc
269+
where
270+
loop h i acc = do
271+
s <- B.hGetLine h
272+
eof <- hIsEOF h
273+
acc' <- action acc eof s
274+
let i' = i + 1
275+
when (tty && i' `mod` 100000 == 0) $ putStr (progress i' <> "\r") >> hFlush stdout
276+
if eof then pure (i', acc') else loop h i' acc'
277+
progress i = "Processed: " <> show i <> " lines"

tests/CoreTests/MsgStoreTests.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ testExportImportStore ms = do
222222
ms' <- newMsgStore cfg
223223
readWriteQueueStore testStoreLogFile ms' >>= closeStoreLog
224224
stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <-
225-
importMessages False ms' testStoreMsgsFile Nothing
225+
importMessages False ms' testStoreMsgsFile Nothing False
226226
printMessageStats "Messages" stats
227227
length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2
228228
length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 4 -- state file is backed up, 2 message files
@@ -231,7 +231,7 @@ testExportImportStore ms = do
231231
stmStore <- newMsgStore testSMTStoreConfig
232232
readWriteQueueStore testStoreLogFile stmStore >>= closeStoreLog
233233
MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <-
234-
importMessages False stmStore testStoreMsgsFile2 Nothing
234+
importMessages False stmStore testStoreMsgsFile2 Nothing False
235235
exportMessages False stmStore testStoreMsgsFile False
236236
(B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak"))
237237

tests/SMPClient.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ cfgMS msType =
163163
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 1}, -- seconds
164164
allowSMPProxy = False,
165165
serverClientConcurrency = 2,
166-
information = Nothing
166+
information = Nothing,
167+
startOptions = StartOptions {maintenance = False, skipWarnings = False}
167168
}
168169

169170
cfgV7 :: ServerConfig

0 commit comments

Comments
 (0)