Skip to content

Commit 8759ab2

Browse files
authored
Merge pull request #413 from PDOK/perf-prewarm-index
perf: prewarm search index
2 parents bb2ec91 + feffe11 commit 8759ab2

File tree

5 files changed

+515
-222
lines changed

5 files changed

+515
-222
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,14 @@ Set `LOG_SQL=true` environment variable to enable logging of all SQL queries to
253253
stdout for debug purposes. Only applies to OGC API Features with
254254
GeoPackage and/or PostgreSQL data source.
255255
256+
#### PostgreSQL shared_buffers
257+
258+
This only applies when Features Search (geocoding) is enabled. You can inspect the PostgreSQL buffer cache for the search
259+
index by invoking `select * from gokoala_inspect_buffercache();` in the PostgreSQL database. The search index is
260+
automatically prewarmed after ETL is done. If you want to prewarm it manually (for example, after a
261+
restart of the database), you can invoke `select gokoala_prewarm_partitions();`. This function defaults to the current
262+
search index, but this can be overridden when desired.
263+
256264
## Develop
257265
258266
Design principles:

internal/ogc/features_search/etl/etl.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type Load interface {
5454
PostLoad(collectionID string, index string, revision string) error
5555

5656
// Optimize once ETL is completed (optional)
57-
Optimize() error
57+
Optimize(index string) error
5858

5959
// Close connection to the target database
6060
Close()
@@ -154,7 +154,7 @@ func ImportFile(collection config.Collection, searchIndex string, revision strin
154154

155155
if !skipOptimize {
156156
log.Println("start optimizing")
157-
if err = target.Optimize(); err != nil {
157+
if err = target.Optimize(searchIndex); err != nil {
158158
return err
159159
}
160160
log.Println("completed optimizing")

internal/ogc/features_search/etl/etl_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,91 @@ func TestCreateSearchIndexIdempotent(t *testing.T) {
7070
require.NoError(t, err)
7171
}
7272

73+
func TestPreWarmPartitionFunction(t *testing.T) {
74+
if testing.Short() {
75+
t.Skip("Skipping integration test in short mode")
76+
}
77+
ctx := context.Background()
78+
79+
// given existing populated search index
80+
dbPort, postgisContainer, err := setupPostgis(ctx, t)
81+
if err != nil {
82+
t.Error(err)
83+
}
84+
defer terminateContainer(ctx, t, postgisContainer)
85+
dbConn := makeDbConnection(dbPort)
86+
87+
err = CreateSearchIndex(dbConn, "search_index", 28992, language.Dutch)
88+
require.NoError(t, err)
89+
err = insertTestData(ctx, dbConn)
90+
require.NoError(t, err)
91+
92+
db, err := pgx.Connect(ctx, dbConn)
93+
require.NoError(t, err)
94+
defer db.Close(ctx)
95+
96+
// when/then
97+
rows, err := db.Query(ctx, `SELECT * FROM gokoala_prewarm_partitions(idx_suffixes := array['ts_idx'])`)
98+
require.NoError(t, err)
99+
defer rows.Close()
100+
}
101+
102+
func TestInspectBufferCacheFunction(t *testing.T) {
103+
if testing.Short() {
104+
t.Skip("Skipping integration test in short mode")
105+
}
106+
ctx := context.Background()
107+
108+
// given existing populated search index
109+
dbPort, postgisContainer, err := setupPostgis(ctx, t)
110+
if err != nil {
111+
t.Error(err)
112+
}
113+
defer terminateContainer(ctx, t, postgisContainer)
114+
dbConn := makeDbConnection(dbPort)
115+
116+
err = CreateSearchIndex(dbConn, "search_index", 28992, language.Dutch)
117+
require.NoError(t, err)
118+
err = insertTestData(ctx, dbConn)
119+
require.NoError(t, err)
120+
121+
db, err := pgx.Connect(ctx, dbConn)
122+
require.NoError(t, err)
123+
defer db.Close(ctx)
124+
125+
// when
126+
rows, err := db.Query(ctx, `SELECT * FROM gokoala_inspect_buffercache()`)
127+
require.NoError(t, err)
128+
defer rows.Close()
129+
130+
type result struct {
131+
ObjectName string
132+
Kind string
133+
TotalSize string
134+
CachedSize string
135+
PercentageCached *float64
136+
}
137+
138+
var got []result
139+
for rows.Next() {
140+
var r result
141+
err := rows.Scan(
142+
&r.ObjectName,
143+
&r.Kind,
144+
&r.TotalSize,
145+
&r.CachedSize,
146+
&r.PercentageCached,
147+
)
148+
require.NoError(t, err)
149+
got = append(got, r)
150+
}
151+
152+
// then
153+
require.NoError(t, rows.Err())
154+
require.NotEmpty(t, got)
155+
require.NotEmpty(t, got[0].ObjectName)
156+
}
157+
73158
// makeDbConnection returns the Postgres connection URL for the "search_db"
// database on 127.0.0.1, using the given port and the postgres/postgres
// credentials with SSL disabled.
func makeDbConnection(dbPort nat.Port) string {
	const database = "search_db"
	port := dbPort.Int()
	return fmt.Sprintf("postgres://postgres:postgres@127.0.0.1:%d/%s?sslmode=disable", port, database)
}

internal/ogc/features_search/etl/load/postgres.go

Lines changed: 15 additions & 220 deletions
Original file line numberDiff line numberDiff line change
@@ -11,50 +11,15 @@ import (
1111
t "github.com/PDOK/gokoala/internal/ogc/features_search/etl/transform"
1212
"github.com/jackc/pgx/v5"
1313
pgxgeom "github.com/twpayne/pgx-geom"
14-
"golang.org/x/text/language"
1514
)
1615

1716
const (
18-
indexNameFullText = "ts_idx"
19-
indexNameGeometry = "geometry_idx"
20-
indexNameBbox = "bbox_idx"
21-
indexNamePreRank = "pre_rank_idx"
22-
2317
alphaPartition = "_alpha"
2418
betaPartition = "_beta"
2519

2620
postgresDetachErr = "already pending detach in partitioned table"
2721
)
2822

29-
var (
30-
postgresExtensions = []string{"postgis", "unaccent"}
31-
32-
indexNames = []string{indexNameFullText, indexNameGeometry, indexNameBbox, indexNamePreRank}
33-
34-
//nolint:dupword
35-
tableDefinition = `
36-
create table if not exists %[1]s (
37-
id serial,
38-
feature_id text not null,
39-
external_fid text null,
40-
collection_id text not null,
41-
collection_version int not null,
42-
display_name text not null,
43-
suggest text not null,
44-
geometry_type geometry_type not null,
45-
bbox geometry(polygon, %[2]d) null,
46-
geometry geometry(point, %[2]d) null,
47-
ts tsvector generated always as (to_tsvector('custom_dict', suggest)) stored
48-
) %[3]s;`
49-
50-
metadataTableDefinition = `
51-
create table if not exists %[1]s_metadata (
52-
collection_id text not null,
53-
revision uuid not null,
54-
revision_date timestamptz default now() not null
55-
);`
56-
)
57-
5823
type Postgres struct {
5924
db *pgx.Conn
6025

@@ -180,10 +145,22 @@ func (p *Postgres) PostLoad(collectionID string, index string, revision string)
180145
return nil
181146
}
182147

183-
func (p *Postgres) Optimize() error {
184-
_, err := p.db.Exec(context.Background(), `vacuum analyze;`)
148+
func (p *Postgres) Optimize(index string) error {
149+
log.Println("perform targeted VACUUM + ANALYZE on loaded partition")
150+
_, err := p.db.Exec(context.Background(), fmt.Sprintf(`vacuum analyze %s;`, p.partitionToLoad))
151+
if err != nil {
152+
return fmt.Errorf("failed optimizing: error performing vacuum analyze on loaded partition: %w", err)
153+
}
154+
_, err = p.db.Exec(context.Background(), fmt.Sprintf(`analyze %s;`, index))
155+
if err != nil {
156+
return fmt.Errorf("failed optimizing: error performing analyze on search index: %w", err)
157+
}
158+
159+
// Execute pg_prewarm on all partitions and important indexes (forcing these into Postgres shared_buffers memory)
160+
log.Println("prewarming partitions")
161+
_, err = p.db.Exec(context.Background(), `select gokoala_prewarm_partitions()`)
185162
if err != nil {
186-
return fmt.Errorf("failed optimizing: error performing vacuum analyze: %w", err)
163+
return fmt.Errorf("failed optimizing: prewarm function failed: %w", err)
187164
}
188165
return nil
189166
}
@@ -209,125 +186,6 @@ func (p *Postgres) GetRevision(collectionID string, index string) (string, error
209186
return currentRevision, nil
210187
}
211188

212-
// Init initialize search index. Should be idempotent!
213-
//
214-
// Since not all DDL in Postgres support the "if not exists" syntax we use a bit
215-
// of pl/pgsql to make it idempotent.
216-
func (p *Postgres) Init(index string, srid int, lang language.Tag) error {
217-
geometryType := `
218-
do $$ begin
219-
create type geometry_type as enum ('POINT', 'MULTIPOINT', 'LINESTRING', 'MULTILINESTRING', 'POLYGON', 'MULTIPOLYGON');
220-
exception
221-
when duplicate_object then null;
222-
end $$;`
223-
_, err := p.db.Exec(context.Background(), geometryType)
224-
if err != nil {
225-
return fmt.Errorf("error creating geometry type: %w", err)
226-
}
227-
228-
textSearchConfig := `
229-
do $$ begin
230-
create text search configuration custom_dict (copy = simple);
231-
exception
232-
when unique_violation then null;
233-
end $$;`
234-
_, err = p.db.Exec(context.Background(), textSearchConfig)
235-
if err != nil {
236-
return fmt.Errorf("error creating text search configuration: %w", err)
237-
}
238-
239-
// This adds the 'unaccent' extension to allow searching with/without diacritics. Must happen in separate transaction.
240-
alterTextSearchConfig := `
241-
do $$ begin
242-
alter text search configuration custom_dict
243-
alter mapping for hword, hword_part, word
244-
with unaccent, simple;
245-
exception
246-
when unique_violation then null;
247-
end $$;`
248-
_, err = p.db.Exec(context.Background(), alterTextSearchConfig)
249-
if err != nil {
250-
return fmt.Errorf("error altering text search configuration: %w", err)
251-
}
252-
253-
// create search index table
254-
_, err = p.db.Exec(context.Background(), fmt.Sprintf(tableDefinition, index, srid, "partition by list(collection_id)"))
255-
if err != nil {
256-
return fmt.Errorf("error creating search index table: %w", err)
257-
}
258-
259-
// create primary key when it doesn't exist yet
260-
primaryKey := fmt.Sprintf(`
261-
do $$
262-
begin
263-
if not exists (
264-
select 1
265-
from pg_constraint
266-
where conrelid = '%[1]s'::regclass
267-
and contype = 'p'
268-
)
269-
then
270-
alter table %[1]s
271-
add constraint %[1]s_pkey primary key (id, collection_id, collection_version);
272-
end if;
273-
end;
274-
$$;`, index)
275-
_, err = p.db.Exec(context.Background(), primaryKey)
276-
if err != nil {
277-
return fmt.Errorf("error creating primary key: %w", err)
278-
}
279-
280-
// create search index metadata table
281-
_, err = p.db.Exec(context.Background(), fmt.Sprintf(metadataTableDefinition, index))
282-
if err != nil {
283-
return fmt.Errorf("error creating search index metadata table: %w", err)
284-
}
285-
286-
// create metadata primary key when it doesn't exist yet
287-
metadataPrimaryKey := fmt.Sprintf(`
288-
do $$
289-
begin
290-
if not exists (
291-
select 1
292-
from pg_constraint
293-
where conrelid = '%[1]s_metadata'::regclass
294-
and contype = 'p'
295-
)
296-
then
297-
alter table %[1]s_metadata
298-
add constraint %[1]s_metadata_pkey primary key (collection_id);
299-
end if;
300-
end;
301-
$$;`, index)
302-
_, err = p.db.Exec(context.Background(), metadataPrimaryKey)
303-
if err != nil {
304-
return fmt.Errorf("error creating metadata primary key: %w", err)
305-
}
306-
307-
// create custom collation to correctly handle "numbers in strings" when sorting results
308-
// see https://www.postgresql.org/docs/12/collation.html#id-1.6.10.4.5.7.5
309-
collation := fmt.Sprintf(`create collation if not exists custom_numeric (provider = icu, locale = '%s-u-kn-true');`, lang.String())
310-
_, err = p.db.Exec(context.Background(), collation)
311-
if err != nil {
312-
return fmt.Errorf("error creating numeric collation: %w", err)
313-
}
314-
315-
if err = p.createIndexes(index, false); err != nil {
316-
return err
317-
}
318-
return err
319-
}
320-
321-
func createExtensions(ctx context.Context, db *pgx.Conn) error {
322-
for _, ext := range postgresExtensions {
323-
_, err := db.Exec(ctx, `create extension if not exists `+ext+`;`)
324-
if err != nil {
325-
return fmt.Errorf("error creating %s extension: %w", ext, err)
326-
}
327-
}
328-
return nil
329-
}
330-
331189
func (p *Postgres) isPartition(collectionID string, index string) (bool, error) {
332190
result := false
333191
err := p.db.QueryRow(context.Background(), `select exists (
@@ -341,66 +199,3 @@ func (p *Postgres) isPartition(collectionID string, index string) (bool, error)
341199
) as is_partition_of_search_index;`, index, collectionID).Scan(&result)
342200
return result, err
343201
}
344-
345-
func (p *Postgres) createIndexes(table string, usePrefix bool) error {
346-
// GIN indexes are best for text search
347-
indexName := indexNameFullText
348-
if usePrefix {
349-
indexName = fmt.Sprintf("%s_%s", table, indexNameFullText)
350-
}
351-
_, err := p.db.Exec(context.Background(), fmt.Sprintf(`create index if not exists %[2]s on only %[1]s using gin(ts);`, table, indexName))
352-
if err != nil {
353-
return fmt.Errorf("error creating GIN index: %w", err)
354-
}
355-
356-
// GIST indexes for geometry column to support search within a bounding box
357-
indexName = indexNameGeometry
358-
if usePrefix {
359-
indexName = fmt.Sprintf("%s_%s", table, indexNameGeometry)
360-
}
361-
_, err = p.db.Exec(context.Background(), fmt.Sprintf(`create index if not exists %[2]s on only %[1]s using gist(geometry);`, table, indexName))
362-
if err != nil {
363-
return fmt.Errorf("error creating geometry GIST index: %w", err)
364-
}
365-
366-
// GIST indexes for bbox column to support search within a bounding box
367-
indexName = indexNameBbox
368-
if usePrefix {
369-
indexName = fmt.Sprintf("%s_%s", table, indexNameBbox)
370-
}
371-
_, err = p.db.Exec(context.Background(), fmt.Sprintf(`create index if not exists %[2]s on only %[1]s using gist(bbox);`, table, indexName))
372-
if err != nil {
373-
return fmt.Errorf("error creating bbox GIST index: %w", err)
374-
}
375-
376-
// index used to pre-rank results when generic search terms are used
377-
indexName = indexNamePreRank
378-
if usePrefix {
379-
indexName = fmt.Sprintf("%s_%s", table, indexNamePreRank)
380-
}
381-
preRankIndex := fmt.Sprintf(`create index if not exists %[2]s on only %[1]s
382-
(array_length(string_to_array(suggest, ' '), 1) asc, display_name collate "custom_numeric" asc);`, table, indexName)
383-
_, err = p.db.Exec(context.Background(), preRankIndex)
384-
if err != nil {
385-
return fmt.Errorf("error creating pre-rank index: %w", err)
386-
}
387-
return nil
388-
}
389-
390-
// CHECK constraint is to avoid ACCESS EXCLUSIVE lock on partition as mentioned on
391-
// https://www.postgresql.org/docs/current/ddl-partitioning.html#DDL-PARTITIONING-DECLARATIVE-MAINTENANCE
392-
func (p *Postgres) createCheck(collectionID string) error {
393-
dropCheck := fmt.Sprintf(`alter table %[1]s drop constraint if exists %[1]s_col_chk;`, p.partitionToLoad)
394-
_, err := p.db.Exec(context.Background(), dropCheck)
395-
if err != nil {
396-
return fmt.Errorf("error dropping CHECK constraint: %w", err)
397-
}
398-
399-
addCheck := fmt.Sprintf(`alter table if exists %[1]s add constraint %[1]s_col_chk check (collection_id = '%[2]s');`,
400-
p.partitionToLoad, collectionID)
401-
_, err = p.db.Exec(context.Background(), addCheck)
402-
if err != nil {
403-
return fmt.Errorf("error creating CHECK constraint: %w", err)
404-
}
405-
return nil
406-
}

0 commit comments

Comments
 (0)