22{-# LANGUAGE NamedFieldPuns #-}
33{-# LANGUAGE OverloadedStrings #-}
44{-# LANGUAGE QuasiQuotes #-}
5+ {-# LANGUAGE ScopedTypeVariables #-}
56{-# LANGUAGE TupleSections #-}
67
78module Simplex.Messaging.Agent.Store.Postgres.Common
1920
2021import Control.Concurrent.MVar
2122import Control.Concurrent.STM
22- import Control.Exception ( bracket )
23+ import qualified Control.Exception as E
2324import Data.ByteString (ByteString )
2425import qualified Database.PostgreSQL.Simple as PSQL
2526import Numeric.Natural (Natural )
@@ -32,11 +33,7 @@ data DBStore = DBStore
3233 dbPoolSize :: Int ,
3334 dbPriorityPool :: DBStorePool ,
3435 dbPool :: DBStorePool ,
35- -- dbPoolSize :: Int,
36- -- dbPool :: TBQueue PSQL.Connection,
37- -- -- MVar is needed for fair pool distribution, without STM retry contention.
38- -- -- Only one thread can be blocked on STM read.
39- -- dbSem :: MVar (),
36+ dbConnect :: IO PSQL. Connection ,
4037 dbClosed :: TVar Bool ,
4138 dbNew :: Bool
4239 }
@@ -55,15 +52,23 @@ data DBStorePool = DBStorePool
5552 }
5653
5754withConnectionPriority :: DBStore -> Bool -> (PSQL. Connection -> IO a ) -> IO a
58- withConnectionPriority DBStore {dbPriorityPool, dbPool} priority =
59- withConnectionPool $ if priority then dbPriorityPool else dbPool
55+ withConnectionPriority DBStore {dbPriorityPool, dbPool, dbConnect } priority =
56+ withConnectionPool ( if priority then dbPriorityPool else dbPool) dbConnect
6057{-# INLINE withConnectionPriority #-}
6158
62- withConnectionPool :: DBStorePool -> (PSQL. Connection -> IO a ) -> IO a
63- withConnectionPool DBStorePool {dbPoolConns, dbSem} =
64- bracket
65- (withMVar dbSem $ \ _ -> atomically $ readTBQueue dbPoolConns)
66- (atomically . writeTBQueue dbPoolConns)
59+ withConnectionPool :: DBStorePool -> IO PSQL. Connection -> (PSQL. Connection -> IO a ) -> IO a
60+ withConnectionPool DBStorePool {dbPoolConns, dbSem} dbConnect action =
61+ E. mask $ \ restore -> do
62+ conn <- withMVar dbSem $ \ _ -> atomically $ readTBQueue dbPoolConns
63+ r <- restore (action conn) `E.onException` reset conn
64+ atomically $ writeTBQueue dbPoolConns conn
65+ pure r
66+ where
67+ reset conn = do
68+ conn' <- E. try dbConnect >>= \ case
69+ Right conn' -> PSQL. close conn >> pure conn'
70+ Left (_ :: E. SomeException ) -> pure conn
71+ atomically $ writeTBQueue dbPoolConns conn'
6772
6873withConnection :: DBStore -> (PSQL. Connection -> IO a ) -> IO a
6974withConnection st = withConnectionPriority st False
0 commit comments