Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -83,6 +84,7 @@ type QuerySamplesArguments struct {
DisableQueryRedaction bool
AutoEnableSetupConsumers bool
SetupConsumersCheckInterval time.Duration
Registry *prometheus.Registry

Logger log.Logger
}
Expand All @@ -95,6 +97,8 @@ type QuerySamples struct {
disableQueryRedaction bool
autoEnableSetupConsumers bool
setupConsumersCheckInterval time.Duration
registry *prometheus.Registry
latencyHistogram *prometheus.HistogramVec

logger log.Logger
running *atomic.Bool
Expand All @@ -106,6 +110,16 @@ type QuerySamples struct {
}

func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
latencyHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "database_observability",
Name: "wait_event_latency_mysql_seconds",
Help: "Latency of wait events in seconds",
Buckets: prometheus.DefBuckets,
// NativeHistogramBucketFactor: 1.1,
}, []string{"digest", "schema", "event_name"})

args.Registry.MustRegister(latencyHistogram)

c := &QuerySamples{
dbConnection: args.DB,
engineVersion: args.EngineVersion,
Expand All @@ -114,6 +128,8 @@ func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
disableQueryRedaction: args.DisableQueryRedaction,
autoEnableSetupConsumers: args.AutoEnableSetupConsumers,
setupConsumersCheckInterval: args.SetupConsumersCheckInterval,
registry: args.Registry,
latencyHistogram: latencyHistogram,
logger: log.With(args.Logger, "collector", QuerySamplesCollector),
running: &atomic.Bool{},
}
Expand Down Expand Up @@ -178,6 +194,7 @@ func (c *QuerySamples) Stopped() bool {
// Stop should be kept idempotent
func (c *QuerySamples) Stop() {
c.cancel()
c.registry.Unregister(c.latencyHistogram)
}

func (c *QuerySamples) runSetupConsumersCheck() {
Expand Down Expand Up @@ -396,6 +413,8 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
waitLogMessage,
int64(millisecondsToNanoseconds(row.TimestampMilliseconds)),
)

c.latencyHistogram.WithLabelValues(row.Digest.String, row.Schema.String, row.WaitEventName.String).Observe(picosecondsToSeconds(row.WaitTime.Float64))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"github.com/DATA-DOG/go-sqlmock"
"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -131,13 +132,14 @@
logBuffer := syncbuffer.Buffer{}
lokiClient := loki.NewCollectingHandler()

collector, err := NewQuerySamples(QuerySamplesArguments{
DB: db,
EngineVersion: latestCompatibleVersion,
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
})
collector, err := NewQuerySamples(QuerySamplesArguments{

Check failure on line 135 in internal/component/database_observability/mysql/collector/query_samples_test.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not properly formatted (gofmt)
DB: db,
EngineVersion: latestCompatibleVersion,
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)

Expand Down Expand Up @@ -237,6 +239,7 @@
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -332,6 +335,7 @@
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -499,6 +503,7 @@
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -619,6 +624,7 @@
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
DisableQueryRedaction: true,
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -736,6 +742,7 @@
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
DisableQueryRedaction: true,
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -844,6 +851,7 @@
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
DisableQueryRedaction: false,
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -1098,13 +1106,14 @@

lokiClient := loki.NewCollectingHandler()

collector, err := NewQuerySamples(QuerySamplesArguments{
DB: db,
EngineVersion: semver.MustParse(tc.mysqlVersion),
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
})
collector, err := NewQuerySamples(QuerySamplesArguments{
DB: db,
EngineVersion: semver.MustParse(tc.mysqlVersion),
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)

Expand Down Expand Up @@ -1173,6 +1182,7 @@
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -1301,6 +1311,7 @@
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -1427,6 +1438,7 @@
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -1547,7 +1559,7 @@
5,
))

c, err := NewQuerySamples(QuerySamplesArguments{DB: db})
c, err := NewQuerySamples(QuerySamplesArguments{DB: db, Registry: prometheus.NewRegistry()})
require.NoError(t, err)

require.NoError(t, c.initializeBookmark(t.Context()))
Expand All @@ -1566,7 +1578,7 @@
picosecondsToSeconds(math.MaxUint64) + 5,
))

c, err := NewQuerySamples(QuerySamplesArguments{DB: db})
c, err := NewQuerySamples(QuerySamplesArguments{DB: db, Registry: prometheus.NewRegistry()})
require.NoError(t, err)

