diff --git a/internal/component/database_observability/lexer.go b/internal/component/database_observability/lexer.go index 481e8bc5c5..3ac88b0c8e 100644 --- a/internal/component/database_observability/lexer.go +++ b/internal/component/database_observability/lexer.go @@ -7,20 +7,6 @@ import ( "github.com/DataDog/go-sqllexer" ) -// ExtractTableNames extracts the table names from a SQL query -func ExtractTableNames(sql string) ([]string, error) { - normalizer := sqllexer.NewNormalizer( - sqllexer.WithCollectTables(true), - ) - _, metadata, err := normalizer.Normalize(sql, sqllexer.WithDBMS(sqllexer.DBMSPostgres)) - if err != nil { - return nil, fmt.Errorf("failed to normalize SQL: %w", err) - } - - // Return all table names, including those that end with "..." for truncated queries, as we can't know if the table name was truncated or not - return metadata.Tables, nil -} - // RedactSql obfuscates a SQL query by replacing literals with ? placeholders func RedactSql(sql string) string { obfuscatedSql := sqllexer.NewObfuscator().Obfuscate(sql) diff --git a/internal/component/database_observability/lexer_test.go b/internal/component/database_observability/lexer_test.go index 93ed3c3653..7208748482 100644 --- a/internal/component/database_observability/lexer_test.go +++ b/internal/component/database_observability/lexer_test.go @@ -49,26 +49,26 @@ func TestPgSqlParser_Redact(t *testing.T) { sql: `WITH active_users AS ( SELECT * FROM users WHERE last_login > '2024-01-01' ), recent_orders AS ( - SELECT o.* FROM orders o - JOIN active_users u ON u.id = o.user_id + SELECT o.* FROM orders o + JOIN active_users u ON u.id = o.user_id WHERE o.created_at > '2024-03-01' ) - SELECT au.name, COUNT(ro.id) as order_count - FROM active_users au - LEFT JOIN recent_orders ro ON ro.user_id = au.id - GROUP BY au.name + SELECT au.name, COUNT(ro.id) as order_count + FROM active_users au + LEFT JOIN recent_orders ro ON ro.user_id = au.id + GROUP BY au.name HAVING COUNT(ro.id) > 5`, want: `WITH active_users AS ( SELECT * FROM users WHERE last_login > ? ), recent_orders AS ( - SELECT o.* FROM orders o - JOIN active_users u ON u.id = o.user_id + SELECT o.* FROM orders o + JOIN active_users u ON u.id = o.user_id WHERE o.created_at > ? ) - SELECT au.name, COUNT(ro.id) as order_count - FROM active_users au - LEFT JOIN recent_orders ro ON ro.user_id = au.id - GROUP BY au.name + SELECT au.name, COUNT(ro.id) as order_count + FROM active_users au + LEFT JOIN recent_orders ro ON ro.user_id = au.id + GROUP BY au.name HAVING COUNT(ro.id) > ?`, }, { @@ -106,13 +106,13 @@ func TestPgSqlParser_Redact(t *testing.T) { { name: "WITH statement with UPDATE", sql: `WITH inactive_users AS ( - SELECT id FROM users + SELECT id FROM users WHERE last_login < '2023-01-01' AND status = 'active' ) UPDATE users SET status = 'inactive', updated_at = '2024-03-20' WHERE id IN (SELECT id FROM inactive_users)`, want: `WITH inactive_users AS ( - SELECT id FROM users + SELECT id FROM users WHERE last_login < ? AND status = ? ) UPDATE users SET status = ?, updated_at = ? @@ -121,16 +121,16 @@ func TestPgSqlParser_Redact(t *testing.T) { { name: "WITH statement with DELETE", sql: `WITH old_orders AS ( - SELECT id FROM orders + SELECT id FROM orders WHERE created_at < '2023-01-01' AND status = 'completed' ) - DELETE FROM order_items + DELETE FROM order_items WHERE order_id IN (SELECT id FROM old_orders)`, want: `WITH old_orders AS ( - SELECT id FROM orders + SELECT id FROM orders WHERE created_at < ? AND status = ? ) - DELETE FROM order_items + DELETE FROM order_items WHERE order_id IN (SELECT id FROM old_orders)`, }, { @@ -185,134 +185,6 @@ func TestPgSqlParser_Redact(t *testing.T) { } } -func TestPgSqlParser_ExtractTableNames(t *testing.T) { - tests := []struct { - name string - sql string - want []string - wantErr bool - }{ - { - name: "simple select", - sql: "SELECT * FROM users", - want: []string{"users"}, - }, - { - name: "select with join", - sql: "SELECT * FROM users u JOIN orders o ON u.id = o.user_id", - want: []string{"orders", "users"}, - }, - { - name: "select with schema qualified tables", - sql: "SELECT * FROM public.users JOIN sales.orders ON users.id = orders.user_id", - want: []string{"public.users", "sales.orders"}, - }, - { - name: "insert statement", - sql: "INSERT INTO users (name, email) VALUES ('John', 'john@example.com')", - want: []string{"users"}, - }, - { - name: "update statement", - sql: "UPDATE users SET last_login = NOW() WHERE id = 1", - want: []string{"users"}, - }, - { - name: "delete statement", - sql: "DELETE FROM users WHERE id = 1", - want: []string{"users"}, - }, - { - name: "with clause", - sql: `WITH active_users AS ( - SELECT * FROM users WHERE status = 'active' - ) - SELECT * FROM active_users au - JOIN orders o ON o.user_id = au.id`, - want: []string{"orders", "users"}, - }, - { - name: "subquery in where clause", - sql: `SELECT * FROM orders - WHERE user_id IN (SELECT id FROM users WHERE status = 'active')`, - want: []string{"orders", "users"}, - }, - { - name: "multiple schema qualified tables with aliases", - sql: `SELECT u.name, o.total, p.status - FROM public.users u - JOIN sales.orders o ON u.id = o.user_id - LEFT JOIN shipping.packages p ON o.id = p.order_id`, - want: []string{"public.users", "sales.orders", "shipping.packages"}, - }, - { - name: "truncated query with ...", - sql: "SELECT * FROM users JOIN orders ON users.id = orders.user_id AND...", - want: []string{"users", "orders"}, - }, - { - name: "truncated query with incomplete comment", - sql: "SELECT * FROM users JOIN orders ON users.id = orders.user_id /* some comment that gets truncated...", - want: []string{"users", "orders"}, - }, - { - name: "truncated query mid-table name", - sql: "SELECT * FROM users JOIN ord...", - want: []string{"users", "ord..."}, - }, - { - name: "truncated query with schema qualified tables", - sql: "SELECT * FROM public.users JOIN sales.orders ON users.id = orders.user_id AND...", - want: []string{"public.users", "sales.orders"}, - }, - { - name: "query with table.* expression", - sql: "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id", - want: []string{"users", "orders"}, - }, - { - name: "query with type cast", - sql: "SELECT u.id, '2024-03-20'::timestamp FROM users u", - want: []string{"users"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ExtractTableNames(tt.sql) - if (err != nil) != tt.wantErr { - t.Errorf("ExtractTableNames() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !tt.wantErr { - if len(got) != len(tt.want) { - t.Errorf("ExtractTableNames()\nGOT = %v\nWANT = %v", got, tt.want) - return - } - // Compare slices ignoring order since table names might come in different order - gotMap := make(map[string]bool) - wantMap := make(map[string]bool) - for _, table := range got { - gotMap[table] = true - } - for _, table := range tt.want { - wantMap[table] = true - } - for table := range gotMap { - if !wantMap[table] { - t.Errorf("ExtractTableNames() got unexpected table = %v", table) - } - } - for table := range wantMap { - if !gotMap[table] { - t.Errorf("ExtractTableNames() missing expected table = %v", table) - } - } - } - }) - } -} - func TestContainsReservedKeywords(t *testing.T) { tests := []struct { name string diff --git a/internal/component/database_observability/postgres/collector/query_details.go b/internal/component/database_observability/postgres/collector/query_details.go index b59a25474e..1c991cd6b9 100644 --- a/internal/component/database_observability/postgres/collector/query_details.go +++ b/internal/component/database_observability/postgres/collector/query_details.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/DataDog/go-sqllexer" "github.com/go-kit/log" "go.uber.org/atomic" @@ -55,6 +56,7 @@ type QueryDetails struct { collectInterval time.Duration entryHandler loki.EntryHandler tableRegistry *TableRegistry + normalizer *sqllexer.Normalizer logger log.Logger running *atomic.Bool @@ -68,6 +70,7 @@ func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) { collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, tableRegistry: args.TableRegistry, + normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true), sqllexer.WithCollectComments(true)), logger: log.With(args.Logger, "collector", QueryDetailsCollector), running: &atomic.Bool{}, }, nil @@ -129,23 +132,25 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error { for rs.Next() { var queryID, queryText string var databaseName database - err := rs.Scan( - &queryID, - &queryText, - &databaseName, - ) + err := rs.Scan(&queryID, &queryText, &databaseName) if err != nil { level.Error(c.logger).Log("msg", "failed to scan result set for pg_stat_statements", "err", err) continue } + queryText, err = RemoveComments(c.normalizer, queryText) + if err != nil { + level.Error(c.logger).Log("msg", "failed to remove comments", "err", err) + continue + } + c.entryHandler.Chan() <- database_observability.BuildLokiEntry( logging.LevelInfo, OP_QUERY_ASSOCIATION, - fmt.Sprintf(`queryid="%s" querytext=%q datname="%s" engine="postgres"`, queryID, queryText, databaseName), + fmt.Sprintf(`queryid="%s" querytext=%q datname="%s"`, queryID, queryText, databaseName), ) - tables, err := c.tryTokenizeTableNames(queryText) + tables, err := TokenizeTableNames(c.normalizer, queryText) if err != nil { level.Error(c.logger).Log("msg", "failed to tokenize table names", "err", err) continue @@ -160,7 +165,7 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error { c.entryHandler.Chan() <- database_observability.BuildLokiEntry( logging.LevelInfo, OP_QUERY_PARSED_TABLE_NAME, - fmt.Sprintf(`queryid="%s" datname="%s" table="%s" engine="postgres" validated="%t"`, queryID, databaseName, table, validated), + fmt.Sprintf(`queryid="%s" datname="%s" table="%s" validated="%t"`, queryID, databaseName, table, validated), ) } } @@ -173,12 +178,29 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error { return nil } -func (c QueryDetails) tryTokenizeTableNames(sqlText string) ([]string, error) { +func TokenizeTableNames(normalizer *sqllexer.Normalizer, sqlText string) ([]string, error) { sqlText = strings.TrimSuffix(sqlText, "...") - tables, err := database_observability.ExtractTableNames(sqlText) + _, metadata, err := normalizer.Normalize(sqlText, sqllexer.WithDBMS(sqllexer.DBMSPostgres)) + if err != nil { + return nil, fmt.Errorf("failed to tokenize table names: %w", err) + } + + return metadata.Tables, nil +} + +func RemoveComments(normalizer *sqllexer.Normalizer, sqlText string) (string, error) { + _, metadata, err := normalizer.Normalize(sqlText, sqllexer.WithDBMS(sqllexer.DBMSPostgres)) if err != nil { - return nil, fmt.Errorf("failed to extract table names: %w", err) + return sqlText, fmt.Errorf("failed to redact comments: %w", err) + } + + if len(metadata.Comments) == 0 { + return sqlText, nil + } + + for _, comment := range metadata.Comments { + sqlText = strings.ReplaceAll(sqlText, comment, "") } - return tables, nil + return strings.TrimSpace(sqlText), nil } diff --git a/internal/component/database_observability/postgres/collector/query_details_test.go b/internal/component/database_observability/postgres/collector/query_details_test.go index 5868f8b82b..e38c4e3b5a 100644 --- a/internal/component/database_observability/postgres/collector/query_details_test.go +++ b/internal/component/database_observability/postgres/collector/query_details_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/DataDog/go-sqllexer" "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -40,8 +41,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = $1\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="true"`, + `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = $1" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="true"`, }, tableRegistry: &TableRegistry{ tables: map[database]map[schema]map[table]struct{}{ @@ -65,8 +66,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM public.users WHERE id = $1\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="public.users" engine="postgres" validated="true"`, + `level="info" queryid="abc123" querytext="SELECT * FROM public.users WHERE id = $1" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="public.users" validated="true"`, }, tableRegistry: &TableRegistry{ tables: map[database]map[schema]map[table]struct{}{ @@ -90,8 +91,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"WITH some_with_table AS (SELECT * FROM some_table WHERE id = $1) SELECT * FROM some_with_table\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="WITH some_with_table AS (SELECT * FROM some_table WHERE id = $1) SELECT * FROM some_with_table" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -106,8 +107,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"INSERT INTO some_table (id, name) VALUES (...)\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="INSERT INTO some_table (id, name) VALUES (...)" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -123,9 +124,9 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"WITH some_with_table AS (SELECT id, name FROM some_other_table WHERE id = $1) INSERT INTO some_table (id, name) SELECT id, name FROM some_with_table\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_other_table" engine="postgres" validated="false"`, - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="WITH some_with_table AS (SELECT id, name FROM some_other_table WHERE id = $1) INSERT INTO some_table (id, name) SELECT id, name FROM some_with_table" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_other_table" validated="false"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -140,8 +141,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"UPDATE some_table SET active = false, reason = ? WHERE id = $1 AND name = $2\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="UPDATE some_table SET active = false, reason = ? WHERE id = $1 AND name = $2" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -156,8 +157,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"DELETE FROM some_table WHERE id = $1\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="DELETE FROM some_table WHERE id = $1" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -173,9 +174,9 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"WITH some_with_table AS (SELECT id, name FROM some_other_table WHERE id = $1) DELETE FROM some_table WHERE id IN (SELECT id FROM some_with_table)\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_other_table" engine="postgres" validated="false"`, - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="WITH some_with_table AS (SELECT id, name FROM some_other_table WHERE id = $1) DELETE FROM some_table WHERE id IN (SELECT id FROM some_with_table)" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_other_table" validated="false"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -191,9 +192,9 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT t.id, t.val1, o.val2 FROM some_table t INNER JOIN other_table AS o ON t.id = o.id WHERE o.val2 = $1 ORDER BY t.val1 DESC\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, - `level="info" queryid="abc123" datname="some_database" table="other_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT t.id, t.val1, o.val2 FROM some_table t INNER JOIN other_table AS o ON t.id = o.id WHERE o.val2 = $1 ORDER BY t.val1 DESC" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, + `level="info" queryid="abc123" datname="some_database" table="other_table" validated="false"`, }, }, { @@ -214,10 +215,10 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"xyz456\" querytext=\"INSERT INTO some_table...\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="xyz456" datname="some_database" table="some_table" engine="postgres" validated="false"`, - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM another_table WHERE id = $1\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="another_table" engine="postgres" validated="false"`, + `level="info" queryid="xyz456" querytext="INSERT INTO some_table..." datname="some_database"`, + `level="info" queryid="xyz456" datname="some_database" table="some_table" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT * FROM another_table WHERE id = $1" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="another_table" validated="false"`, }, }, { @@ -232,8 +233,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = $1 AND name =\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = $1 AND name =" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -247,7 +248,7 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_ASSOCIATION}, }, logsLines: []string{ - `level="info" queryid="abc123" querytext="START TRANSACTION" datname="some_database" engine="postgres"`, + `level="info" queryid="abc123" querytext="START TRANSACTION" datname="some_database"`, }, }, { @@ -267,9 +268,9 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"xyz456\" querytext=\"not valid sql\" datname=\"some_database\" engine=\"postgres\"", - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = $1\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="xyz456" querytext="not valid sql" datname="some_database"`, + `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = $1" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -290,10 +291,10 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = $1\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = $1\" datname=\"other_schema\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="other_schema" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = $1" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = $1" datname="other_schema"`, + `level="info" queryid="abc123" datname="other_schema" table="some_table" validated="false"`, }, }, { @@ -310,10 +311,10 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM (SELECT id, name FROM employees_us_east UNION SELECT id, name FROM employees_us_west) AS employees_us UNION SELECT id, name FROM employees_emea\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="employees_us_east" engine="postgres" validated="false"`, - `level="info" queryid="abc123" datname="some_database" table="employees_us_west" engine="postgres" validated="false"`, - `level="info" queryid="abc123" datname="some_database" table="employees_emea" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT * FROM (SELECT id, name FROM employees_us_east UNION SELECT id, name FROM employees_us_west) AS employees_us UNION SELECT id, name FROM employees_emea" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="employees_us_east" validated="false"`, + `level="info" queryid="abc123" datname="some_database" table="employees_us_west" validated="false"`, + `level="info" queryid="abc123" datname="some_database" table="employees_emea" validated="false"`, }, }, { @@ -328,8 +329,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SHOW CREATE TABLE some_table\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="SHOW CREATE TABLE some_table" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -343,7 +344,7 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_ASSOCIATION}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SHOW VARIABLES LIKE $1\" datname=\"some_database\" engine=\"postgres\"", + `level="info" queryid="abc123" querytext="SHOW VARIABLES LIKE $1" datname="some_database"`, }, }, { @@ -358,8 +359,8 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE\" datname=\"some_database\" engine=\"postgres\"", - `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, + `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE" datname="some_database"`, + `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, }, }, { @@ -367,7 +368,7 @@ func TestQueryDetails(t *testing.T) { eventStatementsRows: [][]driver.Value{ { "3871016669222913500", - `SELECT "pizza_to_ingredients"."pizza_id", "i"."id", "i"."name", "i"."calories_per_slice", "i"."vegetarian", "i"."type" FROM "ingredients" AS "i" JOIN "pizza_to_ingredients" AS "pizza_to_ingredients" ON ("pizza_to_ingredients"."pizza_id") IN ($1 /*, ... */) WHERE ("i"."id" = "pizza_to_ingredients"."ingredient_id")`, + `SELECT "pizza_to_ingredients"."pizza_id", "i"."id", "i"."name", "i"."calories_per_slice", "i"."vegetarian", "i"."type" FROM "ingredients" AS "i" JOIN "pizza_to_ingredients" AS "pizza_to_ingredients" ON ("pizza_to_ingredients"."pizza_id") IN ($1) WHERE ("i"."id" = "pizza_to_ingredients"."ingredient_id")`, "quickpizza", }, { @@ -398,15 +399,15 @@ func TestQueryDetails(t *testing.T) { {"op": OP_QUERY_PARSED_TABLE_NAME}, }, logsLines: []string{ - `level="info" queryid="3871016669222913500" querytext="SELECT \"pizza_to_ingredients\".\"pizza_id\", \"i\".\"id\", \"i\".\"name\", \"i\".\"calories_per_slice\", \"i\".\"vegetarian\", \"i\".\"type\" FROM \"ingredients\" AS \"i\" JOIN \"pizza_to_ingredients\" AS \"pizza_to_ingredients\" ON (\"pizza_to_ingredients\".\"pizza_id\") IN ($1 /*, ... */) WHERE (\"i\".\"id\" = \"pizza_to_ingredients\".\"ingredient_id\")" datname="quickpizza" engine="postgres"`, - `level="info" queryid="3871016669222913500" datname="quickpizza" table="ingredients" engine="postgres" validated="false"`, - `level="info" queryid="3871016669222913500" datname="quickpizza" table="pizza_to_ingredients" engine="postgres" validated="false"`, - `level="info" queryid="7865322458849960000" querytext="SELECT \"quote\".\"name\" FROM \"quotes\" AS \"quote\"" datname="quickpizza" engine="postgres"`, - `level="info" queryid="7865322458849960000" datname="quickpizza" table="quotes" engine="postgres" validated="false"`, - `level="info" queryid="5775615007769463000" querytext="SELECT \"classical_name\".\"name\" FROM \"classical_names\" AS \"classical_name\"" datname="quickpizza" engine="postgres"`, - `level="info" queryid="5775615007769463000" datname="quickpizza" table="classical_names" engine="postgres" validated="false"`, - `level="info" queryid="7007034463187741000" querytext="SELECT \"dough\".\"id\", \"dough\".\"name\", \"dough\".\"calories_per_slice\" FROM \"doughs\" AS \"dough\"" datname="quickpizza" engine="postgres"`, - `level="info" queryid="7007034463187741000" datname="quickpizza" table="doughs" engine="postgres" validated="false"`, + `level="info" queryid="3871016669222913500" querytext="SELECT \"pizza_to_ingredients\".\"pizza_id\", \"i\".\"id\", \"i\".\"name\", \"i\".\"calories_per_slice\", \"i\".\"vegetarian\", \"i\".\"type\" FROM \"ingredients\" AS \"i\" JOIN \"pizza_to_ingredients\" AS \"pizza_to_ingredients\" ON (\"pizza_to_ingredients\".\"pizza_id\") IN ($1) WHERE (\"i\".\"id\" = \"pizza_to_ingredients\".\"ingredient_id\")" datname="quickpizza"`, + `level="info" queryid="3871016669222913500" datname="quickpizza" table="ingredients" validated="false"`, + `level="info" queryid="3871016669222913500" datname="quickpizza" table="pizza_to_ingredients" validated="false"`, + `level="info" queryid="7865322458849960000" querytext="SELECT \"quote\".\"name\" FROM \"quotes\" AS \"quote\"" datname="quickpizza"`, + `level="info" queryid="7865322458849960000" datname="quickpizza" table="quotes" validated="false"`, + `level="info" queryid="5775615007769463000" querytext="SELECT \"classical_name\".\"name\" FROM \"classical_names\" AS \"classical_name\"" datname="quickpizza"`, + `level="info" queryid="5775615007769463000" datname="quickpizza" table="classical_names" validated="false"`, + `level="info" queryid="7007034463187741000" querytext="SELECT \"dough\".\"id\", \"dough\".\"name\", \"dough\".\"calories_per_slice\" FROM \"doughs\" AS \"dough\"" datname="quickpizza"`, + `level="info" queryid="7007034463187741000" datname="quickpizza" table="doughs" validated="false"`, }, }, } @@ -532,9 +533,9 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_ASSOCIATION}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = ?\" datname=\"some_database\" engine=\"postgres\"", lokiEntries[0].Line) + require.Equal(t, `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = ?" datname="some_database"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"op": OP_QUERY_PARSED_TABLE_NAME}, lokiEntries[1].Labels) - require.Equal(t, `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, lokiEntries[1].Line) + require.Equal(t, `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, lokiEntries[1].Line) }) t.Run("result set iteration error", func(t *testing.T) { @@ -591,9 +592,9 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_ASSOCIATION}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = ?\" datname=\"some_database\" engine=\"postgres\"", lokiEntries[0].Line) + require.Equal(t, `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = ?" datname="some_database"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"op": OP_QUERY_PARSED_TABLE_NAME}, lokiEntries[1].Labels) - require.Equal(t, `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, lokiEntries[1].Line) + require.Equal(t, `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, lokiEntries[1].Line) }) t.Run("connection error recovery", func(t *testing.T) { @@ -648,8 +649,169 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_ASSOCIATION}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" queryid=\"abc123\" querytext=\"SELECT * FROM some_table WHERE id = ?\" datname=\"some_database\" engine=\"postgres\"", lokiEntries[0].Line) + require.Equal(t, `level="info" queryid="abc123" querytext="SELECT * FROM some_table WHERE id = ?" datname="some_database"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"op": OP_QUERY_PARSED_TABLE_NAME}, lokiEntries[1].Labels) - require.Equal(t, `level="info" queryid="abc123" datname="some_database" table="some_table" engine="postgres" validated="false"`, lokiEntries[1].Line) + require.Equal(t, `level="info" queryid="abc123" datname="some_database" table="some_table" validated="false"`, lokiEntries[1].Line) }) } + +func TestQueryDetails_TokenizeTableNames(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sql string + want []string + wantErr bool + }{ + { + name: "simple select", + sql: "SELECT * FROM users", + want: []string{"users"}, + }, + { + name: "select with join", + sql: "SELECT * FROM users u JOIN orders o ON u.id = o.user_id", + want: []string{"orders", "users"}, + }, + { + name: "select with schema qualified tables", + sql: "SELECT * FROM public.users JOIN sales.orders ON users.id = orders.user_id", + want: []string{"public.users", "sales.orders"}, + }, + { + name: "insert statement", + sql: "INSERT INTO users (name, email) VALUES ('John', 'john@example.com')", + want: []string{"users"}, + }, + { + name: "update statement", + sql: "UPDATE users SET last_login = NOW() WHERE id = 1", + want: []string{"users"}, + }, + { + name: "delete statement", + sql: "DELETE FROM users WHERE id = 1", + want: []string{"users"}, + }, + { + name: "with clause", + sql: `WITH active_users AS ( + SELECT * FROM users WHERE status = 'active' + ) + SELECT * FROM active_users au + JOIN orders o ON o.user_id = au.id`, + want: []string{"orders", "users"}, + }, + { + name: "subquery in where clause", + sql: `SELECT * FROM orders + WHERE user_id IN (SELECT id FROM users WHERE status = 'active')`, + want: []string{"orders", "users"}, + }, + { + name: "multiple schema qualified tables with aliases", + sql: `SELECT u.name, o.total, p.status + FROM public.users u + JOIN sales.orders o ON u.id = o.user_id + LEFT JOIN shipping.packages p ON o.id = p.order_id`, + want: []string{"public.users", "sales.orders", "shipping.packages"}, + }, + { + name: "truncated query with ...", + sql: "SELECT * FROM users JOIN orders ON users.id = orders.user_id AND...", + want: []string{"users", "orders"}, + }, + { + name: "truncated query with incomplete comment", + sql: "SELECT * FROM users JOIN orders ON users.id = orders.user_id /* some comment that gets truncated...", + want: []string{"users", "orders"}, + }, + { + name: "truncated query mid-table name", + sql: "SELECT * FROM users JOIN ord...", + want: []string{"users", "ord"}, + }, + { + name: "truncated query with schema qualified tables", + sql: "SELECT * FROM public.users JOIN sales.orders ON users.id = orders.user_id AND...", + want: []string{"public.users", "sales.orders"}, + }, + { + name: "query with table.* expression", + sql: "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id", + want: []string{"users", "orders"}, + }, + { + name: "query with type cast", + sql: "SELECT u.id, '2024-03-20'::timestamp FROM users u", + want: []string{"users"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := TokenizeTableNames(sqllexer.NewNormalizer(sqllexer.WithCollectTables(true)), tt.sql) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.ElementsMatch(t, got, tt.want) + }) + } +} + +func TestQueryDetails_RemoveComments(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sql string + want string + wantErr bool + }{ + { + name: "simple select", + sql: "SELECT * FROM users", + want: "SELECT * FROM users", + }, + { + name: "inline comment", + sql: "SELECT * FROM users -- getting all users", + want: "SELECT * FROM users", + }, + { + name: "block comment", + sql: "SELECT * FROM /* important table */ users", + want: "SELECT * FROM users", + }, + { + name: "multiple comments", + sql: "SELECT /* cols */ * FROM users -- table", + want: "SELECT * FROM users", + }, + { + name: "comment in string literal preserved", + sql: "SELECT ' -- not a comment ' FROM users", + want: "SELECT ' -- not a comment ' FROM users", + }, + { + name: "multiline block comment", + sql: "SELECT * FROM users /* \n multiline \n comment */ WHERE id = 1", + want: "SELECT * FROM users WHERE id = 1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := RemoveComments(sqllexer.NewNormalizer(sqllexer.WithCollectComments(true)), tt.sql) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tt.want, got) + }) + } +}