Skip to content

Commit a4f049d

Browse files
agent: parameterize withWork, getWorkItem with StoreError; parameterized Binary for SQLite (#1617)
* agent: parameterize withWork StoreError * getWorkItem * export * binary * remove handleWrkErr AnyStoreError constraint * put AnyError in AnyStoreError constraint * move typeclass --------- Co-authored-by: Evgeny Poberezkin <[email protected]>
1 parent 8fea152 commit a4f049d

File tree

4 files changed

+29
-19
lines changed

4 files changed

+29
-19
lines changed

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1974,19 +1974,20 @@ withWork :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError (
19741974
withWork c doWork = withWork_ c doWork . withStore' c
19751975
{-# INLINE withWork #-}
19761976

1977-
withWork_ :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
1977+
withWork_ :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
19781978
withWork_ c doWork getWork action =
19791979
getWork >>= \case
19801980
Right (Just r) -> action r
19811981
Right Nothing -> noWork
19821982
-- worker is stopped here (noWork) because the next iteration is likely to produce the same result
1983-
Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e
1984-
Left e -> notifyErr INTERNAL e
1983+
Left e
1984+
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
1985+
| otherwise -> notifyErr INTERNAL e
19851986
where
19861987
noWork = liftIO $ noWorkToDo doWork
19871988
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
19881989

1989-
withWorkItems :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError [Either StoreError a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
1990+
withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
19901991
withWorkItems c doWork getWork action = do
19911992
getWork >>= \case
19921993
Right [] -> noWork
@@ -1995,20 +1996,17 @@ withWorkItems c doWork getWork action = do
19951996
case L.nonEmpty items of
19961997
Just items' -> action items'
19971998
Nothing -> do
1998-
let criticalErr = find workItemError errs
1999+
let criticalErr = find isWorkItemError errs
19992000
forM_ criticalErr $ \err -> do
20002001
notifyErr (CRITICAL False) err
2001-
when (all workItemError errs) noWork
2002+
when (all isWorkItemError errs) noWork
20022003
unless (null errs) $
20032004
atomically $
20042005
writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs)
20052006
Left e
2006-
| workItemError e -> noWork >> notifyErr (CRITICAL False) e
2007+
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
20072008
| otherwise -> notifyErr INTERNAL e
20082009
where
2009-
workItemError = \case
2010-
SEWorkItemError {} -> True
2011-
_ -> False
20122010
noWork = liftIO $ noWorkToDo doWork
20132011
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
20142012

src/Simplex/Messaging/Agent/Store.hs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,10 +693,20 @@ data StoreError
693693
| -- | XFTP Deleted snd chunk replica not found.
694694
SEDeletedSndChunkReplicaNotFound
695695
| -- | Error when reading work item that suspends worker - do not use!
696-
SEWorkItemError ByteString
696+
SEWorkItemError {errContext :: String}
697697
| -- | Servers stats not found.
698698
SEServersStatsNotFound
699699
deriving (Eq, Show, Exception)
700700

701701
instance AnyError StoreError where
702702
fromSomeException = SEInternal . bshow
703+
704+
class (Show e, AnyError e) => AnyStoreError e where
705+
isWorkItemError :: e -> Bool
706+
mkWorkItemError :: String -> e
707+
708+
instance AnyStoreError StoreError where
709+
isWorkItemError = \case
710+
SEWorkItemError {} -> True
711+
_ -> False
712+
mkWorkItemError errContext = SEWorkItemError {errContext}

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ module Simplex.Messaging.Agent.Store.AgentStore
237237
firstRow',
238238
maybeFirstRow,
239239
fromOnlyBI,
240+
getWorkItem,
241+
getWorkItems,
240242
)
241243
where
242244

@@ -966,25 +968,25 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} =
966968
_ -> Left $ SEInternal "unexpected snd msg data"
967969
markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
968970

969-
getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a))
971+
getWorkItem :: (Show i, AnyStoreError e) => String -> IO (Maybe i) -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e (Maybe a))
970972
getWorkItem itemName getId getItem markFailed =
971973
runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed)
972974

973-
getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a])
975+
getWorkItems :: (Show i, AnyStoreError e) => String -> IO [i] -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e [Either e a])
974976
getWorkItems itemName getIds getItem markFailed =
975977
runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed)
976978

977-
tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a
979+
tryGetItem :: (Show i, AnyStoreError e) => String -> (i -> IO (Either e a)) -> (i -> IO ()) -> i -> ExceptT e IO a
978980
tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchAllErrors` \e -> mark >> throwE e
979981
where
980-
mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId
982+
mark = handleWrkErr itemName ("markFailed ID " <> show itemId) $ markFailed itemId
981983

982984
-- Errors caught by this function will suspend worker as if there is no more work,
983-
handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a
985+
handleWrkErr :: forall e a. AnyStoreError e => String -> String -> IO a -> ExceptT e IO a
984986
handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action
985987
where
986-
mkError :: E.SomeException -> StoreError
987-
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
988+
mkError :: E.SomeException -> e
989+
mkError e = mkWorkItemError $ itemName <> " " <> opName <> " error: " <> show e
988990

989991
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
990992
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =

src/Simplex/Messaging/Agent/Store/SQLite/DB.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import Simplex.Messaging.Util (diffToMicroseconds, tshow)
5252
newtype BoolInt = BI {unBI :: Bool}
5353
deriving newtype (FromField, ToField)
5454

55-
newtype Binary = Binary {fromBinary :: ByteString}
55+
newtype Binary a = Binary {fromBinary :: a}
5656
deriving newtype (FromField, ToField)
5757

5858
data Connection = Connection

0 commit comments

Comments
 (0)