Skip to content

Commit b1f4b7a

Browse files
committed
add PostgresPool
1 parent 157d4f3 commit b1f4b7a

File tree

6 files changed

+569
-1
lines changed

6 files changed

+569
-1
lines changed

README.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,81 @@ print(tostring(postgres)) --> "<Postgres socket: 0xffffff>"
374374

375375
Returns string representation of current state of `Postgres` object.
376376

377+
## PostgresPool
378+
379+
Although Lua is single-threaded, in asynchronous environments that share state,
380+
multiple coroutines may attempt to use the same database connection while
381+
another coroutine is yielded. This can cause issues because PostgreSQL
382+
connections are stateful — if one coroutine starts a query and yields (e.g.,
383+
waiting for network I/O), another coroutine might try to use the same
384+
connection while it's mid-operation, corrupting the protocol state.
385+
386+
pgmoon tracks when a connection is busy executing a query. `PostgresPool` uses
387+
this busy tracking to manage a pool of connections, automatically assigning
388+
queries to available connections and creating new ones as needed.
389+
390+
The `PostgresPool` connection is designed to match the interface of the
391+
`Postgres` connection, so it can be a drop in for when you need a connection
392+
pool.
393+
394+
395+
```lua
396+
local PostgresPool = require("pgmoon.pool").PostgresPool
397+
local pool = PostgresPool({
398+
host = "127.0.0.1",
399+
port = "5432",
400+
database = "mydb",
401+
user = "postgres",
402+
max_pool_size = 10
403+
})
404+
405+
assert(pool:connect())
406+
407+
-- Queries are automatically assigned to available connections
408+
-- Safe to call from multiple coroutines simultaneously
409+
local res = assert(pool:query("select * from users limit 10"))
410+
```
411+
412+
The first connection must be established with `connect` before any queries are
413+
made (this is to validate that you connection settings are valid). If the pool
414+
is fully occupied when an query is issued, then a new connection will
415+
dynamically be established and used, and then returned to the pool.
416+
417+
Methods like `disconnect`, `settimeout`, `setkeepalive` operate on every
418+
connection currently in the pool.
419+
420+
421+
### Configuration
422+
423+
`PostgresPool` accepts the same configuration options as `Postgres`, plus:
424+
425+
* `"max_pool_size"`: Maximum number of connections in the pool (optional). When all connections are busy and the limit is reached, queries will return an error.
426+
427+
### Methods
428+
429+
`PostgresPool` provides the same query interface as `Postgres`:
430+
431+
* `pool:connect()` — Creates the first connection in the pool
432+
* `pool:disconnect()` — Disconnects all connections in the pool
433+
* `pool:keepalive(...)` — Calls keepalive on all connections (OpenResty only)
434+
* `pool:settimeout(...)` — Sets timeout on all existing and future connections
435+
* `pool:query(...)` — Runs a query on an available connection
436+
* `pool:simple_query(q)` — Runs a simple query on an available connection
437+
* `pool:extended_query(...)` — Runs an extended query on an available connection
438+
* `pool:set_type_deserializer(...)` — Sets type deserializer on all connections
439+
* `pool:escape_literal(val)` — Same as `Postgres:escape_literal`
440+
* `pool:escape_identifier(val)` — Same as `Postgres:escape_identifier`
441+
* `pool:setup_hstore()` — Same as `Postgres:setup_hstore`
442+
443+
Additional pool-specific methods:
444+
445+
* `pool:pool_size()` — Returns the current number of connections in the pool
446+
* `pool:active_connections()` — Returns the number of connections currently executing queries
447+
448+
> **Note:** `wait_for_notification` is not supported with `PostgresPool` because
449+
> notifications are tied to the specific socket connection that issued the
450+
> `LISTEN` command.
451+
377452
## Extended and simple query protocols
378453

379454
pgmoon will issue your query to the database server using either the simple or

pgmoon-dev-1.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ build = {
3636
["pgmoon.crypto"] = "pgmoon/crypto.lua",
3737
["pgmoon.hstore"] = "pgmoon/hstore.lua",
3838
["pgmoon.json"] = "pgmoon/json.lua",
39+
["pgmoon.pool"] = "pgmoon/pool.lua",
3940
["pgmoon.socket"] = "pgmoon/socket.lua",
4041
["pgmoon.util"] = "pgmoon/util.lua",
4142
},

