Skip to content

Commit e02c900

Browse files
authored
database_observability: add foreign keys to PG schema_details collector (#4549)
1 parent 2e72ada commit e02c900

File tree

2 files changed

+247
-2
lines changed

2 files changed

+247
-2
lines changed

internal/component/database_observability/postgres/collector/schema_details.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,38 @@ const (
126126
GROUP BY index_relations.relname, pg_am.amname, pg_index.indisunique
127127
ORDER BY index_name
128128
`
129+
130+
// selectForeignKeys retrieves foreign key constraints for a specified table
131+
/*
132+
pg_constraint stores all constraints
133+
join pg_class (table info) to get the source table
134+
join to pg_namespace (schema info) for schema filtering
135+
join to pg_class again to get referenced table
136+
use generate_subscripts() to correlate multi-column foreign keys by position
137+
pg_attribute joined twice to get column names for both source and referenced columns
138+
*/
139+
selectForeignKeys = `
140+
SELECT
141+
constraints.conname as constraint_name,
142+
source_column.attname as column_name,
143+
referenced_table.relname as referenced_table_name,
144+
referenced_column.attname as referenced_column_name
145+
FROM pg_constraint constraints
146+
JOIN pg_class source_table ON constraints.conrelid = source_table.oid
147+
JOIN pg_namespace schema ON source_table.relnamespace = schema.oid
148+
JOIN pg_class referenced_table ON constraints.confrelid = referenced_table.oid
149+
JOIN generate_subscripts(constraints.conkey, 1) AS position ON true
150+
JOIN pg_attribute source_column ON constraints.conrelid = source_column.attrelid
151+
AND source_column.attnum = constraints.conkey[position]
152+
AND NOT source_column.attisdropped
153+
JOIN pg_attribute referenced_column ON constraints.confrelid = referenced_column.attrelid
154+
AND referenced_column.attnum = constraints.confkey[position]
155+
AND NOT referenced_column.attisdropped
156+
WHERE constraints.contype = 'f'
157+
AND schema.nspname = $1
158+
AND source_table.relname = $2
159+
ORDER BY constraints.conname, position
160+
`
129161
)
130162

131163
type tableInfo struct {
@@ -137,8 +169,9 @@ type tableInfo struct {
137169
}
138170

139171
type tableSpec struct {
140-
Columns []columnSpec `json:"columns"`
141-
Indexes []indexSpec `json:"indexes,omitempty"`
172+
Columns []columnSpec `json:"columns"`
173+
Indexes []indexSpec `json:"indexes,omitempty"`
174+
ForeignKeys []foreignKey `json:"foreign_keys,omitempty"`
142175
}
143176

144177
type columnSpec struct {
@@ -159,6 +192,13 @@ type indexSpec struct {
159192
Nullable bool `json:"nullable"`
160193
}
161194

195+
type foreignKey struct {
196+
Name string `json:"name"`
197+
ColumnName string `json:"column_name"`
198+
ReferencedTableName string `json:"referenced_table_name"`
199+
ReferencedColumnName string `json:"referenced_column_name"`
200+
}
201+
162202
type SchemaDetailsArguments struct {
163203
DB *sql.DB
164204
EntryHandler loki.EntryHandler
@@ -430,5 +470,32 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
430470
return nil, err
431471
}
432472

473+
fkRS, err := c.dbConnection.QueryContext(ctx, selectForeignKeys, schemaName, tableName)
474+
if err != nil {
475+
level.Error(c.logger).Log("msg", "failed to query foreign keys", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
476+
return nil, err
477+
}
478+
defer fkRS.Close()
479+
480+
for fkRS.Next() {
481+
var constraintName, columnName, referencedTableName, referencedColumnName string
482+
if err := fkRS.Scan(&constraintName, &columnName, &referencedTableName, &referencedColumnName); err != nil {
483+
level.Error(c.logger).Log("msg", "failed to scan foreign keys", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
484+
return nil, err
485+
}
486+
487+
tblSpec.ForeignKeys = append(tblSpec.ForeignKeys, foreignKey{
488+
Name: constraintName,
489+
ColumnName: columnName,
490+
ReferencedTableName: referencedTableName,
491+
ReferencedColumnName: referencedColumnName,
492+
})
493+
}
494+
495+
if err := fkRS.Err(); err != nil {
496+
level.Error(c.logger).Log("msg", "failed to iterate over foreign keys result set", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
497+
return nil, err
498+
}
499+
433500
return tblSpec, nil
434501
}

internal/component/database_observability/postgres/collector/schema_details_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ func TestSchemaTable(t *testing.T) {
9090
}).AddRow("authors_pkey", "btree", true, pq.StringArray{"id"}, pq.StringArray{}, true),
9191
)
9292

93+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "authors").RowsWillBeClosed().
94+
WillReturnRows(
95+
sqlmock.NewRows([]string{
96+
"constraint_name",
97+
"column_name",
98+
"referenced_table_name",
99+
"referenced_column_name",
100+
}),
101+
)
102+
93103
err = collector.Start(t.Context())
94104
require.NoError(t, err)
95105

@@ -189,6 +199,16 @@ func TestSchemaTable(t *testing.T) {
189199
}).AddRow("authors_pkey", "btree", true, pq.StringArray{"id"}, pq.StringArray{}, false),
190200
)
191201

202+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "authors").RowsWillBeClosed().
203+
WillReturnRows(
204+
sqlmock.NewRows([]string{
205+
"constraint_name",
206+
"column_name",
207+
"referenced_table_name",
208+
"referenced_column_name",
209+
}),
210+
)
211+
192212
mock.ExpectQuery(selectColumnNames).WithArgs("public.categories").RowsWillBeClosed().
193213
WillReturnRows(
194214
sqlmock.NewRows([]string{
@@ -213,6 +233,16 @@ func TestSchemaTable(t *testing.T) {
213233
}).AddRow("categories_pkey", "btree", true, pq.StringArray{"id"}, pq.StringArray{}, false),
214234
)
215235

236+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "categories").RowsWillBeClosed().
237+
WillReturnRows(
238+
sqlmock.NewRows([]string{
239+
"constraint_name",
240+
"column_name",
241+
"referenced_table_name",
242+
"referenced_column_name",
243+
}),
244+
)
245+
216246
mock.ExpectQuery(selectColumnNames).WithArgs("postgis.spatial_ref_sys").RowsWillBeClosed().
217247
WillReturnRows(
218248
sqlmock.NewRows([]string{
@@ -237,6 +267,16 @@ func TestSchemaTable(t *testing.T) {
237267
}),
238268
)
239269

270+
mock.ExpectQuery(selectForeignKeys).WithArgs("postgis", "spatial_ref_sys").RowsWillBeClosed().
271+
WillReturnRows(
272+
sqlmock.NewRows([]string{
273+
"constraint_name",
274+
"column_name",
275+
"referenced_table_name",
276+
"referenced_column_name",
277+
}),
278+
)
279+
240280
err = collector.Start(t.Context())
241281
require.NoError(t, err)
242282

@@ -346,6 +386,16 @@ func TestSchemaTable(t *testing.T) {
346386
AddRow("idx_users_created_at", "btree", false, pq.StringArray{"created_at"}, nil, false),
347387
)
348388

389+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "users").RowsWillBeClosed().
390+
WillReturnRows(
391+
sqlmock.NewRows([]string{
392+
"constraint_name",
393+
"column_name",
394+
"referenced_table_name",
395+
"referenced_column_name",
396+
}),
397+
)
398+
349399
err = collector.Start(t.Context())
350400
require.NoError(t, err)
351401

@@ -356,6 +406,7 @@ func TestSchemaTable(t *testing.T) {
356406
collector.Stop()
357407
lokiClient.Stop()
358408

409+
// Run this after Stop() to avoid race conditions
359410
err = mock.ExpectationsWereMet()
360411
require.NoError(t, err)
361412

@@ -487,6 +538,16 @@ func TestSchemaTable(t *testing.T) {
487538
}),
488539
)
489540

541+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "test_table").RowsWillBeClosed().
542+
WillReturnRows(
543+
sqlmock.NewRows([]string{
544+
"constraint_name",
545+
"column_name",
546+
"referenced_table_name",
547+
"referenced_column_name",
548+
}),
549+
)
550+
490551
err = collector.Start(t.Context())
491552
require.NoError(t, err)
492553

@@ -581,6 +642,16 @@ func Test_collector_detects_auto_increment_column(t *testing.T) {
581642
}),
582643
)
583644

645+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "users").RowsWillBeClosed().
646+
WillReturnRows(
647+
sqlmock.NewRows([]string{
648+
"constraint_name",
649+
"column_name",
650+
"referenced_table_name",
651+
"referenced_column_name",
652+
}),
653+
)
654+
584655
err = collector.Start(t.Context())
585656
require.NoError(t, err)
586657

@@ -672,6 +743,16 @@ func Test_collector_detects_auto_increment_column(t *testing.T) {
672743
}),
673744
)
674745

746+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "products").RowsWillBeClosed().
747+
WillReturnRows(
748+
sqlmock.NewRows([]string{
749+
"constraint_name",
750+
"column_name",
751+
"referenced_table_name",
752+
"referenced_column_name",
753+
}),
754+
)
755+
675756
err = collector.Start(t.Context())
676757
require.NoError(t, err)
677758

@@ -696,4 +777,101 @@ func Test_collector_detects_auto_increment_column(t *testing.T) {
696777
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"integer","not_null":true,"auto_increment":true,"primary_key":true},{"name":"code","type":"integer","not_null":true,"auto_increment":true},{"name":"name","type":"character varying(255)","not_null":true}]}`))
697778
require.Equal(t, fmt.Sprintf(`level="info" datname="identity_test_db" schema="public" table="products" table_spec="%s"`, expectedTableSpec), lokiEntries[2].Line)
698779
})
780+
781+
t.Run("collector detects foreign keys", func(t *testing.T) {
782+
t.Parallel()
783+
784+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
785+
require.NoError(t, err)
786+
defer db.Close()
787+
788+
lokiClient := loki_fake.NewClient(func() {})
789+
790+
collector, err := NewSchemaDetails(SchemaDetailsArguments{
791+
DB: db,
792+
EntryHandler: lokiClient,
793+
Logger: log.NewLogfmtLogger(os.Stderr),
794+
})
795+
require.NoError(t, err)
796+
require.NotNil(t, collector)
797+
798+
mock.ExpectQuery(selectDatabaseName).WithoutArgs().RowsWillBeClosed().
799+
WillReturnRows(
800+
sqlmock.NewRows([]string{
801+
"datname",
802+
}).AddRow(
803+
"books_store",
804+
),
805+
)
806+
807+
mock.ExpectQuery(selectSchemaNames).WithoutArgs().RowsWillBeClosed().
808+
WillReturnRows(
809+
sqlmock.NewRows([]string{
810+
"schema_name",
811+
}).AddRow("public"),
812+
)
813+
814+
mock.ExpectQuery(selectTableNames).WithArgs("public").RowsWillBeClosed().
815+
WillReturnRows(
816+
sqlmock.NewRows([]string{
817+
"table_name",
818+
}).AddRow("books"),
819+
)
820+
821+
mock.ExpectQuery(selectColumnNames).WithArgs("public.books").RowsWillBeClosed().
822+
WillReturnRows(
823+
sqlmock.NewRows([]string{
824+
"column_name",
825+
"column_type",
826+
"not_nullable",
827+
"column_default",
828+
"identity_generation",
829+
"is_primary_key",
830+
}).AddRow("id", "integer", true, "", "", true).
831+
AddRow("title", "character varying(255)", true, "", "", false).
832+
AddRow("author_id", "integer", true, "", "", false).
833+
AddRow("category_id", "integer", false, "", "", false),
834+
)
835+
836+
mock.ExpectQuery(selectIndexes).WithArgs("public", "books").RowsWillBeClosed().
837+
WillReturnRows(
838+
sqlmock.NewRows([]string{
839+
"index_name",
840+
"index_type",
841+
"unique",
842+
"column_names",
843+
"expressions",
844+
"has_nullable_column",
845+
}).AddRow("books_pkey", "btree", true, pq.StringArray{"id"}, pq.StringArray{}, false),
846+
)
847+
848+
mock.ExpectQuery(selectForeignKeys).WithArgs("public", "books").RowsWillBeClosed().
849+
WillReturnRows(
850+
sqlmock.NewRows([]string{
851+
"constraint_name",
852+
"column_name",
853+
"referenced_table_name",
854+
"referenced_column_name",
855+
}).AddRow("fk_books_author", "author_id", "authors", "id").
856+
AddRow("fk_books_category", "category_id", "categories", "id"),
857+
)
858+
859+
err = collector.Start(t.Context())
860+
require.NoError(t, err)
861+
862+
require.Eventually(t, func() bool {
863+
return len(lokiClient.Received()) == 3
864+
}, 2*time.Second, 100*time.Millisecond)
865+
866+
collector.Stop()
867+
lokiClient.Stop()
868+
869+
err = mock.ExpectationsWereMet()
870+
require.NoError(t, err)
871+
872+
lokiEntries := lokiClient.Received()
873+
require.Len(t, lokiEntries, 3)
874+
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"integer","not_null":true,"primary_key":true},{"name":"title","type":"character varying(255)","not_null":true},{"name":"author_id","type":"integer","not_null":true},{"name":"category_id","type":"integer"}],"indexes":[{"name":"books_pkey","type":"btree","columns":["id"],"unique":true,"nullable":false}],"foreign_keys":[{"name":"fk_books_author","column_name":"author_id","referenced_table_name":"authors","referenced_column_name":"id"},{"name":"fk_books_category","column_name":"category_id","referenced_table_name":"categories","referenced_column_name":"id"}]}`))
875+
require.Equal(t, fmt.Sprintf(`level="info" datname="books_store" schema="public" table="books" table_spec="%s"`, expectedTableSpec), lokiEntries[2].Line)
876+
})
699877
}

0 commit comments

Comments
 (0)