@@ -4,7 +4,6 @@ module Network.AMQP.Extended
44 ( RabbitMqHooks (.. ),
55 RabbitMqAdminOpts (.. ),
66 AmqpEndpoint (.. ),
7- withConnection ,
87 openConnectionWithRetries ,
98 mkRabbitMqAdminClientEnv ,
109 mkRabbitMqAdminClientEnvWithCreds ,
@@ -17,7 +16,7 @@ module Network.AMQP.Extended
1716 )
1817where
1918
20- import Control.Exception (throwIO )
19+ import Control.Exception (AsyncException , throwIO )
2120import Control.Monad.Catch
2221import Control.Monad.Trans.Control
2322import Control.Monad.Trans.Maybe
@@ -37,7 +36,7 @@ import Network.HTTP.Client.TLS qualified as HTTP
3736import Network.RabbitMqAdmin
3837import Network.TLS
3938import Network.TLS.Extra.Cipher
40- import Servant
39+ import Servant hiding ( Handler )
4140import Servant.Client
4241import Servant.Client qualified as Servant
4342import System.Logger (Logger )
@@ -154,38 +153,6 @@ data RabbitMqConnectionError = RabbitMqConnectionFailed String
154153
155154instance Exception RabbitMqConnectionError
156155
157- -- | Connects with RabbitMQ and opens a channel.
158- withConnection ::
159- forall m a .
160- (MonadIO m , MonadMask m ) =>
161- Logger ->
162- AmqpEndpoint ->
163- Maybe Text ->
164- (Q. Connection -> m a ) ->
165- m a
166- withConnection l AmqpEndpoint {.. } connName k = do
167- -- Jittered exponential backoff with 1ms as starting delay and 1s as total
168- -- wait time.
169- let policy = limitRetriesByCumulativeDelay 1_000_000 $ fullJitterBackoff 1000
170- logError willRetry e retryStatus = do
171- Log. err l $
172- Log. msg (Log. val " Failed to connect to RabbitMQ" )
173- . Log. field " error" (displayException @ SomeException e)
174- . Log. field " willRetry" willRetry
175- . Log. field " retryCount" retryStatus. rsIterNumber
176- getConn =
177- recovering
178- policy
179- ( skipAsyncExceptions
180- <> [logRetries (const $ pure True ) logError]
181- )
182- ( const $ do
183- Log. info l $ Log. msg (Log. val " Trying to connect to RabbitMQ" )
184- connOpts <- mkConnectionOpts AmqpEndpoint {.. } connName
185- liftIO $ Q. openConnection'' connOpts
186- )
187- bracket getConn (liftIO . Q. closeConnection) k
188-
189156mkConnectionOpts :: (MonadIO m ) => AmqpEndpoint -> Maybe Text -> m Q. ConnectionOpts
190157mkConnectionOpts AmqpEndpoint {.. } name = do
191158 mTlsSettings <- traverse (liftIO . (mkTLSSettings host)) tls
@@ -225,17 +192,21 @@ openConnectionWithRetries l AmqpEndpoint {..} connName hooks = do
225192 . Log. field " error" (displayException @ SomeException e)
226193 . Log. field " willRetry" willRetry
227194 . Log. field " retryCount" retryStatus. rsIterNumber
228- getConn =
229- recovering
230- policy
231- ( skipAsyncExceptions
232- <> [logRetries (const $ pure True ) logError]
233- )
234- ( const $ do
235- Log. info l $ Log. msg (Log. val " Trying to connect to RabbitMQ" )
236- connOpts <- mkConnectionOpts AmqpEndpoint {.. } connName
237- liftIO $ Q. openConnection'' connOpts
238- )
195+ getConn = do
196+ Log. info l $ Log. msg (Log. val " About to enter recovering..." )
197+ conn <-
198+ recovering
199+ policy
200+ ( logAndSkipAsyncExceptions l
201+ <> [logRetries (const $ pure True ) logError]
202+ )
203+ ( const $ do
204+ Log. info l $ Log. msg (Log. val " Trying to connect to RabbitMQ" )
205+ connOpts <- mkConnectionOpts AmqpEndpoint {.. } connName
206+ liftIO $ Q. openConnection'' connOpts
207+ )
208+ Log. info l $ Log. msg (Log. val " Retrieved connection..." )
209+ pure conn
239210 bracket getConn (liftIO . Q. closeConnection) $ \ conn -> do
240211 liftBaseWith $ \ runInIO ->
241212 Q. addConnectionClosedHandler conn True $ void $ runInIO $ do
@@ -267,6 +238,20 @@ openConnectionWithRetries l AmqpEndpoint {..} connName hooks = do
267238 logException l " RabbitMQ channel closed" e
268239 openChan conn
269240
241+ -- | List of pre-made handlers that will skip retries on
242+ -- 'AsyncException' and 'SomeAsyncException' and log them.
243+ -- See also `Control.Retry.skipAsyncExceptions`
244+ logAndSkipAsyncExceptions :: (MonadIO m ) => Logger -> [RetryStatus -> Control.Monad.Catch. Handler m Bool ]
245+ logAndSkipAsyncExceptions l = handlers
246+ where
247+ asyncH _ = Handler $ \ (e :: AsyncException ) -> do
248+ logException l " AsyncException caught" (SomeException e)
249+ pure False
250+ someAsyncH _ = Handler $ \ (e :: SomeAsyncException ) -> do
251+ logException l " SomeAsyncException caught" (SomeException e)
252+ pure False
253+ handlers = [asyncH, someAsyncH]
254+
270255mkTLSSettings :: HostName -> RabbitMqTlsOpts -> IO TLSSettings
271256mkTLSSettings host opts = do
272257 setCAStore <- runMaybeT $ do
0 commit comments