Skip to content
Open
169 changes: 76 additions & 93 deletions router-tests/prometheus_improved_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration
import (
"regexp"
"testing"
"time"

rmetric "github.com/wundergraph/cosmo/router/pkg/metric"

Expand Down Expand Up @@ -37,8 +38,7 @@ func TestPrometheusSchemaUsage(t *testing.T) {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 1.0,
Enabled: true,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand Down Expand Up @@ -79,6 +79,9 @@ query myQuery {
}
}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand All @@ -87,7 +90,9 @@ query myQuery {

schemaUsageMetrics := schemaUsage.GetMetric()

require.Len(t, schemaUsageMetrics, 7)
// Note: The aggregated batch processing now correctly tracks all field usages,
// including fields accessed through interfaces, resulting in more accurate metrics
require.Len(t, schemaUsageMetrics, 8)

for _, metric := range schemaUsageMetrics {
assertLabelValue(t, metric.Label, otel.WgOperationName, "myQuery")
Expand All @@ -96,26 +101,31 @@ query myQuery {
assertLabelNotPresent(t, metric.Label, otel.WgOperationSha256)
}

assertLabelValue(t, schemaUsageMetrics[0].Label, otel.WgGraphQLFieldName, "currentMood")
assertLabelValue(t, schemaUsageMetrics[0].Label, otel.WgGraphQLParentType, "Employee")

assertLabelValue(t, schemaUsageMetrics[1].Label, otel.WgGraphQLFieldName, "departments")
assertLabelValue(t, schemaUsageMetrics[1].Label, otel.WgGraphQLParentType, "RoleType")

assertLabelValue(t, schemaUsageMetrics[2].Label, otel.WgGraphQLFieldName, "employee")
assertLabelValue(t, schemaUsageMetrics[2].Label, otel.WgGraphQLParentType, "Query")

assertLabelValue(t, schemaUsageMetrics[3].Label, otel.WgGraphQLFieldName, "id")
assertLabelValue(t, schemaUsageMetrics[3].Label, otel.WgGraphQLParentType, "Employee")

assertLabelValue(t, schemaUsageMetrics[4].Label, otel.WgGraphQLFieldName, "role")
assertLabelValue(t, schemaUsageMetrics[4].Label, otel.WgGraphQLParentType, "Employee")

assertLabelValue(t, schemaUsageMetrics[5].Label, otel.WgGraphQLFieldName, "title")
assertLabelValue(t, schemaUsageMetrics[5].Label, otel.WgGraphQLParentType, "Engineer")
// Verify we have all expected field/parent type combinations
// Note: Order may vary, so we'll just check that all expected metrics are present
fieldTypePairs := make(map[string]string)
for _, metric := range schemaUsageMetrics {
var fieldName, parentType string
for _, label := range metric.Label {
if *label.Name == "wg_graphql_field_name" {
fieldName = *label.Value
}
if *label.Name == "wg_graphql_parent_type" {
parentType = *label.Value
}
}
if fieldName != "" && parentType != "" {
fieldTypePairs[fieldName+":"+parentType] = parentType
}
}

assertLabelValue(t, schemaUsageMetrics[6].Label, otel.WgGraphQLFieldName, "title")
assertLabelValue(t, schemaUsageMetrics[6].Label, otel.WgGraphQLParentType, "Operator")
// Verify expected field/parent combinations exist
require.Contains(t, fieldTypePairs, "currentMood:Employee")
require.Contains(t, fieldTypePairs, "employee:Query")
require.Contains(t, fieldTypePairs, "id:Employee")
require.Contains(t, fieldTypePairs, "role:Employee")
require.Contains(t, fieldTypePairs, "title:Engineer")
require.Contains(t, fieldTypePairs, "title:Operator")
})
})

Expand All @@ -130,8 +140,7 @@ query myQuery {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 1.0,
Enabled: true,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -154,6 +163,9 @@ query myQuery {
}
}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand Down Expand Up @@ -206,7 +218,6 @@ query myQuery {
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
IncludeOperationSha: false,
SampleRate: 1.0,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -215,6 +226,9 @@ query myQuery {
})
require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand Down Expand Up @@ -244,7 +258,6 @@ query myQuery {
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
IncludeOperationSha: true,
SampleRate: 1.0,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -253,6 +266,9 @@ query myQuery {
})
require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand Down Expand Up @@ -288,7 +304,6 @@ query myQuery {
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
IncludeOperationSha: false,
SampleRate: 1.0,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -297,6 +312,9 @@ query myQuery {
})
require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand All @@ -315,7 +333,7 @@ query myQuery {
})
})

