Skip to content

Commit 65c7301

Browse files
committed
Added pool events
1 parent 050d093 commit 65c7301

File tree

1 file changed

+99
-22
lines changed

1 file changed

+99
-22
lines changed

async_postgres.lua

Lines changed: 99 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ end
167167
---@field array_result boolean option to receive PGResult row fields as array instead of table (default: false)
168168
---@field private conn PGconn native connection object (do not use it directly, otherwise be careful not to store it anywhere else, otherwise closing connection will be impossible)
169169
---@field private queries { push: fun(self, q: PGQuery), prepend: fun(self, q: PGQuery), pop: (fun(self): PGQuery), size: fun(self): number } list of queries
170+
---@field package errorHandler function function that just calls self:onError(...)
170171
---@field package acquired boolean
171172
---@field package pool PGPool?
172173
local Client = {}
@@ -203,7 +204,7 @@ function Client:reset(callback)
203204
self.retryAttempted = 0
204205
end
205206

206-
xpcall(callback, ErrorNoHaltWithStack, ok, err)
207+
xpcall(callback, self.errorHandler, ok, err)
207208
self:processQueue()
208209
end)
209210

@@ -237,10 +238,10 @@ function Client:connect(callback)
237238
---@cast conn PGconn
238239
self.conn = conn
239240
self.conn:setNotifyCallback(function(channel, payload, backendPID)
240-
self:onNotify(channel, payload, backendPID)
241+
xpcall(self.onNotify, self.errorHandler, self, channel, payload, backendPID)
241242
end)
242243

243-
xpcall(callback, ErrorNoHaltWithStack, ok)
244+
xpcall(callback, self.errorHandler, ok)
244245
self:processQueue()
245246
else
246247
---@cast conn string
@@ -250,7 +251,7 @@ function Client:connect(callback)
250251

251252
-- async_postgres.connect() can throw error if for example url is invalid
252253
if not ok then
253-
xpcall(callback, ErrorNoHaltWithStack, false, err)
254+
xpcall(callback, self.errorHandler, false, err)
254255
return
255256
end
256257

@@ -290,7 +291,7 @@ function Client:runQuery(query)
290291
self.conn:setArrayResult(false)
291292
end
292293

293-
xpcall(query.callback, ErrorNoHaltWithStack, ok, result, errdata)
294+
xpcall(query.callback, self.errorHandler, ok, result, errdata)
294295
self:processQueue()
295296
end
296297

@@ -319,7 +320,7 @@ function Client:processQueue()
319320
local query = self.queries:pop()
320321
local ok, err = pcall(self.runQuery, self, query)
321322
if not ok then
322-
xpcall(query.callback, ErrorNoHaltWithStack, false, err)
323+
xpcall(query.callback, self.errorHandler, false, err)
323324
end
324325
end
325326

@@ -434,8 +435,7 @@ function Client:close(wait)
434435
while self:wait() do
435436
iterations = iterations + 1
436437
if iterations > 1000 then
437-
ErrorNoHaltWithStack(
438-
"PGClient:close() - waiting for queries too long, closing forcefully")
438+
self:onError("PGClient:close() - waiting for queries too long, closing forcefully")
439439
break
440440
end
441441
end
@@ -444,11 +444,11 @@ function Client:close(wait)
444444
local iterations = 0
445445
while self.queries:size() ~= 0 do
446446
local q = self.queries:pop()
447-
xpcall(q.callback, ErrorNoHaltWithStack, false, "connection to the database was closed")
447+
xpcall(q.callback, self.errorHandler, false, "connection to the database was closed")
448448

449449
iterations = iterations + 1
450450
if iterations > 1000 then
451-
ErrorNoHaltWithStack("PGClient:close() - queries are infinite, ignoring them")
451+
self:onError("PGClient:close() - queries are infinite, ignoring them")
452452
break
453453
end
454454
end
@@ -617,7 +617,8 @@ end
617617

618618
--- Releases client back to the pool, only use after getting it from the pool
619619
---@see PGPool.connect to acquire a client from the pool
620-
function Client:release()
620+
---@param suppress boolean? if true, then Pool:onRelease won't be called
621+
function Client:release(suppress)
621622
if self.acquired == false then
622623
error("client was not acquired, :release() was called multiple times or misused")
623624
end
@@ -630,10 +631,15 @@ function Client:release()
630631
self.pool = nil
631632

632633
---@cast pool PGPool
634+
635+
if not suppress then
636+
xpcall(function() return pool:onRelease(self) end, self.errorHandler)
637+
end
638+
633639
pool:processQueue() -- after client was release, we need to process pool queue
634640
end
635641

636-
--- This function is called when NOTIFY message is received
642+
--- This **event** function is called when NOTIFY message is received
637643
---
638644
--- You can set it to your own function to handle NOTIFY messages
639645
---@param channel string
@@ -642,6 +648,16 @@ end
642648
function Client:onNotify(channel, payload, backendPID)
643649
end
644650

