Skip to content

Commit 982e208

Browse files
committed
dbo11y: remove comments from normalized sql text in postgres
Strip comments from the normalized SQL query text read from `pg_stat_statements`, as Postgres doesn't remove them automatically. While at it: - move the table-name extraction helper from `lexer.go` closer to the postgres component - keep a normalizer instance around in `query_details.go` - polish the formatting of some test cases
1 parent 1d34414 commit 982e208

File tree

4 files changed

+274
-232
lines changed

4 files changed

+274
-232
lines changed

internal/component/database_observability/lexer.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,6 @@ import (
77
"github.com/DataDog/go-sqllexer"
88
)
99

10-
// ExtractTableNames extracts the table names from a SQL query
11-
func ExtractTableNames(sql string) ([]string, error) {
12-
normalizer := sqllexer.NewNormalizer(
13-
sqllexer.WithCollectTables(true),
14-
)
15-
_, metadata, err := normalizer.Normalize(sql, sqllexer.WithDBMS(sqllexer.DBMSPostgres))
16-
if err != nil {
17-
return nil, fmt.Errorf("failed to normalize SQL: %w", err)
18-
}
19-
20-
// Return all table names, including those that end with "..." for truncated queries, as we can't know if the table name was truncated or not
21-
return metadata.Tables, nil
22-
}
23-
2410
// RedactSql obfuscates a SQL query by replacing literals with ? placeholders
2511
func RedactSql(sql string) string {
2612
obfuscatedSql := sqllexer.NewObfuscator().Obfuscate(sql)

internal/component/database_observability/lexer_test.go

Lines changed: 18 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,26 @@ func TestPgSqlParser_Redact(t *testing.T) {
4949
sql: `WITH active_users AS (
5050
SELECT * FROM users WHERE last_login > '2024-01-01'
5151
), recent_orders AS (
52-
SELECT o.* FROM orders o
53-
JOIN active_users u ON u.id = o.user_id
52+
SELECT o.* FROM orders o
53+
JOIN active_users u ON u.id = o.user_id
5454
WHERE o.created_at > '2024-03-01'
5555
)
56-
SELECT au.name, COUNT(ro.id) as order_count
57-
FROM active_users au
58-
LEFT JOIN recent_orders ro ON ro.user_id = au.id
59-
GROUP BY au.name
56+
SELECT au.name, COUNT(ro.id) as order_count
57+
FROM active_users au
58+
LEFT JOIN recent_orders ro ON ro.user_id = au.id
59+
GROUP BY au.name
6060
HAVING COUNT(ro.id) > 5`,
6161
want: `WITH active_users AS (
6262
SELECT * FROM users WHERE last_login > ?
6363
), recent_orders AS (
64-
SELECT o.* FROM orders o
65-
JOIN active_users u ON u.id = o.user_id
64+
SELECT o.* FROM orders o
65+
JOIN active_users u ON u.id = o.user_id
6666
WHERE o.created_at > ?
6767
)
68-
SELECT au.name, COUNT(ro.id) as order_count
69-
FROM active_users au
70-
LEFT JOIN recent_orders ro ON ro.user_id = au.id
71-
GROUP BY au.name
68+
SELECT au.name, COUNT(ro.id) as order_count
69+
FROM active_users au
70+
LEFT JOIN recent_orders ro ON ro.user_id = au.id
71+
GROUP BY au.name
7272
HAVING COUNT(ro.id) > ?`,
7373
},
7474
{
@@ -106,13 +106,13 @@ func TestPgSqlParser_Redact(t *testing.T) {
106106
{
107107
name: "WITH statement with UPDATE",
108108
sql: `WITH inactive_users AS (
109-
SELECT id FROM users
109+
SELECT id FROM users
110110
WHERE last_login < '2023-01-01' AND status = 'active'
111111
)
112112
UPDATE users SET status = 'inactive', updated_at = '2024-03-20'
113113
WHERE id IN (SELECT id FROM inactive_users)`,
114114
want: `WITH inactive_users AS (
115-
SELECT id FROM users
115+
SELECT id FROM users
116116
WHERE last_login < ? AND status = ?
117117
)
118118
UPDATE users SET status = ?, updated_at = ?
@@ -121,16 +121,16 @@ func TestPgSqlParser_Redact(t *testing.T) {
121121
{
122122
name: "WITH statement with DELETE",
123123
sql: `WITH old_orders AS (
124-
SELECT id FROM orders
124+
SELECT id FROM orders
125125
WHERE created_at < '2023-01-01' AND status = 'completed'
126126
)
127-
DELETE FROM order_items
127+
DELETE FROM order_items
128128
WHERE order_id IN (SELECT id FROM old_orders)`,
129129
want: `WITH old_orders AS (
130-
SELECT id FROM orders
130+
SELECT id FROM orders
131131
WHERE created_at < ? AND status = ?
132132
)
133-
DELETE FROM order_items
133+
DELETE FROM order_items
134134
WHERE order_id IN (SELECT id FROM old_orders)`,
135135
},
136136
{
@@ -185,134 +185,6 @@ func TestPgSqlParser_Redact(t *testing.T) {
185185
}
186186
}
187187

188-
func TestPgSqlParser_ExtractTableNames(t *testing.T) {
189-
tests := []struct {
190-
name string
191-
sql string
192-
want []string
193-
wantErr bool
194-
}{
195-
{
196-
name: "simple select",
197-
sql: "SELECT * FROM users",
198-
want: []string{"users"},
199-
},
200-
{
201-
name: "select with join",
202-
sql: "SELECT * FROM users u JOIN orders o ON u.id = o.user_id",
203-
want: []string{"orders", "users"},
204-
},
205-
{
206-
name: "select with schema qualified tables",
207-
sql: "SELECT * FROM public.users JOIN sales.orders ON users.id = orders.user_id",
208-
want: []string{"public.users", "sales.orders"},
209-
},
210-
{
211-
name: "insert statement",
212-
sql: "INSERT INTO users (name, email) VALUES ('John', '[email protected]')",
213-
want: []string{"users"},
214-
},
215-
{
216-
name: "update statement",
217-
sql: "UPDATE users SET last_login = NOW() WHERE id = 1",
218-
want: []string{"users"},
219-
},
220-
{
221-
name: "delete statement",
222-
sql: "DELETE FROM users WHERE id = 1",
223-
want: []string{"users"},
224-
},
225-
{
226-
name: "with clause",
227-
sql: `WITH active_users AS (
228-
SELECT * FROM users WHERE status = 'active'
229-
)
230-
SELECT * FROM active_users au
231-
JOIN orders o ON o.user_id = au.id`,
232-
want: []string{"orders", "users"},
233-
},
234-
{
235-
name: "subquery in where clause",
236-
sql: `SELECT * FROM orders
237-
WHERE user_id IN (SELECT id FROM users WHERE status = 'active')`,
238-
want: []string{"orders", "users"},
239-
},
240-
{
241-
name: "multiple schema qualified tables with aliases",
242-
sql: `SELECT u.name, o.total, p.status
243-
FROM public.users u
244-
JOIN sales.orders o ON u.id = o.user_id
245-
LEFT JOIN shipping.packages p ON o.id = p.order_id`,
246-
want: []string{"public.users", "sales.orders", "shipping.packages"},
247-
},
248-
{
249-
name: "truncated query with ...",
250-
sql: "SELECT * FROM users JOIN orders ON users.id = orders.user_id AND...",
251-
want: []string{"users", "orders"},
252-
},
253-
{
254-
name: "truncated query with incomplete comment",
255-
sql: "SELECT * FROM users JOIN orders ON users.id = orders.user_id /* some comment that gets truncated...",
256-
want: []string{"users", "orders"},
257-
},
258-
{
259-
name: "truncated query mid-table name",
260-
sql: "SELECT * FROM users JOIN ord...",
261-
want: []string{"users", "ord..."},
262-
},
263-
{
264-
name: "truncated query with schema qualified tables",
265-
sql: "SELECT * FROM public.users JOIN sales.orders ON users.id = orders.user_id AND...",
266-
want: []string{"public.users", "sales.orders"},
267-
},
268-
{
269-
name: "query with table.* expression",
270-
sql: "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id",
271-
want: []string{"users", "orders"},
272-
},
273-
{
274-
name: "query with type cast",
275-
sql: "SELECT u.id, '2024-03-20'::timestamp FROM users u",
276-
want: []string{"users"},
277-
},
278-
}
279-
280-
for _, tt := range tests {
281-
t.Run(tt.name, func(t *testing.T) {
282-
got, err := ExtractTableNames(tt.sql)
283-
if (err != nil) != tt.wantErr {
284-
t.Errorf("ExtractTableNames() error = %v, wantErr %v", err, tt.wantErr)
285-
return
286-
}
287-
if !tt.wantErr {
288-
if len(got) != len(tt.want) {
289-
t.Errorf("ExtractTableNames()\nGOT = %v\nWANT = %v", got, tt.want)
290-
return
291-
}
292-
// Compare slices ignoring order since table names might come in different order
293-
gotMap := make(map[string]bool)
294-
wantMap := make(map[string]bool)
295-
for _, table := range got {
296-
gotMap[table] = true
297-
}
298-
for _, table := range tt.want {
299-
wantMap[table] = true
300-
}
301-
for table := range gotMap {
302-
if !wantMap[table] {
303-
t.Errorf("ExtractTableNames() got unexpected table = %v", table)
304-
}
305-
}
306-
for table := range wantMap {
307-
if !gotMap[table] {
308-
t.Errorf("ExtractTableNames() missing expected table = %v", table)
309-
}
310-
}
311-
}
312-
})
313-
}
314-
}
315-
316188
func TestContainsReservedKeywords(t *testing.T) {
317189
tests := []struct {
318190
name string

internal/component/database_observability/postgres/collector/query_details.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/DataDog/go-sqllexer"
1011
"github.com/go-kit/log"
1112
"go.uber.org/atomic"
1213

@@ -55,6 +56,7 @@ type QueryDetails struct {
5556
collectInterval time.Duration
5657
entryHandler loki.EntryHandler
5758
tableRegistry *TableRegistry
59+
normalizer *sqllexer.Normalizer
5860

5961
logger log.Logger
6062
running *atomic.Bool
@@ -68,6 +70,7 @@ func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) {
6870
collectInterval: args.CollectInterval,
6971
entryHandler: args.EntryHandler,
7072
tableRegistry: args.TableRegistry,
73+
normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true), sqllexer.WithCollectComments(true)),
7174
logger: log.With(args.Logger, "collector", QueryDetailsCollector),
7275
running: &atomic.Bool{},
7376
}, nil
@@ -129,23 +132,25 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error {
129132
for rs.Next() {
130133
var queryID, queryText string
131134
var databaseName database
132-
err := rs.Scan(
133-
&queryID,
134-
&queryText,
135-
&databaseName,
136-
)
135+
err := rs.Scan(&queryID, &queryText, &databaseName)
137136
if err != nil {
138137
level.Error(c.logger).Log("msg", "failed to scan result set for pg_stat_statements", "err", err)
139138
continue
140139
}
141140

141+
queryText, err = RemoveComments(c.normalizer, queryText)
142+
if err != nil {
143+
level.Error(c.logger).Log("msg", "failed to remove comments", "err", err)
144+
continue
145+
}
146+
142147
c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
143148
logging.LevelInfo,
144149
OP_QUERY_ASSOCIATION,
145-
fmt.Sprintf(`queryid="%s" querytext=%q datname="%s" engine="postgres"`, queryID, queryText, databaseName),
150+
fmt.Sprintf(`queryid="%s" querytext=%q datname="%s"`, queryID, queryText, databaseName),
146151
)
147152

148-
tables, err := c.tryTokenizeTableNames(queryText)
153+
tables, err := TokenizeTableNames(c.normalizer, queryText)
149154
if err != nil {
150155
level.Error(c.logger).Log("msg", "failed to tokenize table names", "err", err)
151156
continue
@@ -160,7 +165,7 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error {
160165
c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
161166
logging.LevelInfo,
162167
OP_QUERY_PARSED_TABLE_NAME,
163-
fmt.Sprintf(`queryid="%s" datname="%s" table="%s" engine="postgres" validated="%t"`, queryID, databaseName, table, validated),
168+
fmt.Sprintf(`queryid="%s" datname="%s" table="%s" validated="%t"`, queryID, databaseName, table, validated),
164169
)
165170
}
166171
}
@@ -173,12 +178,29 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error {
173178
return nil
174179
}
175180

176-
func (c QueryDetails) tryTokenizeTableNames(sqlText string) ([]string, error) {
181+
func TokenizeTableNames(normalizer *sqllexer.Normalizer, sqlText string) ([]string, error) {
177182
sqlText = strings.TrimSuffix(sqlText, "...")
178-
tables, err := database_observability.ExtractTableNames(sqlText)
183+
_, metadata, err := normalizer.Normalize(sqlText, sqllexer.WithDBMS(sqllexer.DBMSPostgres))
184+
if err != nil {
185+
return nil, fmt.Errorf("failed to tokenize table names: %w", err)
186+
}
187+
188+
return metadata.Tables, nil
189+
}
190+
191+
func RemoveComments(normalizer *sqllexer.Normalizer, sqlText string) (string, error) {
192+
_, metadata, err := normalizer.Normalize(sqlText, sqllexer.WithDBMS(sqllexer.DBMSPostgres))
179193
if err != nil {
180-
return nil, fmt.Errorf("failed to extract table names: %w", err)
194+
return sqlText, fmt.Errorf("failed to redact comments: %w", err)
195+
}
196+
197+
if len(metadata.Comments) == 0 {
198+
return sqlText, nil
199+
}
200+
201+
for _, comment := range metadata.Comments {
202+
sqlText = strings.ReplaceAll(sqlText, comment, "")
181203
}
182204

183-
return tables, nil
205+
return strings.TrimSpace(sqlText), nil
184206
}

0 commit comments

Comments
 (0)