pgmoon/pool.lua

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
local Postgres
2+
Postgres = require("pgmoon").Postgres
3+
local PostgresPool
4+
do
5+
local _class_0
6+
local _base_0 = {
7+
NULL = Postgres.NULL,
8+
PG_TYPES = Postgres.PG_TYPES,
9+
type_deserializers = Postgres.type_deserializers,
10+
_get_connection = function(self)
11+
if #self.pool == 0 then
12+
return nil, "not connected"
13+
end
14+
local _list_0 = self.pool
15+
for _index_0 = 1, #_list_0 do
16+
local pg = _list_0[_index_0]
17+
if not (pg.busy) then
18+
return pg
19+
end
20+
end
21+
if self.config.max_pool_size and #self.pool >= self.config.max_pool_size then
22+
return nil, "pool exhausted, max_pool_size reached"
23+
end
24+
local pg = self:_create_instance()
25+
local ok, err = pg:connect()
26+
if not (ok) then
27+
return nil, err
28+
end
29+
table.insert(self.pool, pg)
30+
return pg
31+
end,
32+
_create_instance = function(self)
33+
local pg_config
34+
do
35+
local _tbl_0 = { }
36+
for k, v in pairs(self.config) do
37+
if k ~= "max_pool_size" then
38+
_tbl_0[k] = v
39+
end
40+
end
41+
pg_config = _tbl_0
42+
end
43+
local pg = Postgres(pg_config)
44+
pg.PG_TYPES = self.PG_TYPES
45+
pg.type_deserializers = self.type_deserializers
46+
pg.parent_pool = self
47+
if self._timeout then
48+
pg:settimeout(self._timeout)
49+
end
50+
return pg
51+
end,
52+
connect = function(self)
53+
if #self.pool > 0 then
54+
return nil, "already connected"
55+
end
56+
local pg = self:_create_instance()
57+
local ok, err = pg:connect()
58+
if not (ok) then
59+
return nil, err
60+
end
61+
table.insert(self.pool, pg)
62+
return true
63+
end,
64+
disconnect = function(self)
65+
local _list_0 = self.pool
66+
for _index_0 = 1, #_list_0 do
67+
local pg = _list_0[_index_0]
68+
pg:disconnect()
69+
end
70+
self.pool = { }
71+
return true
72+
end,
73+
keepalive = function(self, ...)
74+
local _list_0 = self.pool
75+
for _index_0 = 1, #_list_0 do
76+
local pg = _list_0[_index_0]
77+
pg:keepalive(...)
78+
end
79+
self.pool = { }
80+
return true
81+
end,
82+
settimeout = function(self, ...)
83+
self._timeout = ...
84+
local _list_0 = self.pool
85+
for _index_0 = 1, #_list_0 do
86+
local pg = _list_0[_index_0]
87+
pg:settimeout(...)
88+
end
89+
end,
90+
set_type_deserializer = function(self, ...)
91+
Postgres.set_type_deserializer(self, ...)
92+
local _list_0 = self.pool
93+
for _index_0 = 1, #_list_0 do
94+
local pg = _list_0[_index_0]
95+
pg.PG_TYPES = self.PG_TYPES
96+
pg.type_deserializers = self.type_deserializers
97+
end
98+
end,
99+
query = function(self, ...)
100+
local pg, err = self:_get_connection()
101+
if not (pg) then
102+
return nil, err
103+
end
104+
return pg:query(...)
105+
end,
106+
simple_query = function(self, q)
107+
local pg, err = self:_get_connection()
108+
if not (pg) then
109+
return nil, err
110+
end
111+
return pg:simple_query(q)
112+
end,
113+
extended_query = function(self, ...)
114+
local pg, err = self:_get_connection()
115+
if not (pg) then
116+
return nil, err
117+
end
118+
return pg:extended_query(...)
119+
end,
120+
wait_for_notification = function(self)
121+
return error("can't use wait for notification with pool")
122+
end,
123+
escape_identifier = Postgres.escape_identifier,
124+
escape_literal = Postgres.escape_literal,
125+
encode_bytea = Postgres.encode_bytea,
126+
decode_bytea = Postgres.decode_bytea,
127+
setup_hstore = Postgres.setup_hstore,
128+
pool_size = function(self)
129+
return #self.pool
130+
end,
131+
active_connections = function(self)
132+
local count = 0
133+
local _list_0 = self.pool
134+
for _index_0 = 1, #_list_0 do
135+
local pg = _list_0[_index_0]
136+
if pg.busy then
137+
count = count + 1
138+
end
139+
end
140+
return count
141+
end,
142+
__tostring = function(self)
143+
return "<PostgresPool size: " .. tostring(self:pool_size()) .. ">"
144+
end
145+
}
146+
_base_0.__index = _base_0
147+
_class_0 = setmetatable({
148+
__init = function(self, config)
149+
if config == nil then
150+
config = { }
151+
end
152+
self.config = config
153+
self.pool = { }
154+
self._timeout = nil
155+
self.convert_null = self.config.convert_null or false
156+
end,
157+
__base = _base_0,
158+
__name = "PostgresPool"
159+
}, {
160+
__index = _base_0,
161+
__call = function(cls, ...)
162+
local _self_0 = setmetatable({}, _base_0)
163+
cls.__init(_self_0, ...)
164+
return _self_0
165+
end
166+
})
167+
_base_0.__class = _class_0
168+
PostgresPool = _class_0
169+
end
170+
return {
171+
PostgresPool = PostgresPool,
172+
new = PostgresPool
173+
}