651+
--- This **event** function is called whenever an error occurs inside connect/query callback.
652+
---
653+
--- You can set it to your own function to handle errors.
654+
--- By default this function calls `ErrorNoHaltWithStack`
655+
---@param message string error message
656+
function Client:onError(message)
657+
-- return used here to hide additional stack trace
658+
return ErrorNoHaltWithStack(message)
659+
end
660+
645661
--- Creates a new client with given connection url
646662
--- ```lua
647663
--- local client = async_postgres.Client("postgresql://user:password@localhost:5432/database")
@@ -659,11 +675,16 @@ end
659675
---@param url string connection url, see libpq documentation for more information
660676
---@return PGClient
661677
function async_postgres.Client(url)
662-
return setmetatable({
678+
---@class PGClient
679+
local client = setmetatable({
663680
url = url,
664681
connecting = false,
665682
queries = Queue.new(),
666683
}, Client)
684+
685+
client.errorHandler = function(...) return client:onError(...) end
686+
687+
return client
667688
end
668689

669690
---@class PGTransactionContext
@@ -770,8 +791,10 @@ end
770791
---@class PGPool
771792
---@field url string **readonly** connection url
772793
---@field max number maximum number of clients in the pool (default: 10)
794+
---@field threshold number threshold of waiting :connect(...) acquire functions to create a new client (default: 5)
773795
---@field private clients PGClient[]
774796
---@field private queue { push: fun(self, f: function), prepend: fun(self, f: function), pop: (fun(self): function), size: fun(self): number }
797+
---@field private errorHandler function function that just calls self:onError(...)
775798
local Pool = {}
776799

777800
---@private
@@ -799,23 +822,29 @@ function Pool:acquireClient()
799822

800823
-- then if client is ready to use, just call callback immideatly
801824
if client:connected() then
825+
-- notify about acquired client
826+
xpcall(function() return self:onAcquire(client) end, client.errorHandler)
827+
828+
-- call callback
802829
local callback = self.queue:pop()
803-
xpcall(callback, ErrorNoHaltWithStack, client)
830+
xpcall(callback, client.errorHandler, client)
831+
804832
-- if client was not connected, begin connection process
805833
-- unless it's already connecting, then just wait until it will be connected
806834
elseif not client.connecting then
807835
client:connect(function(ok, err)
808836
if ok then
809-
client:release()
837+
client:release(true)
838+
xpcall(function() return self:onConnect(client) end, client.errorHandler)
810839
self:acquireClient()
811840
return
812841
end
813842

814-
ErrorNoHaltWithStack("PGPool - failed to connect to the database: " .. err)
843+
self:onError("PGPool - failed to connect to the database: " .. err)
815844

816845
-- try to restart acquiring, maybe we can connect with another try
817846
timer.Simple(5, function()
818-
client:release()
847+
client:release(true)
819848
self:acquireClient()
820849
end)
821850
end)
@@ -838,9 +867,14 @@ function Pool:processQueue()
838867
-- if we haven't found available clients, and queue is too big, we need to create new client
839868
local clients = #self.clients
840869
local waiters = self.queue:size()
841-
local threshold = clients * 2
870+
local threshold = clients * self.threshold
842871
if clients < self.max and waiters > threshold then
843-
self.clients[clients + 1] = async_postgres.Client(self.url)
872+
local client = async_postgres.Client(self.url)
873+
client.onError = function(client, message)
874+
return self:onError(message)
875+
end
876+
877+
self.clients[clients + 1] = client
844878
self:acquireClient()
845879
end
846880
end
@@ -949,19 +983,21 @@ function Pool:describePortal(name, callback)
949983
end
950984

951985
---@async
986+
---@param client PGClient
987+
---@param callback fun(ctx: PGTransactionContext)
952988
local function transactionThread(client, callback)
953989
xpcall(function()
954990
local ctx = TransactionContext.new(client)
955991
local ok = xpcall(function()
956992
ctx:query("BEGIN")
957993
callback(ctx)
958994
ctx:query("COMMIT")
959-
end, ErrorNoHaltWithStack)
995+
end, client.errorHandler)
960996

961997
if not ok then
962998
ctx:query("ROLLBACK")
963999
end
964-
end, ErrorNoHaltWithStack)
1000+
end, client.errorHandler)
9651001
client:release()
9661002
end
9671003

@@ -987,14 +1023,55 @@ function Pool:transaction(callback)
9871023
end)
9881024
end
9891025

1026+
--- This **event** function is called whenever new client connection
1027+
--- was estabileshed.
1028+
--- You can run setup commands on a client.
1029+
--- ```lua
1030+
--- function pool:onConnect(client)
1031+
--- client:query("SET DATESTYLE = iso, mdy")
1032+
--- end
1033+
--- ```
1034+
---@param client PGClient client that was connected
1035+
function Pool:onConnect(client)
1036+
end
1037+
1038+
--- This **event** function is called whenever a client was acquired.
1039+
---@param client PGClient client that was acquired
1040+
function Pool:onAcquire(client)
1041+
end
1042+
1043+
--- This **event** function is called whenever an error occurs inside connect/query callback for client or pool.
1044+
---@param message string error message
1045+
---@param client PGClient? client that caused the error, or nil if error happened in pool
1046+
function Pool:onError(message, client)
1047+
return ErrorNoHaltWithStack(message)
1048+
end
1049+
1050+
--- This **event** function is called whenever a client was released back to the pool.
1051+
---
1052+
--- Warning! This funct
1053+
---@param client PGClient client that was released
1054+
function Pool:onRelease(client)
1055+
end
1056+
9901057
--- Creates a new connection pool with given connection url,
9911058
--- then use :connect() to get available connection,
9921059
--- and then :release() to release it back to the pool
9931060
function async_postgres.Pool(url)
994-
return setmetatable({
1061+
---@class PGPool
1062+
local pool = setmetatable({
9951063
url = url,
9961064
clients = { async_postgres.Client(url) },
9971065
queue = Queue.new(),
9981066
max = 10,
1067+
threshold = 5,
9991068
}, Pool)
1069+
1070+
pool.clients[1].onError = function(client, message)
1071+
return pool:onError(message, client)
1072+
end
1073+
1074+
pool.errorHandler = function(...) return pool:onError(...) end
1075+
1076+
return pool
10001077
end

0 commit comments

Comments
 (0)