Skip to content

Commit fd8d447

Browse files
committed
Introduce on-demand connection acquisition mode
1 parent 203e557 commit fd8d447

File tree

10 files changed

+336
-324
lines changed

10 files changed

+336
-324
lines changed

src/Database/PostgreSQL/PQTypes/Class.hs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,16 @@ class (Applicative m, Monad m) => MonadDB m where
3030
-- given name.
3131
runPreparedQuery :: (HasCallStack, IsSQL sql) => QueryName -> sql -> m Int
3232

33-
-- | Get last SQL query that was executed.
34-
getLastQuery :: m SomeSQL
33+
-- | Get last SQL query that was executed and ID of the server process
34+
-- attached to the session that executed it.
35+
getLastQuery :: m (BackendPid, SomeSQL)
3536

3637
-- | Subsequent queries in the callback do not alter the result of
3738
-- 'getLastQuery'.
3839
withFrozenLastQuery :: m a -> m a
3940

40-
-- | Get ID of the server process attached to the current session.
41-
getBackendPid :: m BackendPid
42-
4341
-- | Get current connection statistics.
44-
getConnectionStats :: HasCallStack => m ConnectionStats
42+
getConnectionStats :: m ConnectionStats
4543

4644
-- | Get current query result.
4745
getQueryResult :: FromRow row => m (Maybe (QueryResult row))
@@ -52,11 +50,6 @@ class (Applicative m, Monad m) => MonadDB m where
5250
-- | Get current transaction settings.
5351
getTransactionSettings :: m TransactionSettings
5452

55-
-- | Set transaction settings to supplied ones. Note that it
56-
-- won't change any properties of currently running transaction,
57-
-- only the subsequent ones.
58-
setTransactionSettings :: TransactionSettings -> m ()
59-
6053
-- | Attempt to receive a notification from the server. This
6154
-- function waits until a notification arrives or specified
6255
-- number of microseconds has passed. If a negative number
@@ -97,11 +90,9 @@ instance
9790
runPreparedQuery name = withFrozenCallStack $ lift . runPreparedQuery name
9891
getLastQuery = lift getLastQuery
9992
withFrozenLastQuery m = controlT $ \run -> withFrozenLastQuery (run m)
100-
getBackendPid = lift getBackendPid
101-
getConnectionStats = withFrozenCallStack $ lift getConnectionStats
93+
getConnectionStats = lift getConnectionStats
10294
getQueryResult = lift getQueryResult
10395
clearQueryResult = lift clearQueryResult
10496
getTransactionSettings = lift getTransactionSettings
105-
setTransactionSettings = lift . setTransactionSettings
10697
getNotification = lift . getNotification
10798
withNewConnection m = controlT $ \run -> withNewConnection (run m)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
module Database.PostgreSQL.PQTypes.Internal.BackendPid
22
( BackendPid (..)
3+
, noBackendPid
34
) where
45

56
-- | Process ID of the server process attached to the current session.
67
newtype BackendPid = BackendPid Int
78
deriving newtype (Eq, Ord, Show)
9+
10+
noBackendPid :: BackendPid
11+
noBackendPid = BackendPid 0

src/Database/PostgreSQL/PQTypes/Internal/Connection.hs

