Skip to content

Commit 59927e7

Browse files
committed
Simplify custom type autoloading with pgxpool
Provide a backwards-compatible configuration option for pgxpool which streamlines the use of the bulk loading and registration of types: - ReuseTypeMaps: if enabled, pgxpool will cache the typemap information, avoiding the need to perform any further queries as new connections are created. ReuseTypeMaps is disabled by default as in some situations, a connection string might resolve to a pool of servers which do not share the same type name -> OID mapping.
1 parent 22fb4e4 commit 59927e7

File tree

4 files changed

+85
-17
lines changed

4 files changed

+85
-17
lines changed

derived_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ type derivedTypeInfo struct {
156156
// or indirectly required to complete the registration.
157157
// The result of this call can be passed into RegisterTypes to complete the process.
158158
func LoadTypes(ctx context.Context, c *Conn, typeNames []string) ([]*pgtype.Type, error) {
159-
m := c.TypeMap().Copy()
159+
m := c.TypeMap()
160160
if typeNames == nil || len(typeNames) == 0 {
161161
return nil, fmt.Errorf("No type names were supplied.")
162162
}

pgtype/pgtype.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,13 @@ type Map struct {
214214
TryWrapScanPlanFuncs []TryWrapScanPlanFunc
215215
}
216216

217-
// Copy returns a new Map containing the same registered types.
218-
func (m *Map) Copy() *Map {
219-
newMap := NewMap()
217+
// Types() returns the non-default types which were registered
218+
func (m *Map) Types() []*Type {
219+
result := make([]*Type, 0, len(m.oidToType))
220220
for _, type_ := range m.oidToType {
221-
newMap.RegisterType(type_)
221+
result = append(result, type_)
222222
}
223-
return newMap
223+
return result
224224
}
225225

226226
func NewMap() *Map {

pgxpool/pool.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ import (
1212

1313
"github.com/jackc/pgx/v5"
1414
"github.com/jackc/pgx/v5/pgconn"
15+
"github.com/jackc/pgx/v5/pgtype"
1516
"github.com/jackc/puddle/v2"
1617
)
1718

18-
var defaultMaxConns = int32(4)
19-
var defaultMinConns = int32(0)
20-
var defaultMaxConnLifetime = time.Hour
21-
var defaultMaxConnIdleTime = time.Minute * 30
22-
var defaultHealthCheckPeriod = time.Minute
19+
var (
20+
defaultMaxConns = int32(4)
21+
defaultMinConns = int32(0)
22+
defaultMaxConnLifetime = time.Hour
23+
defaultMaxConnIdleTime = time.Minute * 30
24+
defaultHealthCheckPeriod = time.Minute
25+
)
2326

2427
type connResource struct {
2528
conn *pgx.Conn
@@ -100,6 +103,10 @@ type Pool struct {
100103

101104
closeOnce sync.Once
102105
closeChan chan struct{}
106+
107+
reuseTypeMap bool
108+
autoLoadTypes []*pgtype.Type
109+
autoLoadMutex *sync.Mutex
103110
}
104111

105112
// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
@@ -147,6 +154,13 @@ type Config struct {
147154
// HealthCheckPeriod is the duration between checks of the health of idle connections.
148155
HealthCheckPeriod time.Duration
149156

157+
// ReuseTypeMaps, if enabled, will reuse the typemap information being used by AutoLoadTypes.
158+
// This removes the need to query the database each time a new connection is created;
159+
// only RegisterTypes will need to be called for each new connection.
160+
// In some situations, where OID mapping can differ between pg servers in the pool, perhaps due
161+
// to certain replication strategies, this should be left disabled.
162+
ReuseTypeMaps bool
163+
150164
createdByParseConfig bool // Used to enforce created by ParseConfig rule.
151165
}
152166

@@ -185,6 +199,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
185199
config: config,
186200
beforeConnect: config.BeforeConnect,
187201
afterConnect: config.AfterConnect,
202+
reuseTypeMap: config.ReuseTypeMaps,
188203
beforeAcquire: config.BeforeAcquire,
189204
afterRelease: config.AfterRelease,
190205
beforeClose: config.BeforeClose,
@@ -196,6 +211,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
196211
healthCheckPeriod: config.HealthCheckPeriod,
197212
healthCheckChan: make(chan struct{}, 1),
198213
closeChan: make(chan struct{}),
214+
autoLoadMutex: new(sync.Mutex),
199215
}
200216

201217
if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
@@ -223,8 +239,12 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
223239
return nil, err
224240
}
225241
}
226-
227-
conn, err := pgx.ConnectConfig(ctx, connConfig)
242+
var conn *pgx.Conn
243+
if p.reuseTypeMap {
244+
conn, err = p.ConnectConfigReusingTypeMap(ctx, connConfig)
245+
} else {
246+
conn, err = pgx.ConnectConfig(ctx, connConfig)
247+
}
228248
if err != nil {
229249
return nil, err
230250
}
@@ -278,6 +298,29 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
278298
return p, nil
279299
}
280300

301+
func (p *Pool) ConnectConfigReusingTypeMap(ctx context.Context, connConfig *pgx.ConnConfig) (*pgx.Conn, error) {
302+
if connConfig.AutoLoadTypes == nil || len(connConfig.AutoLoadTypes) == 0 {
303+
return pgx.ConnectConfig(ctx, connConfig)
304+
}
305+
if p.autoLoadTypes == nil {
306+
p.autoLoadMutex.Lock()
307+
defer p.autoLoadMutex.Unlock()
308+
if p.autoLoadTypes == nil {
309+
conn, err := pgx.ConnectConfig(ctx, connConfig)
310+
if err == nil {
311+
p.autoLoadTypes = conn.TypeMap().Types()
312+
}
313+
return conn, err
314+
}
315+
}
316+
connConfig.AutoLoadTypes = nil
317+
conn, err := pgx.ConnectConfig(ctx, connConfig)
318+
if err == nil {
319+
conn.TypeMap().RegisterTypes(p.autoLoadTypes)
320+
}
321+
return conn, err
322+
}
323+
281324
// ParseConfig builds a Config from connString. It parses connString with the same behavior as [pgx.ParseConfig] with the
282325
// addition of the following variables:
283326
//
@@ -487,7 +530,6 @@ func (p *Pool) checkMinConns() error {
487530
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
488531
ctx, cancel := context.WithCancel(parentCtx)
489532
defer cancel()
490-
491533
errs := make(chan error, targetResources)
492534

493535
for i := 0; i < targetResources; i++ {
@@ -500,7 +542,6 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
500542
errs <- err
501543
}()
502544
}
503-
504545
var firstError error
505546
for i := 0; i < targetResources; i++ {
506547
err := <-errs

pgxpool/pool_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,35 @@ func TestPoolBeforeConnect(t *testing.T) {
261261
assert.EqualValues(t, "pgx", str)
262262
}
263263

264+
func TestAutoLoadTypes(t *testing.T) {
265+
t.Parallel()
266+
267+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
268+
defer cancel()
269+
270+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
271+
require.NoError(t, err)
272+
273+
controllerConn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
274+
require.NoError(t, err)
275+
defer controllerConn.Close(ctx)
276+
pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support autoloading of uint64")
277+
db1, err := pgxpool.NewWithConfig(ctx, config)
278+
require.NoError(t, err)
279+
defer db1.Close()
280+
db1.Exec(ctx, "DROP DOMAIN IF EXISTS autoload_uint64; CREATE DOMAIN autoload_uint64 as numeric(20,0)")
281+
defer db1.Exec(ctx, "DROP DOMAIN autoload_uint64")
282+
283+
config.ConnConfig.AutoLoadTypes = []string{"autoload_uint64"}
284+
db2, err := pgxpool.NewWithConfig(ctx, config)
285+
require.NoError(t, err)
286+
287+
var n uint64
288+
err = db2.QueryRow(ctx, "select 12::autoload_uint64").Scan(&n)
289+
require.NoError(t, err)
290+
assert.EqualValues(t, uint64(12), n)
291+
}
292+
264293
func TestPoolAfterConnect(t *testing.T) {
265294
t.Parallel()
266295

@@ -676,7 +705,6 @@ func TestPoolQuery(t *testing.T) {
676705
stats = pool.Stat()
677706
assert.EqualValues(t, 0, stats.AcquiredConns())
678707
assert.EqualValues(t, 1, stats.TotalConns())
679-
680708
}
681709

682710
func TestPoolQueryRow(t *testing.T) {
@@ -1104,7 +1132,6 @@ func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
11041132
}
11051133

11061134
t.Fatal("did not reach min pool size")
1107-
11081135
}
11091136

11101137
func TestPoolSendBatchBatchCloseTwice(t *testing.T) {

0 commit comments

Comments
 (0)