From 488362a76607aa57dbc3b581d1a96809e7ac93c5 Mon Sep 17 00:00:00 2001 From: sreenathv Date: Wed, 26 Nov 2025 14:26:30 +0000 Subject: [PATCH 1/5] planCache enhancement for top_query collection --- receiver/sqlserverreceiver/factory.go | 8 ++++++-- receiver/sqlserverreceiver/scraper.go | 24 +++++++++++++++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/receiver/sqlserverreceiver/factory.go b/receiver/sqlserverreceiver/factory.go index 572d45e2e2bb6..ee3b0ecbc5041 100644 --- a/receiver/sqlserverreceiver/factory.go +++ b/receiver/sqlserverreceiver/factory.go @@ -132,6 +132,7 @@ func setupSQLServerScrapers(params receiver.Settings, cfg *Config) []*sqlServerS // lru only returns error when the size is less than 0 cache := newCache(1) + planCache, _ := lru.New[string, string](1) sqlServerScraper := newSQLServerScraper(id, query, sqlquery.TelemetryConfig{}, @@ -139,7 +140,8 @@ func setupSQLServerScrapers(params receiver.Settings, cfg *Config) []*sqlServerS sqlquery.NewDbClient, params, cfg, - cache) + cache, + planCache) scrapers = append(scrapers, sqlServerScraper) } @@ -172,6 +174,7 @@ func setupSQLServerLogsScrapers(params receiver.Settings, cfg *Config) []*sqlSer id := component.NewIDWithName(metadata.Type, fmt.Sprintf("logs-query-%d: %s", i, query)) cache := newCache(1) + planCache, _ := lru.New[string, string](int(cfg.MaxQuerySampleCount)) if query == getSQLServerQueryTextAndPlanQuery() { // we have 8 metrics in this query and multiple 2 to allow to cache more queries. 
@@ -188,7 +191,8 @@ func setupSQLServerLogsScrapers(params receiver.Settings, cfg *Config) []*sqlSer sqlquery.NewDbClient, params, cfg, - cache) + cache, + planCache) scrapers = append(scrapers, sqlServerScraper) } diff --git a/receiver/sqlserverreceiver/scraper.go b/receiver/sqlserverreceiver/scraper.go index a04218f62ca4a..2cc247996c4d3 100644 --- a/receiver/sqlserverreceiver/scraper.go +++ b/receiver/sqlserverreceiver/scraper.go @@ -62,6 +62,7 @@ type sqlServerScraperHelper struct { mb *metadata.MetricsBuilder lb *metadata.LogsBuilder cache *lru.Cache[string, int64] + planCache *lru.Cache[string, string] lastExecutionTimestamp time.Time obfuscator *obfuscator serviceInstanceID string @@ -80,6 +81,7 @@ func newSQLServerScraper(id component.ID, params receiver.Settings, cfg *Config, cache *lru.Cache[string, int64], + planCache *lru.Cache[string, string], ) *sqlServerScraperHelper { // Compute service instance ID serviceInstanceID, err := computeServiceInstanceID(cfg) @@ -99,6 +101,7 @@ func newSQLServerScraper(id component.ID, mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, params), lb: metadata.NewLogsBuilder(cfg.LogsBuilderConfig, params), cache: cache, + planCache: planCache, lastExecutionTimestamp: time.Unix(0, 0), obfuscator: newObfuscator(), serviceInstanceID: serviceInstanceID, @@ -685,6 +688,7 @@ func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Cont now := time.Now() timestamp := pcommon.NewTimestampFromTime(now) s.lastExecutionTimestamp = now + s.logger.Debug("Current planCache size: " + strconv.Itoa(s.planCache.Len())) for i, row := range rows { // skipping the rest of the rows as totalElapsedTimeDiffs is sorted in descending order if totalElapsedTimeDiffsMicrosecond[i] == 0 { @@ -696,6 +700,8 @@ func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Cont queryHashVal := hex.EncodeToString([]byte(row[queryHash])) queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash])) + isNewPlan := 
s.cacheIfNewPlan(queryHashVal, queryPlanHashVal) + queryTextVal := s.retrieveValue(row, queryText, &errs, func(row sqlquery.StringMap, columnName string) (any, error) { statement := row[columnName] obfuscated, err := s.obfuscator.obfuscateSQLString(statement) @@ -731,9 +737,12 @@ func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Cont physicalReadsVal = int64(0) } - queryPlanVal := s.retrieveValue(row, queryPlan, &errs, func(row sqlquery.StringMap, columnName string) (any, error) { - return s.obfuscator.obfuscateXMLPlan(row[columnName]) - }) + var queryPlanVal any = "" + if isNewPlan { + queryPlanVal = s.retrieveValue(row, queryPlan, &errs, func(row sqlquery.StringMap, columnName string) (any, error) { + return s.obfuscator.obfuscateXMLPlan(row[columnName]) + }) + } rowsReturnedVal := s.retrieveValue(row, rowsReturned, &errs, retrieveInt) cached, rowsReturnedVal = s.cacheAndDiff(queryHashVal, queryPlanHashVal, rowsReturned, rowsReturnedVal.(int64)) @@ -787,6 +796,15 @@ func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Cont return resources, errors.Join(errs...) 
} +func (s *sqlServerScraperHelper) cacheIfNewPlan(queryHashVal string, queryPlanHashVal string) bool { + cachedPlanHashValue, isPlanHashPresent := s.planCache.Get(queryHashVal) + if !isPlanHashPresent || cachedPlanHashValue != queryPlanHashVal { + s.planCache.Add(queryHashVal, queryPlanHashVal) + return true + } + return false +} + func (s *sqlServerScraperHelper) retrieveValue( row sqlquery.StringMap, column string, From 9e9c98f8bbfc406172f8b595fe1555c451da973c Mon Sep 17 00:00:00 2001 From: sreenathv Date: Wed, 26 Nov 2025 14:41:42 +0000 Subject: [PATCH 2/5] renaming 'cache' to be 'metricCache' so it's more meaningful --- receiver/sqlserverreceiver/factory.go | 6 +++--- receiver/sqlserverreceiver/scraper.go | 12 ++++++------ receiver/sqlserverreceiver/scraper_test.go | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/receiver/sqlserverreceiver/factory.go b/receiver/sqlserverreceiver/factory.go index ee3b0ecbc5041..239783fe54b92 100644 --- a/receiver/sqlserverreceiver/factory.go +++ b/receiver/sqlserverreceiver/factory.go @@ -24,7 +24,7 @@ import ( var errConfigNotSQLServer = errors.New("config was not a sqlserver receiver config") -// newCache creates a new cache with the given size. +// newCache creates a new metricCache with the given size. // If the size is less or equal to 0, it will be set to 1. // It will never return an error. 
func newCache(size int) *lru.Cache[string, int64] { @@ -131,7 +131,7 @@ func setupSQLServerScrapers(params receiver.Settings, cfg *Config) []*sqlServerS id := component.NewIDWithName(metadata.Type, fmt.Sprintf("query-%d: %s", i, query)) // lru only returns error when the size is less than 0 - cache := newCache(1) + metricCache := newCache(1) planCache, _ := lru.New[string, string](1) sqlServerScraper := newSQLServerScraper(id, query, @@ -140,7 +140,7 @@ func setupSQLServerScrapers(params receiver.Settings, cfg *Config) []*sqlServerS sqlquery.NewDbClient, params, cfg, - cache, + metricCache, planCache) scrapers = append(scrapers, sqlServerScraper) diff --git a/receiver/sqlserverreceiver/scraper.go b/receiver/sqlserverreceiver/scraper.go index 2cc247996c4d3..516ad368b3bb1 100644 --- a/receiver/sqlserverreceiver/scraper.go +++ b/receiver/sqlserverreceiver/scraper.go @@ -61,7 +61,7 @@ type sqlServerScraperHelper struct { db *sql.DB mb *metadata.MetricsBuilder lb *metadata.LogsBuilder - cache *lru.Cache[string, int64] + metricCache *lru.Cache[string, int64] planCache *lru.Cache[string, string] lastExecutionTimestamp time.Time obfuscator *obfuscator @@ -80,7 +80,7 @@ func newSQLServerScraper(id component.ID, clientProviderFunc sqlquery.ClientProviderFunc, params receiver.Settings, cfg *Config, - cache *lru.Cache[string, int64], + metricCache *lru.Cache[string, int64], planCache *lru.Cache[string, string], ) *sqlServerScraperHelper { // Compute service instance ID @@ -100,7 +100,7 @@ func newSQLServerScraper(id component.ID, clientProviderFunc: clientProviderFunc, mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, params), lb: metadata.NewLogsBuilder(cfg.LogsBuilderConfig, params), - cache: cache, + metricCache: metricCache, planCache: planCache, lastExecutionTimestamp: time.Unix(0, 0), obfuscator: newObfuscator(), @@ -830,14 +830,14 @@ func (s *sqlServerScraperHelper) cacheAndDiff(queryHash, queryPlanHash, column s key := queryHash + "-" + queryPlanHash + "-" + 
column - cached, ok := s.cache.Get(key) + cached, ok := s.metricCache.Get(key) if !ok { - s.cache.Add(key, val) + s.metricCache.Add(key, val) return false, val } if val > cached { - s.cache.Add(key, val) + s.metricCache.Add(key, val) return true, val - cached } diff --git a/receiver/sqlserverreceiver/scraper_test.go b/receiver/sqlserverreceiver/scraper_test.go index cbb751a447e2b..03449c434bfb5 100644 --- a/receiver/sqlserverreceiver/scraper_test.go +++ b/receiver/sqlserverreceiver/scraper_test.go @@ -388,7 +388,7 @@ func TestQueryTextAndPlanQuery(t *testing.T) { assert.NotNil(t, scrapers) scraper := scrapers[0] - assert.NotNil(t, scraper.cache) + assert.NotNil(t, scraper.metricCache) const totalElapsedTime = "total_elapsed_time" const rowsReturned = "total_rows" @@ -447,7 +447,7 @@ func TestInvalidQueryTextAndPlanQuery(t *testing.T) { assert.NotNil(t, scrapers) scraper := scrapers[0] - assert.NotNil(t, scraper.cache) + assert.NotNil(t, scraper.metricCache) const totalElapsedTime = "total_elapsed_time" const rowsReturned = "total_rows" @@ -543,7 +543,7 @@ func TestRecordDatabaseSampleQuery(t *testing.T) { assert.NotNil(t, scrapers) scraper := scrapers[0] - assert.NotNil(t, scraper.cache) + assert.NotNil(t, scraper.metricCache) scraper.client = tc.mockClient(scraper.instanceName, scraper.sqlQuery) From 435a1d0b6b75b53ff0c5051b9d7e302e80b6cd0c Mon Sep 17 00:00:00 2001 From: sreenathv Date: Wed, 26 Nov 2025 15:30:12 +0000 Subject: [PATCH 3/5] Adding unit tests --- receiver/sqlserverreceiver/scraper_test.go | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/receiver/sqlserverreceiver/scraper_test.go b/receiver/sqlserverreceiver/scraper_test.go index 03449c434bfb5..d2f0aa34a9c4f 100644 --- a/receiver/sqlserverreceiver/scraper_test.go +++ b/receiver/sqlserverreceiver/scraper_test.go @@ -494,6 +494,48 @@ func TestInvalidQueryTextAndPlanQuery(t *testing.T) { assert.NoError(t, errs) } +func TestCacheIfNewPlan(t *testing.T) { + cfg := 
createDefaultConfig().(*Config) + cfg.Username = "sa" + cfg.Password = "password" + cfg.Port = 1433 + cfg.Server = "0.0.0.0" + cfg.MetricsBuilderConfig.ResourceAttributes.SqlserverInstanceName.Enabled = true + cfg.Events.DbServerTopQuery.Enabled = true + assert.NoError(t, cfg.Validate()) + + configureAllScraperMetricsAndEvents(cfg, false) + cfg.Events.DbServerTopQuery.Enabled = true + cfg.TopQueryCollection.CollectionInterval = cfg.ControllerConfig.CollectionInterval + + scrapers := setupSQLServerLogsScrapers(receivertest.NewNopSettings(metadata.Type), cfg) + assert.NotNil(t, scrapers) + + scraper := scrapers[0] + assert.NotNil(t, scraper.planCache) + + scraper.planCache.Add("query-hash-1", "plan-hash-1") + + // Existing entry: the same plan hash for an already cached query hash is not reported as new. + isPlan1New := scraper.cacheIfNewPlan("query-hash-1", "plan-hash-1") + assert.False(t, isPlan1New, "Should be False because query-hash-1 already exists in the cache") + value1, _ := scraper.planCache.Get("query-hash-1") + assert.Equal(t, "plan-hash-1", value1, "The existing value in cache should be 'plan-hash-1'") + + // New entry: an unseen query hash is reported as new and its plan hash is stored. + isPlan2New := scraper.cacheIfNewPlan("query-hash-2", "plan-hash-2") + assert.True(t, isPlan2New, "Should be true because 'query-hash-2' does not already exist") + value2, _ := scraper.planCache.Get("query-hash-2") + assert.Equal(t, "plan-hash-2", value2, "The new 'plan-hash-2' should have added into the cache") + + // Updated entry: a different plan hash for a known query hash is reported as new and replaces the stored value. 
+ isPlan3New := scraper.cacheIfNewPlan("query-hash-1", "plan-hash-3") + assert.True(t, isPlan3New, "Should be true because plan hash 'plan-hash-3' is new for 'query-hash-1'") + value3, _ := scraper.planCache.Get("query-hash-1") + assert.Equal(t, "plan-hash-3", value3, "The value for key 'query-hash-1' should be now updated to 'plan-hash-3'") + +} + func TestRecordDatabaseSampleQuery(t *testing.T) { tests := map[string]struct { expectedFile string From fbe5c0af3a338fb61dd02036ad8fca9339849cb7 Mon Sep 17 00:00:00 2001 From: sreenathv Date: Thu, 27 Nov 2025 09:42:00 +0000 Subject: [PATCH 4/5] Making planCache expirable --- receiver/sqlserverreceiver/config.go | 1 + receiver/sqlserverreceiver/factory.go | 6 ++++-- receiver/sqlserverreceiver/scraper.go | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/receiver/sqlserverreceiver/config.go b/receiver/sqlserverreceiver/config.go index cc52fab7b21b4..5956d3c62348d 100644 --- a/receiver/sqlserverreceiver/config.go +++ b/receiver/sqlserverreceiver/config.go @@ -30,6 +30,7 @@ type TopQueryCollection struct { MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"` TopQueryCount uint `mapstructure:"top_query_count"` CollectionInterval time.Duration `mapstructure:"collection_interval"` + QueryPlanCacheTTL time.Duration `mapstructure:"query_plan_cache_ttl"` } // Config defines configuration for a sqlserver receiver. 
diff --git a/receiver/sqlserverreceiver/factory.go b/receiver/sqlserverreceiver/factory.go index 239783fe54b92..b5c17b404c3bf 100644 --- a/receiver/sqlserverreceiver/factory.go +++ b/receiver/sqlserverreceiver/factory.go @@ -11,6 +11,7 @@ import ( "time" lru "github.com/hashicorp/golang-lru/v2" + "github.com/hashicorp/golang-lru/v2/expirable" _ "github.com/microsoft/go-mssqldb" // register Db driver _ "github.com/microsoft/go-mssqldb/integratedauth/krb5" // register Db driver "go.opentelemetry.io/collector/component" @@ -59,6 +60,7 @@ func createDefaultConfig() component.Config { MaxQuerySampleCount: 1000, TopQueryCount: 200, CollectionInterval: time.Minute, + QueryPlanCacheTTL: time.Hour, }, } } @@ -132,7 +134,7 @@ func setupSQLServerScrapers(params receiver.Settings, cfg *Config) []*sqlServerS // lru only returns error when the size is less than 0 metricCache := newCache(1) - planCache, _ := lru.New[string, string](1) + planCache := expirable.NewLRU[string, string](1, nil, time.Second) sqlServerScraper := newSQLServerScraper(id, query, sqlquery.TelemetryConfig{}, @@ -174,7 +176,7 @@ func setupSQLServerLogsScrapers(params receiver.Settings, cfg *Config) []*sqlSer id := component.NewIDWithName(metadata.Type, fmt.Sprintf("logs-query-%d: %s", i, query)) cache := newCache(1) - planCache, _ := lru.New[string, string](int(cfg.MaxQuerySampleCount)) + planCache := expirable.NewLRU[string, string](1, nil, cfg.TopQueryCollection.QueryPlanCacheTTL) if query == getSQLServerQueryTextAndPlanQuery() { // we have 8 metrics in this query and multiple 2 to allow to cache more queries. 
diff --git a/receiver/sqlserverreceiver/scraper.go b/receiver/sqlserverreceiver/scraper.go index 516ad368b3bb1..896351421195e 100644 --- a/receiver/sqlserverreceiver/scraper.go +++ b/receiver/sqlserverreceiver/scraper.go @@ -16,6 +16,7 @@ import ( "time" lru "github.com/hashicorp/golang-lru/v2" + "github.com/hashicorp/golang-lru/v2/expirable" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" @@ -62,7 +63,7 @@ type sqlServerScraperHelper struct { mb *metadata.MetricsBuilder lb *metadata.LogsBuilder metricCache *lru.Cache[string, int64] - planCache *lru.Cache[string, string] + planCache *expirable.LRU[string, string] lastExecutionTimestamp time.Time obfuscator *obfuscator serviceInstanceID string @@ -81,7 +82,7 @@ func newSQLServerScraper(id component.ID, params receiver.Settings, cfg *Config, metricCache *lru.Cache[string, int64], - planCache *lru.Cache[string, string], + planCache *expirable.LRU[string, string], ) *sqlServerScraperHelper { // Compute service instance ID serviceInstanceID, err := computeServiceInstanceID(cfg) From 9cf3dd901d276ac2cbcd523cf76fdce5ab871b1d Mon Sep 17 00:00:00 2001 From: sreenathv Date: Fri, 28 Nov 2025 15:06:28 +0000 Subject: [PATCH 5/5] rename QueryPlanCacheTTL to QueryPlanCacheDuration --- receiver/sqlserverreceiver/config.go | 10 +++++----- receiver/sqlserverreceiver/factory.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/receiver/sqlserverreceiver/config.go b/receiver/sqlserverreceiver/config.go index 5956d3c62348d..1ebd5571e949a 100644 --- a/receiver/sqlserverreceiver/config.go +++ b/receiver/sqlserverreceiver/config.go @@ -26,11 +26,11 @@ type TopQueryCollection struct { // The query statement will also be reported, hence, it is not ideal to send it as a metric. Hence // we are reporting them as logs. 
// The `N` is configured via `TopQueryCount` - LookbackTime time.Duration `mapstructure:"lookback_time"` - MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"` - TopQueryCount uint `mapstructure:"top_query_count"` - CollectionInterval time.Duration `mapstructure:"collection_interval"` - QueryPlanCacheTTL time.Duration `mapstructure:"query_plan_cache_ttl"` + LookbackTime time.Duration `mapstructure:"lookback_time"` + MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"` + TopQueryCount uint `mapstructure:"top_query_count"` + CollectionInterval time.Duration `mapstructure:"collection_interval"` + QueryPlanCacheDuration time.Duration `mapstructure:"query_plan_cache_duration"` } // Config defines configuration for a sqlserver receiver. diff --git a/receiver/sqlserverreceiver/factory.go b/receiver/sqlserverreceiver/factory.go index b5c17b404c3bf..0389874ce13c2 100644 --- a/receiver/sqlserverreceiver/factory.go +++ b/receiver/sqlserverreceiver/factory.go @@ -57,10 +57,10 @@ func createDefaultConfig() component.Config { MaxRowsPerQuery: 100, }, TopQueryCollection: TopQueryCollection{ - MaxQuerySampleCount: 1000, - TopQueryCount: 200, - CollectionInterval: time.Minute, - QueryPlanCacheTTL: time.Hour, + MaxQuerySampleCount: 1000, + TopQueryCount: 200, + CollectionInterval: time.Minute, + QueryPlanCacheDuration: time.Hour, }, } } @@ -176,7 +176,8 @@ func setupSQLServerLogsScrapers(params receiver.Settings, cfg *Config) []*sqlSer id := component.NewIDWithName(metadata.Type, fmt.Sprintf("logs-query-%d: %s", i, query)) cache := newCache(1) - planCache := expirable.NewLRU[string, string](1, nil, cfg.TopQueryCollection.QueryPlanCacheTTL) + // Keep one slot per sampled query, as the plan cache had before it became expirable; with a capacity of 1 each new query hash would evict the previous entry and its plan would be re-sent. + planCache := expirable.NewLRU[string, string](int(cfg.MaxQuerySampleCount), nil, cfg.TopQueryCollection.QueryPlanCacheDuration) if query == getSQLServerQueryTextAndPlanQuery() { // we have 8 metrics in this query and multiple 2 to allow to cache more queries.