pgmoon/pool.moon

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import Postgres from require "pgmoon"
2+
3+
class PostgresPool
4+
NULL: Postgres.NULL
5+
PG_TYPES: Postgres.PG_TYPES
6+
type_deserializers: Postgres.type_deserializers
7+
8+
new: (@config={}) =>
9+
@pool = {}
10+
@_timeout = nil
11+
@convert_null = @config.convert_null or false
12+
13+
_get_connection: =>
14+
-- Must call connect() first
15+
return nil, "not connected" if #@pool == 0
16+
17+
-- Find first non-busy instance
18+
for pg in *@pool
19+
unless pg.busy
20+
return pg
21+
22+
-- All busy, check max_pool_size
23+
if @config.max_pool_size and #@pool >= @config.max_pool_size
24+
return nil, "pool exhausted, max_pool_size reached"
25+
26+
-- Create and connect new instance
27+
pg = @_create_instance!
28+
ok, err = pg\connect!
29+
return nil, err unless ok
30+
31+
table.insert @pool, pg
32+
pg
33+
34+
_create_instance: =>
35+
-- Filter out pool-specific config keys
36+
pg_config = {k, v for k, v in pairs @config when k != "max_pool_size"}
37+
pg = Postgres pg_config
38+
39+
-- Apply stored settings
40+
pg.PG_TYPES = @PG_TYPES
41+
pg.type_deserializers = @type_deserializers
42+
pg.parent_pool = @
43+
pg\settimeout @_timeout if @_timeout
44+
45+
pg
46+
47+
-- Connection lifecycle
48+
connect: =>
49+
return nil, "already connected" if #@pool > 0
50+
pg = @_create_instance!
51+
ok, err = pg\connect!
52+
return nil, err unless ok
53+
table.insert @pool, pg
54+
true
55+
56+
disconnect: =>
57+
for pg in *@pool
58+
pg\disconnect!
59+
@pool = {}
60+
true
61+
62+
keepalive: (...) =>
63+
for pg in *@pool
64+
pg\keepalive ...
65+
@pool = {}
66+
true
67+
68+
-- Settings (apply to all existing + store for new)
69+
settimeout: (...) =>
70+
@_timeout = ...
71+
for pg in *@pool
72+
pg\settimeout ...
73+
74+
set_type_deserializer: (...) =>
75+
Postgres.set_type_deserializer @, ...
76+
77+
-- ensure all collections point to the pools type table
78+
for pg in *@pool
79+
pg.PG_TYPES = @PG_TYPES
80+
pg.type_deserializers = @type_deserializers
81+
82+
-- Query methods (delegate to available connection)
83+
query: (...) =>
84+
pg, err = @_get_connection!
85+
return nil, err unless pg
86+
pg\query ...
87+
88+
simple_query: (q) =>
89+
pg, err = @_get_connection!
90+
return nil, err unless pg
91+
pg\simple_query q
92+
93+
extended_query: (...) =>
94+
pg, err = @_get_connection!
95+
return nil, err unless pg
96+
pg\extended_query ...
97+
98+
-- wait_for_notification is tied to the socket connection that issued the
99+
-- `LISTEN` query, so it's not compatible with pooling
100+
wait_for_notification: =>
101+
error "can't use wait for notification with pool"
102+
103+
-- Static methods (delegate to Postgres)
104+
-- note the reciever will be the pool object
105+
escape_identifier: Postgres.escape_identifier
106+
escape_literal: Postgres.escape_literal
107+
encode_bytea: Postgres.encode_bytea
108+
decode_bytea: Postgres.decode_bytea
109+
setup_hstore: Postgres.setup_hstore
110+
111+
-- Pool info helpers
112+
pool_size: => #@pool
113+
active_connections: =>
114+
count = 0
115+
for pg in *@pool
116+
if pg.busy
117+
count += 1
118+
count
119+
120+
__tostring: =>
121+
"<PostgresPool size: #{@pool_size!}>"
122+
123+
{ :PostgresPool, new: PostgresPool }

0 commit comments

Comments
 (0)