diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go
index 4b6ca5f651..d9c5e5acd9 100644
--- a/internal/component/database_observability/mysql/collector/query_samples.go
+++ b/internal/component/database_observability/mysql/collector/query_samples.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/blang/semver/v4"
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/atomic"
 
 	"github.com/grafana/alloy/internal/component/common/loki"
@@ -83,6 +84,7 @@ type QuerySamplesArguments struct {
 	DisableQueryRedaction       bool
 	AutoEnableSetupConsumers    bool
 	SetupConsumersCheckInterval time.Duration
+	Registry                    *prometheus.Registry
 
 	Logger log.Logger
 }
@@ -95,6 +97,8 @@ type QuerySamples struct {
 	disableQueryRedaction       bool
 	autoEnableSetupConsumers    bool
 	setupConsumersCheckInterval time.Duration
+	registry                    *prometheus.Registry
+	latencyHistogram            *prometheus.HistogramVec
 
 	logger  log.Logger
 	running *atomic.Bool
@@ -106,6 +110,21 @@ type QuerySamples struct {
 }
 
 func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
+	latencyHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "database_observability",
+		Name:      "wait_event_latency_mysql_seconds",
+		Help:      "Latency of wait events in seconds",
+		Buckets:   prometheus.DefBuckets,
+	}, []string{"digest", "schema", "event_name"})
+
+	// Register (not MustRegister) so a duplicate or inconsistent registration is
+	// returned to the caller instead of panicking; a nil registry is tolerated.
+	if args.Registry != nil {
+		if err := args.Registry.Register(latencyHistogram); err != nil {
+			return nil, err
+		}
+	}
+
 	c := &QuerySamples{
 		dbConnection:                args.DB,
 		engineVersion:               args.EngineVersion,
@@ -114,6 +133,8 @@ func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
 		disableQueryRedaction:       args.DisableQueryRedaction,
 		autoEnableSetupConsumers:    args.AutoEnableSetupConsumers,
 		setupConsumersCheckInterval: args.SetupConsumersCheckInterval,
+		registry:                    args.Registry,
+		latencyHistogram:            latencyHistogram,
 		logger:                      log.With(args.Logger, "collector", QuerySamplesCollector),
 		running:                     &atomic.Bool{},
 	}
@@ -178,6 +199,11 @@ func (c *QuerySamples) Stopped() bool {
 // Stop should be kept idempotent
 func (c *QuerySamples) Stop() {
 	c.cancel()
+	// Unregister only reports false for a collector that is not registered, so
+	// repeated Stop calls stay safe; the nil check covers a missing registry.
+	if c.registry != nil {
+		c.registry.Unregister(c.latencyHistogram)
+	}
 }
 
 func (c *QuerySamples) runSetupConsumersCheck() {
@@ -396,6 +422,8 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
 				waitLogMessage,
 				int64(millisecondsToNanoseconds(row.TimestampMilliseconds)),
 			)
+
+			c.latencyHistogram.WithLabelValues(row.Digest.String, row.Schema.String, row.WaitEventName.String).Observe(picosecondsToSeconds(row.WaitTime.Float64))
 		}
 	}
 
