Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions x-pack/apm-server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func TestMonitoring(t *testing.T) {
monitoringtest.ExpectContainOtelMetricsKeys(c, reader, []string{
"apm-server.sampling.tail.storage.lsm_size",
"apm-server.sampling.tail.storage.value_log_size",
"apm-server.sampling.tail.storage.storage_limit",
"apm-server.sampling.tail.storage.disk_used",
"apm-server.sampling.tail.storage.disk_total",
"apm-server.sampling.tail.storage.disk_usage_threshold",
})
}, time.Second, 10*time.Millisecond)

Expand Down
53 changes: 51 additions & 2 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,20 @@ type StorageManager struct {
// meterProvider is the OTel meter provider
meterProvider metric.MeterProvider
storageMetrics storageMetrics

// configuredStorageLimit stores the configured storage limit (0 means unlimited)
configuredStorageLimit atomic.Uint64
// configuredDiskUsageThreshold stores the configured disk usage threshold as percentage (0-100)
configuredDiskUsageThreshold atomic.Uint64
}

type storageMetrics struct {
lsmSizeGauge metric.Int64Gauge
valueLogSizeGauge metric.Int64Gauge
lsmSizeGauge metric.Int64Gauge
valueLogSizeGauge metric.Int64Gauge
storageLimitGauge metric.Int64Gauge
diskUsedGauge metric.Int64Gauge
diskTotalGauge metric.Int64Gauge
diskUsageThresholdGauge metric.Int64Gauge
}

// NewStorageManager returns a new StorageManager with pebble DB at storageDir.
Expand Down Expand Up @@ -175,6 +184,10 @@ func NewStorageManager(storageDir string, logger *logp.Logger, opts ...StorageMa

sm.storageMetrics.lsmSizeGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.lsm_size")
sm.storageMetrics.valueLogSizeGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.value_log_size")
sm.storageMetrics.storageLimitGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.storage_limit")
sm.storageMetrics.diskUsedGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.disk_used")
sm.storageMetrics.diskTotalGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.disk_total")
sm.storageMetrics.diskUsageThresholdGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.disk_usage_threshold")
}

if err := sm.reset(); err != nil {
Expand Down Expand Up @@ -280,10 +293,22 @@ func (sm *StorageManager) updateDiskUsage() {
sm.storageMetrics.valueLogSizeGauge.Record(context.Background(), int64(defaultValueLogSize))
}

// Record storage limit metric
if sm.storageMetrics.storageLimitGauge != nil {
sm.storageMetrics.storageLimitGauge.Record(context.Background(), int64(sm.configuredStorageLimit.Load()))
}

if sm.getDiskUsageFailed.Load() {
// Skip GetDiskUsage under the assumption that
// it will always get the same error if GetDiskUsage ever returns one,
// such that it does not keep logging GetDiskUsage errors.
// Record zero values for disk metrics when disk usage check failed
if sm.storageMetrics.diskUsedGauge != nil {
sm.storageMetrics.diskUsedGauge.Record(context.Background(), 0)
}
if sm.storageMetrics.diskTotalGauge != nil {
sm.storageMetrics.diskTotalGauge.Record(context.Background(), 0)
}
return
}
usage, err := sm.getDiskUsage()
Expand All @@ -292,10 +317,29 @@ func (sm *StorageManager) updateDiskUsage() {
sm.getDiskUsageFailed.Store(true)
sm.cachedDiskStat.used.Store(0)
sm.cachedDiskStat.total.Store(0) // setting total to 0 to disable any running disk usage threshold checks
// Record zero values for disk metrics when disk usage check failed
if sm.storageMetrics.diskUsedGauge != nil {
sm.storageMetrics.diskUsedGauge.Record(context.Background(), 0)
}
if sm.storageMetrics.diskTotalGauge != nil {
sm.storageMetrics.diskTotalGauge.Record(context.Background(), 0)
}
return
}
sm.cachedDiskStat.used.Store(usage.UsedBytes)
sm.cachedDiskStat.total.Store(usage.TotalBytes)

// Record disk utilization metrics
if sm.storageMetrics.diskUsedGauge != nil {
sm.storageMetrics.diskUsedGauge.Record(context.Background(), int64(usage.UsedBytes))
}
if sm.storageMetrics.diskTotalGauge != nil {
sm.storageMetrics.diskTotalGauge.Record(context.Background(), int64(usage.TotalBytes))
}
// Record disk usage threshold as a percentage (0-100)
if sm.storageMetrics.diskUsageThresholdGauge != nil {
sm.storageMetrics.diskUsageThresholdGauge.Record(context.Background(), int64(sm.configuredDiskUsageThreshold.Load()))
}
}

// diskUsed returns the actual used disk space in bytes.
Expand Down Expand Up @@ -429,6 +473,11 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {

// NewReadWriter returns a read writer configured with storage limit and disk usage threshold.
func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskUsageThreshold float64) RW {
// Store configured values for monitoring metrics
sm.configuredStorageLimit.Store(storageLimit)
// Store disk usage threshold as percentage (0-100)
sm.configuredDiskUsageThreshold.Store(uint64(diskUsageThreshold * 100))

var rw RW = SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
Expand Down
11 changes: 9 additions & 2 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,16 @@ func TestStorageMonitoring(t *testing.T) {

require.NoError(t, config.DB.Flush())

metricsNames := []string{"apm-server.sampling.tail.storage.lsm_size", "apm-server.sampling.tail.storage.value_log_size"}
metricsNames := []string{
"apm-server.sampling.tail.storage.lsm_size",
"apm-server.sampling.tail.storage.value_log_size",
"apm-server.sampling.tail.storage.storage_limit",
"apm-server.sampling.tail.storage.disk_used",
"apm-server.sampling.tail.storage.disk_total",
"apm-server.sampling.tail.storage.disk_usage_threshold",
}
gaugeValues := getGaugeValues(t, tempdirConfig.metricReader, metricsNames...)
assert.Len(t, gaugeValues, 2)
assert.Len(t, gaugeValues, 6)

lsmSize := gaugeValues[0]
assert.NotZero(t, lsmSize)
Expand Down