require.NoError(t, c.initializeBookmark(t.Context()))
Expand Down Expand Up @@ -1955,7 +1967,7 @@

mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnError(fmt.Errorf("some error"))

c, err := NewQuerySamples(QuerySamplesArguments{DB: db})
c, err := NewQuerySamples(QuerySamplesArguments{DB: db, Registry: prometheus.NewRegistry()})
require.NoError(t, err)

err = c.fetchQuerySamples(t.Context())
Expand Down Expand Up @@ -2113,6 +2125,7 @@
Logger: log.NewLogfmtLogger(os.Stderr),
AutoEnableSetupConsumers: true,
SetupConsumersCheckInterval: time.Second,
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down Expand Up @@ -2222,6 +2235,7 @@
Logger: log.NewLogfmtLogger(os.Stderr),
AutoEnableSetupConsumers: true,
SetupConsumersCheckInterval: time.Second,
Registry: prometheus.NewRegistry(),
})
require.NoError(t, err)
require.NotNil(t, collector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse
DisableQueryRedaction: c.args.QuerySamplesArguments.DisableQueryRedaction,
AutoEnableSetupConsumers: c.args.AllowUpdatePerfSchemaSettings && c.args.QuerySamplesArguments.AutoEnableSetupConsumers,
SetupConsumersCheckInterval: c.args.QuerySamplesArguments.SetupConsumersCheckInterval,
Registry: c.registry,
})
if err != nil {
logStartError(collector.QuerySamplesCollector, "create", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -100,13 +101,16 @@ type QuerySamplesArguments struct {
EntryHandler loki.EntryHandler
Logger log.Logger
DisableQueryRedaction bool
Registry *prometheus.Registry
}

type QuerySamples struct {
dbConnection *sql.DB
collectInterval time.Duration
entryHandler loki.EntryHandler
disableQueryRedaction bool
registry *prometheus.Registry
latencyHistogram *prometheus.HistogramVec

logger log.Logger
running *atomic.Bool
Expand Down Expand Up @@ -204,14 +208,26 @@ func (w WaitEventIdentity) Equal(other WaitEventIdentity) bool {
}

func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
const emittedCacheSize = 1000 //pg_stat_statements default max number of statements to track
const emittedCacheSize = 1000 // pg_stat_statements default max number of statements to track
const emittedCacheTTL = 10 * time.Minute

latencyHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "database_observability",
Name: "wait_event_latency_postgres_seconds",
Help: "Latency of wait events in seconds",
Buckets: prometheus.DefBuckets,
// NativeHistogramBucketFactor: 1.1,
}, []string{"queryid", "datname", "event_name"})

args.Registry.MustRegister(latencyHistogram)

return &QuerySamples{
dbConnection: args.DB,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
disableQueryRedaction: args.DisableQueryRedaction,
registry: args.Registry,
latencyHistogram: latencyHistogram,
logger: log.With(args.Logger, "collector", QuerySamplesCollector),
running: &atomic.Bool{},
samples: map[SampleKey]*SampleState{},
Expand Down Expand Up @@ -267,6 +283,7 @@ func (c *QuerySamples) Stopped() bool {
// Stop should be kept idempotent
func (c *QuerySamples) Stop() {
c.cancel()
c.registry.Unregister(c.latencyHistogram)
}

func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
Expand Down Expand Up @@ -468,6 +485,17 @@ func (c *QuerySamples) emitAndDeleteSample(key SampleKey) {
waitEventLabels,
we.LastTimestamp.UnixNano(),
)

dur, _ := time.ParseDuration(we.LastWaitTime)
fmt.Printf("wait event latency: %+v\n", dur.Seconds())
fmt.Printf("wait event : %s\n", fmt.Sprintf("%s:%s", we.WaitEventType, we.WaitEvent))
fmt.Printf("wait event queryid: %d\n", state.LastRow.QueryID.Int64)
c.latencyHistogram.WithLabelValues(
fmt.Sprintf("%d", state.LastRow.QueryID.Int64),
state.LastRow.DatabaseName.String,
fmt.Sprintf("%s:%s", we.WaitEventType, we.WaitEvent), // todo: compute only once
).
Observe(dur.Seconds()) // todo: no need to re-parse
}

delete(c.samples, key)
Expand Down
Loading
Loading