diff --git a/internal/component/database_observability/mysql/collector/query_samples_test.go b/internal/component/database_observability/mysql/collector/query_samples_test.go
index 38dc05f9af..b4ecd066c0 100644
--- a/internal/component/database_observability/mysql/collector/query_samples_test.go
+++ b/internal/component/database_observability/mysql/collector/query_samples_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/DATA-DOG/go-sqlmock"
 	"github.com/blang/semver/v4"
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -131,13 +132,14 @@ func TestQuerySamples(t *testing.T) {
 			logBuffer := syncbuffer.Buffer{}
 			lokiClient := loki.NewCollectingHandler()
 
-			collector, err := NewQuerySamples(QuerySamplesArguments{
-				DB:              db,
-				EngineVersion:   latestCompatibleVersion,
-				CollectInterval: time.Second,
-				EntryHandler:    lokiClient,
-				Logger:          log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
-			})
+			collector, err := NewQuerySamples(QuerySamplesArguments{
+				DB:              db,
+				EngineVersion:   latestCompatibleVersion,
+				CollectInterval: time.Second,
+				EntryHandler:    lokiClient,
+				Logger:          log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
+				Registry:        prometheus.NewRegistry(),
+			})
 			require.NoError(t, err)
 			require.NotNil(t, collector)
 
@@ -237,6 +239,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) {
 			CollectInterval: time.Second,
 			EntryHandler:    lokiClient,
 			Logger:          log.NewLogfmtLogger(os.Stderr),
+			Registry:        prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -332,6 +335,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) {
 			CollectInterval: time.Second,
 			EntryHandler:    lokiClient,
 			Logger:          log.NewLogfmtLogger(os.Stderr),
+			Registry:        prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -499,6 +503,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) {
 			CollectInterval: time.Second,
 			EntryHandler:    lokiClient,
 			Logger:          log.NewLogfmtLogger(os.Stderr),
+			Registry:        prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -619,6 +624,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(os.Stderr),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -736,6 +742,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(os.Stderr),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -844,6 +851,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(os.Stderr),
 			DisableQueryRedaction: false,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -1098,13 +1106,14 @@ func TestQuerySamplesMySQLVersions(t *testing.T) {
 
 			lokiClient := loki.NewCollectingHandler()
 
-			collector, err := NewQuerySamples(QuerySamplesArguments{
-				DB:              db,
-				EngineVersion:   semver.MustParse(tc.mysqlVersion),
-				CollectInterval: time.Second,
-				EntryHandler:    lokiClient,
-				Logger:          log.NewLogfmtLogger(os.Stderr),
-			})
+			collector, err := NewQuerySamples(QuerySamplesArguments{
+				DB:              db,
+				EngineVersion:   semver.MustParse(tc.mysqlVersion),
+				CollectInterval: time.Second,
+				EntryHandler:    lokiClient,
+				Logger:          log.NewLogfmtLogger(os.Stderr),
+				Registry:        prometheus.NewRegistry(),
+			})
 			require.NoError(t, err)
 			require.NotNil(t, collector)
 
@@ -1173,6 +1182,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) {
 			CollectInterval: time.Second,
 			EntryHandler:    lokiClient,
 			Logger:          log.NewLogfmtLogger(os.Stderr),
+			Registry:        prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -1301,6 +1311,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) {
 			CollectInterval: time.Second,
 			EntryHandler:    lokiClient,
 			Logger:          log.NewLogfmtLogger(os.Stderr),
+			Registry:        prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -1427,6 +1438,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) {
 			CollectInterval: time.Second,
 			EntryHandler:    lokiClient,
 			Logger:          log.NewLogfmtLogger(os.Stderr),
+			Registry:        prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -1547,7 +1559,7 @@ func TestQuerySamples_initializeTimer(t *testing.T) {
 			5,
 		))
 
-		c, err := NewQuerySamples(QuerySamplesArguments{DB: db})
+		c, err := NewQuerySamples(QuerySamplesArguments{DB: db, Registry: prometheus.NewRegistry()})
 		require.NoError(t, err)
 
 		require.NoError(t, c.initializeBookmark(t.Context()))
@@ -1566,7 +1578,7 @@ func TestQuerySamples_initializeTimer(t *testing.T) {
 			picosecondsToSeconds(math.MaxUint64) + 5,
 		))
 
-		c, err := NewQuerySamples(QuerySamplesArguments{DB: db})
+		c, err := NewQuerySamples(QuerySamplesArguments{DB: db, Registry: prometheus.NewRegistry()})
 		require.NoError(t, err)
 
 		require.NoError(t, c.initializeBookmark(t.Context()))