Lines changed: 76 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
module Database.PostgreSQL.PQTypes.Internal.Connection
22
( -- * Connection
33
Connection (..)
4-
, getBackendPidIO
5-
, ConnectionData (..)
6-
, withConnectionData
74
, ConnectionStats (..)
85
, initialConnectionStats
96
, ConnectionSettings (..)
107
, defaultConnectionSettings
118
, ConnectionSourceM (..)
9+
, InternalConnectionSource (..)
1210
, ConnectionSource (..)
1311
, simpleSource
1412
, poolSource
@@ -121,58 +119,41 @@ initialConnectionStats =
121119
-- executing an SQL query).
122120
--
123121
-- See https://gitlab.haskell.org/ghc/ghc/-/issues/10975 for more info.
124-
data ConnectionData = ConnectionData
125-
{ cdPtr :: !(Ptr PGconn)
122+
data Connection = Connection
123+
{ connPtr :: !(Ptr PGconn)
126124
-- ^ Pointer to connection object.
127-
, cdBackendPid :: !BackendPid
125+
, connBackendPid :: !BackendPid
128126
-- ^ Process ID of the server process attached to the current session.
129-
, cdStats :: !ConnectionStats
130-
-- ^ Statistics associated with the connection.
131-
, cdPreparedQueries :: !(IORef (S.Set T.Text))
127+
, connPreparedQueries :: !(IORef (S.Set T.Text))
132128
-- ^ A set of named prepared statements of the connection.
133129
}
134130

135-
-- | Wrapper for hiding representation of a connection object.
136-
newtype Connection = Connection
137-
{ unConnection :: MVar (Maybe ConnectionData)
131+
data InternalConnectionSource m x = InternalConnectionSource
132+
{ takeConnection :: m (Connection, x)
133+
, putConnection :: forall r. (Connection, x) -> ExitCase r -> m ()
138134
}
139135

140-
getBackendPidIO :: Connection -> IO BackendPid
141-
getBackendPidIO conn = do
142-
withConnectionData conn "getBackendPidIO" $ \cd -> do
143-
pure (cd, cdBackendPid cd)
144-
145-
withConnectionData
146-
:: Connection
147-
-> String
148-
-> (ConnectionData -> IO (ConnectionData, r))
149-
-> IO r
150-
withConnectionData (Connection mvc) fname f = modifyMVar mvc $ \case
151-
Nothing -> hpqTypesError $ fname ++ ": no connection"
152-
Just cd -> do
153-
(cd', r) <- f cd
154-
cd' `seq` pure (Just cd', r)
155-
156136
-- | Database connection supplier.
157-
newtype ConnectionSourceM m = ConnectionSourceM
158-
{ withConnection :: forall r. (Connection -> m r) -> m r
159-
}
137+
data ConnectionSourceM m
138+
= forall cdata. ConnectionSourceM (InternalConnectionSource m cdata)
160139

161140
-- | Wrapper for a polymorphic connection source.
162141
newtype ConnectionSource (cs :: [(Type -> Type) -> Constraint]) = ConnectionSource
163142
{ unConnectionSource :: forall m. MkConstraint m cs => ConnectionSourceM m
164143
}
165144

166-
-- | Default connection supplier. It establishes new
167-
-- database connection each time 'withConnection' is called.
145+
-- | Default connection supplier. It establishes new database connection each
146+
-- time 'withConnection' is called.
168147
simpleSource
169148
:: ConnectionSettings
170149
-> ConnectionSource [MonadBase IO, MonadMask]
171150
simpleSource cs =
172151
ConnectionSource $
173-
ConnectionSourceM
174-
{ withConnection = bracket (liftBase $ connect cs) (liftBase . disconnect)
175-
}
152+
ConnectionSourceM $
153+
InternalConnectionSource
154+
{ takeConnection = (,()) <$> liftBase (connect cs)
155+
, putConnection = \(conn, ()) _ -> liftBase $ disconnect conn
156+
}
176157

177158
-- | Pooled source. It uses striped pool from @resource-pool@ package to cache
178159
-- established connections and reuse them.
@@ -189,24 +170,13 @@ poolSource cs mkPoolConfig = do
189170
pure $ ConnectionSource (sourceM pool)
190171
where
191172
sourceM pool =
192-
ConnectionSourceM
193-
{ withConnection = doWithConnection pool . (clearStats >=>)
194-
}
195-
196-
doWithConnection pool m =
197-
fst
198-
<$> generalBracket
199-
(liftBase $ takeResource pool)
200-
( \(resource, local) -> \case
173+
ConnectionSourceM $
174+
InternalConnectionSource
175+
{ takeConnection = liftBase $ takeResource pool
176+
, putConnection = \(resource, local) -> \case
201177
ExitCaseSuccess _ -> liftBase $ putResource local resource
202178
_ -> liftBase $ destroyResource pool local resource
203-
)
204-
(\(resource, _) -> m resource)
205-
206-
clearStats conn@(Connection mv) = do
207-
liftBase . modifyMVar_ mv $ \mconn ->
208-
pure $ (\cd -> cd {cdStats = initialConnectionStats}) <$> mconn
209-
pure conn
179+
}
210180

211181
----------------------------------------
212182

@@ -230,29 +200,22 @@ connect ConnectionSettings {..} = mask $ \unmask -> do
230200
registerComposites connPtr csComposites
231201
conn <- do
232202
preparedQueries <- newIORef S.empty
233-
fmap Connection . newMVar $
234-
Just
235-
ConnectionData
236-
{ cdPtr = connPtr
237-
, cdBackendPid = noBackendPid
238-
, cdStats = initialConnectionStats
239-
, cdPreparedQueries = preparedQueries
240-
}
203+
pure
204+
Connection
205+
{ connPtr = connPtr
206+
, connBackendPid = noBackendPid
207+
, connPreparedQueries = preparedQueries
208+
}
241209
F.forM_ csRole $ \role -> runQueryIO conn $ "SET ROLE " <> role
242210

243211
let selectPid = "SELECT pg_backend_pid()" :: RawSQL ()
244-
(_, res) <- runQueryIO conn selectPid
212+
(_, res, _) <- runQueryIO conn selectPid
245213
case F.toList $ mkQueryResult @(Identity Int32) selectPid noBackendPid res of
246-
[pid] -> withConnectionData conn fname $ \cd -> do
247-
pure (cd {cdBackendPid = BackendPid $ fromIntegral pid}, ())
214+
[pid] -> pure $ conn {connBackendPid = BackendPid $ fromIntegral pid}
248215
pids -> do
249216
let err = HPQTypesError $ "unexpected backend pid: " ++ show pids
250217
rethrowWithContext selectPid noBackendPid $ toException err
251-
252-
pure conn
253218
where
254-
noBackendPid = BackendPid 0
255-
256219
fname = "connect"
257220

258221
openConnection :: (forall r. IO r -> IO r) -> CString -> IO (Ptr PGconn)
@@ -299,21 +262,16 @@ connect ConnectionSettings {..} = mask $ \unmask -> do
299262
-- | Low-level function for disconnecting from the database. Useful if one wants
300263
-- to implement custom connection source.
301264
disconnect :: Connection -> IO ()
302-
disconnect (Connection mvconn) = modifyMVar_ mvconn $ \mconn -> do
303-
case mconn of
304-
Just cd -> do
305-
let conn = cdPtr cd
306-
-- This covers the case when a connection is closed while other Haskell
307-
-- threads are using GHC's IO manager to wait on the descriptor. This is
308-
-- commonly the case with asynchronous notifications, for example. Since
309-
-- libpq is responsible for opening and closing the file descriptor, GHC's
310-
-- IO manager needs to be informed that the file descriptor has been
311-
-- closed. The IO manager will then raise an exception in those threads.
312-
c_PQsocket conn >>= \case
313-
-1 -> c_PQfinish conn -- can happen if the connection is bad/lost
314-
fd -> closeFdWith (\_ -> c_PQfinish conn) fd
315-
Nothing -> E.throwIO (HPQTypesError "disconnect: no connection (shouldn't happen)")
316-
pure Nothing
265+
disconnect Connection {..} = do
266+
-- This covers the case when a connection is closed while other Haskell
267+
-- threads are using GHC's IO manager to wait on the descriptor. This is
268+
-- commonly the case with asynchronous notifications, for example. Since libpq
269+
-- is responsible for opening and closing the file descriptor, GHC's IO
270+
-- manager needs to be informed that the file descriptor has been closed. The
271+
-- IO manager will then raise an exception in those threads.
272+
c_PQsocket connPtr >>= \case
273+
-1 -> c_PQfinish connPtr -- can happen if the connection is bad/lost
274+
fd -> closeFdWith (\_ -> c_PQfinish connPtr) fd
317275

318276
----------------------------------------
319277
-- Query running
@@ -323,14 +281,14 @@ runQueryIO
323281
:: (HasCallStack, IsSQL sql)
324282
=> Connection
325283
-> sql
326-
-> IO (Int, ForeignPtr PGresult)
327-
runQueryIO conn sql = do
328-
runQueryImpl "runQueryIO" conn sql $ \ConnectionData {..} -> do
329-
let allocParam = ParamAllocator $ withPGparam cdPtr
284+
-> IO (Int, ForeignPtr PGresult, ConnectionStats -> ConnectionStats)
285+
runQueryIO conn@Connection {..} sql = do
286+
runQueryImpl conn sql $ do
287+
let allocParam = ParamAllocator $ withPGparam connPtr
330288
withSQL sql allocParam $ \param query ->
331289
(,)
332290
<$> (fromIntegral <$> c_PQparamCount param)
333-
<*> c_PQparamExec cdPtr nullPtr param query c_RESULT_BINARY
291+
<*> c_PQparamExec connPtr nullPtr param query c_RESULT_BINARY
334292

335293
-- | Name of a prepared query.
336294
newtype QueryName = QueryName T.Text
@@ -342,42 +300,41 @@ runPreparedQueryIO
342300
=> Connection
343301
-> QueryName
344302
-> sql
345-
-> IO (Int, ForeignPtr PGresult)
346-
runPreparedQueryIO conn (QueryName queryName) sql = do
347-
runQueryImpl "runPreparedQueryIO" conn sql $ \ConnectionData {..} -> do
303+
-> IO (Int, ForeignPtr PGresult, ConnectionStats -> ConnectionStats)
304+
runPreparedQueryIO conn@Connection {..} (QueryName queryName) sql = do
305+
runQueryImpl conn sql $ do
348306
when (T.null queryName) $ do
349307
E.throwIO
350308
DBException
351309
{ dbeQueryContext = sql
352-
, dbeBackendPid = cdBackendPid
310+
, dbeBackendPid = connBackendPid
353311
, dbeError = HPQTypesError "runPreparedQueryIO: unnamed prepared query is not supported"
354312
, dbeCallStack = callStack
355313
}
356-
let allocParam = ParamAllocator $ withPGparam cdPtr
314+
let allocParam = ParamAllocator $ withPGparam connPtr
357315
withSQL sql allocParam $ \param query -> do
358-
preparedQueries <- readIORef cdPreparedQueries
316+
preparedQueries <- readIORef connPreparedQueries
359317
BS.useAsCString (T.encodeUtf8 queryName) $ \cname -> do
360318
when (queryName `S.notMember` preparedQueries) . E.mask_ $ do
361319
-- Mask asynchronous exceptions, because if preparation of the query
362320
-- succeeds, we need to reflect that fact in cdPreparedQueries since
363321
-- you can't prepare a query with the same name more than once.
364-
res <- c_PQparamPrepare cdPtr nullPtr param cname query
365-
void . withForeignPtr res $ verifyResult sql cdBackendPid cdPtr
366-
modifyIORef' cdPreparedQueries $ S.insert queryName
322+
res <- c_PQparamPrepare connPtr nullPtr param cname query
323+
void . withForeignPtr res $ verifyResult sql connBackendPid connPtr
324+
modifyIORef' connPreparedQueries $ S.insert queryName
367325
(,)
368326
<$> (fromIntegral <$> c_PQparamCount param)
369-
<*> c_PQparamExecPrepared cdPtr nullPtr param cname c_RESULT_BINARY
327+
<*> c_PQparamExecPrepared connPtr nullPtr param cname c_RESULT_BINARY
370328

371329
-- | Shared implementation of 'runQueryIO' and 'runPreparedQueryIO'.
372330
runQueryImpl
373331
:: (HasCallStack, IsSQL sql)
374-
=> String
375-
-> Connection
332+
=> Connection
376333
-> sql
377-
-> (ConnectionData -> IO (Int, ForeignPtr PGresult))
378334
-> IO (Int, ForeignPtr PGresult)
379-
runQueryImpl fname conn sql execSql = do
380-
withConnDo $ \cd@ConnectionData {..} -> E.mask $ \restore -> do
335+
-> IO (Int, ForeignPtr PGresult, ConnectionStats -> ConnectionStats)
336+
runQueryImpl Connection {..} sql execSql = do
337+
E.mask $ \restore -> do
381338
-- While the query runs, the current thread will not be able to receive
382339
-- asynchronous exceptions. This prevents clients of the library from
383340
-- interrupting execution of the query. To remedy that we spawn a separate
@@ -386,35 +343,35 @@ runQueryImpl fname conn sql execSql = do
386343
-- runtime system is used) and react appropriately.
387344
queryRunner <- async . restore $ do
388345
t1 <- getMonotonicTime
389-
(paramCount, res) <- execSql cd
346+
(paramCount, res) <- execSql
390347
t2 <- getMonotonicTime
391-
affected <- withForeignPtr res $ verifyResult sql cdBackendPid cdPtr
392-
stats' <- case affected of
348+
affected <- withForeignPtr res $ verifyResult sql connBackendPid connPtr
349+
updateStats <- case affected of
393350
Left _ ->
394-
pure
395-
cdStats
396-
{ statsQueries = statsQueries cdStats + 1
397-
, statsParams = statsParams cdStats + paramCount
398-
, statsTime = statsTime cdStats + (t2 - t1)
351+
pure $ \stats ->
352+
stats
353+
{ statsQueries = statsQueries stats + 1
354+
, statsParams = statsParams stats + paramCount
355+
, statsTime = statsTime stats + (t2 - t1)
399356
}
400357
Right rows -> do
401358
columns <- fromIntegral <$> withForeignPtr res c_PQnfields
402-
pure
359+
pure $ \stats ->
403360
ConnectionStats
404-
{ statsQueries = statsQueries cdStats + 1
405-
, statsRows = statsRows cdStats + rows
406-
, statsValues = statsValues cdStats + (rows * columns)
407-
, statsParams = statsParams cdStats + paramCount
408-
, statsTime = statsTime cdStats + (t2 - t1)
361+
{ statsQueries = statsQueries stats + 1
362+
, statsRows = statsRows stats + rows
363+
, statsValues = statsValues stats + (rows * columns)
364+
, statsParams = statsParams stats + paramCount
365+
, statsTime = statsTime stats + (t2 - t1)
409366
}
410-
pure (cd {cdStats = stats'}, (either id id affected, res))
367+
pure (either id id affected, res, updateStats)
411368
-- If we receive an exception while waiting for the execution to complete,
412369
-- we need to send a request to PostgreSQL for query cancellation and wait
413370
-- for the query runner thread to terminate. It is paramount we make the
414371
-- exception handler uninterruptible as we can't exit from the main block
415372
-- until the query runner thread has terminated.
416373
E.onException (restore $ wait queryRunner) . E.uninterruptibleMask_ $ do
417-
c_PQcancel cdPtr >>= \case
374+
c_PQcancel connPtr >>= \case
418375
-- If query cancellation request was successfully processed, there is
419376
-- nothing else to do apart from waiting for the runner to terminate.
420377
Nothing -> cancel queryRunner
@@ -427,10 +384,8 @@ runQueryImpl fname conn sql execSql = do
427384
poll queryRunner >>= \case
428385
Just _ -> pure ()
429386
Nothing -> do
430-
void $ c_PQcancel cdPtr
387+
void $ c_PQcancel connPtr
431388
cancel queryRunner
432-
where
433-
withConnDo = withConnectionData conn fname
434389

435390
verifyResult
436391
:: (HasCallStack, IsSQL sql)

0 commit comments

Comments
 (0)