From d8a065d23bef93423661ec325796bc4853ee411b Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Wed, 3 Dec 2025 11:24:12 +0100 Subject: [PATCH] dbo11y: stop tracking alloy's own queries in `pg_stat_activity` This PR introduces a change for excluding "own queries" in postgres from `pg_stat_activity`: we need to drop samples (and wait events) for the currently connected user. This new behaviour is enabled by default through the setting `exclude_current_user`, as it's deemed safe / recommended. No changes to be done on `pg_stat_statements` as that is achieved through user permissions update (docs on the website). --- CHANGELOG.md | 2 +- .../database_observability.postgres.md | 1 + .../postgres/collector/query_samples.go | 14 +- .../postgres/collector/query_samples_test.go | 176 ++++++++++++++---- .../postgres/component.go | 3 + 5 files changed, 159 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24a72762e3..ffe4f25dff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,10 +26,10 @@ Main (unreleased) ### Bugfixes - - (_Public Preview_) Additions to `database_observability.postgres` component: - `schema_details` - fixes collection of schema details for mixed case table names (@fridgepoet) + - do not track query samples for currently connected user, via option `exclude_current_user` (@cristiangreco) - (_Public Preview_) Additions to `database_observability.mysql` component: - replace the internal `server_id` label attribution in favor of a hash composed from `@@server_uuid` and `@@hostname` diff --git a/docs/sources/reference/components/database_observability/database_observability.postgres.md b/docs/sources/reference/components/database_observability/database_observability.postgres.md index 8337621027..903b0395a5 100644 --- a/docs/sources/reference/components/database_observability/database_observability.postgres.md +++ b/docs/sources/reference/components/database_observability/database_observability.postgres.md @@ -94,6 +94,7 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu |---------------------------|------------|---------------------------------------------------------------|---------|----------| | `collect_interval` | `duration` | How frequently to collect information from database. | `"15s"` | no | | `disable_query_redaction` | `bool` | Collect unredacted SQL query text (might include parameters). | `false` | no | +| `exclude_current_user` | `bool` | Do not collect query samples for current database user. | `true` | no | ### `schema_details` diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index d526b13d2f..1f644ae60b 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -66,8 +66,11 @@ const selectPgStatActivity = ` s.query_id != 0 ) ) + %s ` +const excludeCurrentUserClause = `AND s.usesysid != (select oid from pg_roles where rolname = current_user)` + type QuerySamplesInfo struct { DatabaseName sql.NullString DatabaseID int @@ -100,6 +103,7 @@ type QuerySamplesArguments struct { EntryHandler loki.EntryHandler Logger log.Logger DisableQueryRedaction bool + ExcludeCurrentUser bool } type QuerySamples struct { @@ -107,6 +111,7 @@ type QuerySamples struct { collectInterval time.Duration entryHandler loki.EntryHandler disableQueryRedaction bool + excludeCurrentUser bool logger log.Logger running *atomic.Bool @@ -204,7 +209,7 @@ func (w WaitEventIdentity) Equal(other WaitEventIdentity) bool { } func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { - const emittedCacheSize = 1000 //pg_stat_statements default max number of statements to track + const emittedCacheSize = 1000 // pg_stat_statements default max number of statements to track const emittedCacheTTL = 10 * time.Minute return &QuerySamples{ @@ -212,6 +217,7 @@ func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, disableQueryRedaction: args.DisableQueryRedaction, + excludeCurrentUser: args.ExcludeCurrentUser, logger: log.With(args.Logger, "collector", QuerySamplesCollector), running: &atomic.Bool{}, samples: map[SampleKey]*SampleState{}, @@ -275,7 +281,11 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { queryTextField = queryTextClause } - query := fmt.Sprintf(selectPgStatActivity, queryTextField) + excludeCurrentUserClauseField := "" + if c.excludeCurrentUser { + excludeCurrentUserClauseField = excludeCurrentUserClause + } + query := fmt.Sprintf(selectPgStatActivity, queryTextField, excludeCurrentUserClauseField) rows, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index 5013d4e538..433604adb1 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -41,7 +41,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "active query without wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -56,7 +56,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -76,7 +76,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "parallel query with leader PID", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -91,7 +91,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -111,7 +111,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -126,7 +126,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -148,7 +148,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "insufficient privilege query - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -165,7 +165,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "", )) // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -183,7 +183,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "null database name - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -198,7 +198,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, )) // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -214,7 +214,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with redaction disabled", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -231,7 +231,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -269,6 +269,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: tc.disableQueryRedaction, + ExcludeCurrentUser: true, }) require.NoError(t, err) require.NotNil(t, sampleCollector) @@ -354,11 +355,12 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // First scrape: active row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 1000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -368,7 +370,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Second scrape: no rows -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -403,11 +405,12 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: wait event with unordered/dup PIDs - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -417,7 +420,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: same wait event with normalized PIDs - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -427,7 +430,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -461,11 +464,12 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: wait event - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -475,7 +479,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: active with no wait -> close occurrence - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -485,7 +489,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -520,11 +524,12 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: active CPU snapshot (10s) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -534,7 +539,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: waiting with wait_event; state_change 7s ago - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -544,7 +549,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -579,11 +584,12 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: wait event set A - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -593,7 +599,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 2: same event, set changes -> new occurrence - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -603,7 +609,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -659,11 +665,12 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: active row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -673,7 +680,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: same key turns idle; state_change denotes end - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -717,11 +724,12 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: only idle row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2001, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -731,7 +739,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM users", )) // Scrape 2: same idle row again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2001, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -775,11 +783,12 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: idle in transaction (aborted) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -789,7 +798,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT 1", )) // Scrape 2: same idle row again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -833,11 +842,12 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: two idle-only rows with different keys (PID/QueryID) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns). AddRow( now, "testdb", 2200, sql.NullInt64{}, @@ -856,7 +866,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM b", )) // Scrape 2: same idle rows again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns). AddRow( now, "testdb", 2200, sql.NullInt64{}, @@ -906,3 +916,101 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) } + +func TestQuerySamples_ExcludeCurrentUser(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + now := time.Now() + stateChangeTime := now.Add(-10 * time.Second) + queryStartTime := now.Add(-30 * time.Second) + xactStartTime := now.Add(-2 * time.Minute) + backendStartTime := now.Add(-1 * time.Hour) + + columns := []string{ + "now", "datname", "pid", "leader_pid", + "usename", "application_name", "client_addr", "client_port", + "backend_type", "backend_start", "backend_xid", "backend_xmin", + "xact_start", "state", "state_change", "wait_event_type", + "wait_event", "blocked_by_pids", "query_start", "query_id", + } + + testCases := []struct { + name string + excludeCurrentUser bool + expectedQuery string + }{ + { + name: "ExcludeCurrentUser enabled", + excludeCurrentUser: true, + expectedQuery: fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause), + }, + { + name: "ExcludeCurrentUser disabled", + excludeCurrentUser: false, + expectedQuery: fmt.Sprintf(selectPgStatActivity, "", ""), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + ExcludeCurrentUser: tc.excludeCurrentUser, + }) + require.NoError(t, err) + require.NotNil(t, sampleCollector) + + // First scrape: expect query with correct SQL format + mock.ExpectQuery(tc.expectedQuery).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 100, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, + xactStartTime, "active", stateChangeTime, sql.NullString{}, + sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, + )) + + // Second scrape: empty to trigger finalization + mock.ExpectQuery(tc.expectedQuery).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) + + err = sampleCollector.Start(t.Context()) + require.NoError(t, err) + + // Wait for collector to process queries + require.EventuallyWithT(t, func(t *assert.CollectT) { + entries := lokiClient.Received() + require.Len(t, entries, 1) + }, 5*time.Second, 100*time.Millisecond) + + sampleCollector.Stop() + + // Wait for the collector to stop + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + lokiClient.Stop() + + // Give time for goroutines to clean up + time.Sleep(100 * time.Millisecond) + + // Verify mock expectations were met (verifies correct SQL query was used) + err = mock.ExpectationsWereMet() + require.NoError(t, err) + }) + } +} diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go index abccf389ef..05612747d0 100644 --- a/internal/component/database_observability/postgres/component.go +++ b/internal/component/database_observability/postgres/component.go @@ -86,6 +86,7 @@ type AWSCloudProviderInfo struct { type QuerySampleArguments struct { CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` DisableQueryRedaction bool `alloy:"disable_query_redaction,attr,optional"` + ExcludeCurrentUser bool `alloy:"exclude_current_user,attr,optional"` } type QueryTablesArguments struct { @@ -103,6 +104,7 @@ var DefaultArguments = Arguments{ QuerySampleArguments: QuerySampleArguments{ CollectInterval: 15 * time.Second, DisableQueryRedaction: false, + ExcludeCurrentUser: true, }, QueryTablesArguments: QueryTablesArguments{ CollectInterval: 1 * time.Minute, @@ -414,6 +416,7 @@ func (c *Component) startCollectors(systemID string, engineVersion string) error EntryHandler: entryHandler, Logger: c.opts.Logger, DisableQueryRedaction: c.args.QuerySampleArguments.DisableQueryRedaction, + ExcludeCurrentUser: c.args.QuerySampleArguments.ExcludeCurrentUser, }) if err != nil { logStartError(collector.QuerySamplesCollector, "create", err)