@@ -1955,7 +1967,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) {
 
 		mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnError(fmt.Errorf("some error"))
 
-		c, err := NewQuerySamples(QuerySamplesArguments{DB: db})
+		c, err := NewQuerySamples(QuerySamplesArguments{DB: db, Registry: prometheus.NewRegistry()})
 		require.NoError(t, err)
 
 		err = c.fetchQuerySamples(t.Context())
@@ -2113,6 +2125,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) {
 			Logger:                      log.NewLogfmtLogger(os.Stderr),
 			AutoEnableSetupConsumers:    true,
 			SetupConsumersCheckInterval: time.Second,
+			Registry:                    prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
@@ -2222,6 +2235,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) {
 			Logger:                      log.NewLogfmtLogger(os.Stderr),
 			AutoEnableSetupConsumers:    true,
 			SetupConsumersCheckInterval: time.Second,
+			Registry:                    prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 		require.NotNil(t, collector)
diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go
index cae2c18f86..f06535c154 100644
--- a/internal/component/database_observability/mysql/component.go
+++ b/internal/component/database_observability/mysql/component.go
@@ -460,6 +460,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse
 			DisableQueryRedaction:       c.args.QuerySamplesArguments.DisableQueryRedaction,
 			AutoEnableSetupConsumers:    c.args.AllowUpdatePerfSchemaSettings && c.args.QuerySamplesArguments.AutoEnableSetupConsumers,
 			SetupConsumersCheckInterval: c.args.QuerySamplesArguments.SetupConsumersCheckInterval,
+			Registry:                    c.registry,
 		})
 		if err != nil {
 			logStartError(collector.QuerySamplesCollector, "create", err)
diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go
index d526b13d2f..f7e26d5dc7 100644
--- a/internal/component/database_observability/postgres/collector/query_samples.go
+++ b/internal/component/database_observability/postgres/collector/query_samples.go
@@ -10,6 +10,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/hashicorp/golang-lru/v2/expirable"
 	"github.com/lib/pq"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/atomic"
 
 	"github.com/grafana/alloy/internal/component/common/loki"
@@ -100,6 +101,7 @@ type QuerySamplesArguments struct {
 	EntryHandler          loki.EntryHandler
 	Logger                log.Logger
 	DisableQueryRedaction bool
+	Registry              *prometheus.Registry
 }
 
 type QuerySamples struct {
@@ -107,6 +109,8 @@ type QuerySamples struct {
 	collectInterval       time.Duration
 	entryHandler          loki.EntryHandler
 	disableQueryRedaction bool
+	registry              *prometheus.Registry
+	latencyHistogram      *prometheus.HistogramVec
 
 	logger  log.Logger
 	running *atomic.Bool
@@ -204,14 +208,31 @@ func (w WaitEventIdentity) Equal(other WaitEventIdentity) bool {
 }
 
 func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
-	const emittedCacheSize = 1000 //pg_stat_statements default max number of statements to track
+	const emittedCacheSize = 1000 // pg_stat_statements default max number of statements to track
 	const emittedCacheTTL = 10 * time.Minute
 
+	latencyHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "database_observability",
+		Name:      "wait_event_latency_postgres_seconds",
+		Help:      "Latency of wait events in seconds",
+		Buckets:   prometheus.DefBuckets,
+	}, []string{"queryid", "datname", "event_name"})
+
+	// Register (not MustRegister) so a duplicate or inconsistent registration is
+	// returned to the caller instead of panicking; a nil registry is tolerated.
+	if args.Registry != nil {
+		if err := args.Registry.Register(latencyHistogram); err != nil {
+			return nil, err
+		}
+	}
+
 	return &QuerySamples{
 		dbConnection:          args.DB,
 		collectInterval:       args.CollectInterval,
 		entryHandler:          args.EntryHandler,
 		disableQueryRedaction: args.DisableQueryRedaction,
+		registry:              args.Registry,
+		latencyHistogram:      latencyHistogram,
 		logger:                log.With(args.Logger, "collector", QuerySamplesCollector),
 		running:               &atomic.Bool{},
 		samples:               map[SampleKey]*SampleState{},
@@ -267,6 +288,11 @@ func (c *QuerySamples) Stopped() bool {
 // Stop should be kept idempotent
 func (c *QuerySamples) Stop() {
 	c.cancel()
+	// Unregister only reports false for a collector that is not registered, so
+	// repeated Stop calls stay safe; the nil check covers a missing registry.
+	if c.registry != nil {
+		c.registry.Unregister(c.latencyHistogram)
+	}
 }
 
 func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
@@ -468,6 +494,18 @@ func (c *QuerySamples) emitAndDeleteSample(key SampleKey) {
 			waitEventLabels,
 			we.LastTimestamp.UnixNano(),
 		)
+
+		// LastWaitTime is parsed as a duration string; when it does not parse,
+		// skip the observation rather than recording a bogus zero latency.
+		if dur, err := time.ParseDuration(we.LastWaitTime); err != nil {
+			_ = c.logger.Log("msg", "failed to parse wait event duration", "wait_time", we.LastWaitTime, "err", err)
+		} else {
+			c.latencyHistogram.WithLabelValues(
+				fmt.Sprintf("%d", state.LastRow.QueryID.Int64),
+				state.LastRow.DatabaseName.String,
+				fmt.Sprintf("%s:%s", we.WaitEventType, we.WaitEvent),
+			).Observe(dur.Seconds())
+		}
 	}
 
 	delete(c.samples, key)
diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go
index 5013d4e538..800a0416c7 100644
--- a/internal/component/database_observability/postgres/collector/query_samples_test.go
+++ b/internal/component/database_observability/postgres/collector/query_samples_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/DATA-DOG/go-sqlmock"
 	"github.com/go-kit/log"
 	"github.com/lib/pq"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -269,6 +270,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
 				EntryHandler:          lokiClient,
 				Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 				DisableQueryRedaction: tc.disableQueryRedaction,
+				Registry:              prometheus.NewRegistry(),
 			})
 			require.NoError(t, err)
 			require.NotNil(t, sampleCollector)
@@ -354,6 +356,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -403,6 +406,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -461,6 +465,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -520,6 +525,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -579,6 +585,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -659,6 +666,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -717,6 +725,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -775,6 +784,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
@@ -833,6 +843,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
 			EntryHandler:          lokiClient,
 			Logger:                log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
 			DisableQueryRedaction: true,
+			Registry:              prometheus.NewRegistry(),
 		})
 		require.NoError(t, err)
 
diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go
index abccf389ef..c808c5fbfc 100644
--- a/internal/component/database_observability/postgres/component.go
+++ b/internal/component/database_observability/postgres/component.go
@@ -414,6 +414,7 @@ func (c *Component) startCollectors(systemID string, engineVersion string) error
 			EntryHandler:          entryHandler,
 			Logger:                c.opts.Logger,
 			DisableQueryRedaction: c.args.QuerySampleArguments.DisableQueryRedaction,
+			Registry:              c.registry,
 		})
 		if err != nil {
 			logStartError(collector.QuerySamplesCollector, "create", err)