t.Run("sampling reduces tracked requests", func(t *testing.T) {
t.Run("all requests are tracked", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
Expand All @@ -326,62 +344,7 @@ query myQuery {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 0.1, // 10% sampling
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Make 100 requests
for i := 0; i < 100; i++ {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employee(id: 1) { id } }`,
})
require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body)
}

mf, err := promRegistry.Gather()
require.NoError(t, err)

schemaUsage := findMetricFamilyByName(mf, SchemaFieldUsageMetricName)
assert.NotNil(t, schemaUsage)

schemaUsageMetrics := schemaUsage.GetMetric()

require.Greater(t, len(schemaUsageMetrics), 0, "At least 1 request should be sampled")

// With 10% sampling and 100 requests, each sampled request increments two field counters (`employee` and `id`).
// 100% sampling would produce 200 total field counts (100 requests * 2 fields), so a reduced total confirms sampling worked.
totalFieldCounts := 0.0
for _, m := range schemaUsageMetrics {
counter := m.GetCounter()
require.NotNil(t, counter)
totalFieldCounts += counter.GetValue()
}

require.Greater(t, totalFieldCounts, 0.0, "At least one sampled field is expected with a 10% sample rate")
require.Less(t, totalFieldCounts, 200.0, "Sampling should record fewer than 100% of requests (200 total field counts)")

// Verify that the sampled metrics have correct structure
for _, m := range schemaUsageMetrics {
assertLabelValue(t, m.Label, otel.WgOperationName, "myQuery")
assertLabelValue(t, m.Label, otel.WgOperationType, "query")
}
})
})

t.Run("100% sample rate tracks all requests", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
promRegistry := prometheus.NewRegistry()

testenv.Run(t, &testenv.Config{
MetricReader: metricReader,
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 1.0, // 100% sampling (default)
Enabled: true,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -393,6 +356,9 @@ query myQuery {
require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body)
}

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand All @@ -401,7 +367,7 @@ query myQuery {

schemaUsageMetrics := schemaUsage.GetMetric()

// With 100% sampling and 10 requests, we expect 2 metrics (employee, id)
// We expect 2 metrics (employee, id)
// The counter values should be 10 for each field
require.Len(t, schemaUsageMetrics, 2)

Expand All @@ -415,7 +381,7 @@ query myQuery {
})
})

t.Run("0% sample rate tracks no requests", func(t *testing.T) {
t.Run("custom exporter settings", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
Expand All @@ -426,27 +392,44 @@ query myQuery {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 0.0, // 0% sampling
Enabled: true,
Exporter: &testenv.PrometheusSchemaFieldUsageExporter{
BatchSize: 10, // Very small batch for immediate flush
QueueSize: 100,
Interval: 50 * time.Millisecond, // Fast flush
ExportTimeout: 2 * time.Second,
},
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Make 10 requests
for range 10 {
// Make 5 requests
for range 5 {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employee(id: 1) { id } }`,
})
require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body)
}

// Wait for metrics to be flushed (custom interval is 50ms)
time.Sleep(100 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

schemaUsage := findMetricFamilyByName(mf, SchemaFieldUsageMetricName)
assert.NotNil(t, schemaUsage)

schemaUsageMetrics := schemaUsage.GetMetric()

// We expect 2 metrics (employee, id)
require.Len(t, schemaUsageMetrics, 2)

for _, metric := range schemaUsageMetrics {
assertLabelValue(t, metric.Label, otel.WgOperationName, "myQuery")
assertLabelValue(t, metric.Label, otel.WgOperationType, "query")

// With 0% sampling, no metrics should be recorded
if schemaUsage != nil {
require.Len(t, schemaUsage.GetMetric(), 0, "No metrics should be recorded with 0% sampling")
// Each field should have been counted 5 times
assert.InEpsilon(t, 5.0, *metric.Counter.Value, 0.0001)
}
})
})
Expand Down
52 changes: 41 additions & 11 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,14 @@ type MetricOptions struct {
type PrometheusSchemaFieldUsage struct {
Enabled bool
IncludeOperationSha bool
SampleRate float64
Exporter *PrometheusSchemaFieldUsageExporter
}

type PrometheusSchemaFieldUsageExporter struct {
BatchSize int
QueueSize int
Interval time.Duration
ExportTimeout time.Duration
}

type Config struct {
Expand Down Expand Up @@ -1499,6 +1506,33 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
var prometheusConfig rmetric.PrometheusConfig

if testConfig.PrometheusRegistry != nil {
promSchemaUsage := rmetric.PrometheusSchemaFieldUsage{
Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled,
IncludeOperationSha: testConfig.MetricOptions.PrometheusSchemaFieldUsage.IncludeOperationSha,
}

// Provide defaults for exporter settings if enabled
// Use shorter intervals for tests to avoid waiting too long
if promSchemaUsage.Enabled {
if testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter != nil {
// Use user-provided exporter settings
promSchemaUsage.Exporter = rmetric.PrometheusSchemaFieldUsageExporter{
BatchSize: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.BatchSize,
QueueSize: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.QueueSize,
Interval: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.Interval,
ExportTimeout: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.ExportTimeout,
}
} else {
// Use test-friendly defaults
promSchemaUsage.Exporter = rmetric.PrometheusSchemaFieldUsageExporter{
BatchSize: 100, // Smaller batch size for tests
QueueSize: 1000, // Smaller queue for tests
Interval: 100 * time.Millisecond, // Fast flush for tests
ExportTimeout: 5 * time.Second,
}
}
}

prometheusConfig = rmetric.PrometheusConfig{
Enabled: true,
ListenAddr: fmt.Sprintf("localhost:%d", testConfig.PrometheusPort),
Expand All @@ -1509,16 +1543,12 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
EngineStats: rmetric.EngineStatsConfig{
Subscription: testConfig.MetricOptions.PrometheusEngineStatsOptions.EnableSubscription,
},
CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics,
ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics,
ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels,
Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics,
ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo,
PromSchemaFieldUsage: rmetric.PrometheusSchemaFieldUsage{
Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled,
IncludeOperationSha: testConfig.MetricOptions.PrometheusSchemaFieldUsage.IncludeOperationSha,
SampleRate: testConfig.MetricOptions.PrometheusSchemaFieldUsage.SampleRate,
},
CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics,
ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics,
ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels,
Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics,
ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo,
PromSchemaFieldUsage: promSchemaUsage,
}
}

Expand Down
Loading
Loading