Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions receiver/sqlserverreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type TopQueryCollection struct {
MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"`
TopQueryCount uint `mapstructure:"top_query_count"`
CollectionInterval time.Duration `mapstructure:"collection_interval"`
QueryPlanCacheTTL time.Duration `mapstructure:"query_plan_cache_ttl"`
}

// Config defines configuration for a sqlserver receiver.
Expand Down
14 changes: 10 additions & 4 deletions receiver/sqlserverreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
_ "github.com/microsoft/go-mssqldb" // register Db driver
_ "github.com/microsoft/go-mssqldb/integratedauth/krb5" // register Db driver
"go.opentelemetry.io/collector/component"
Expand All @@ -24,7 +25,7 @@

var errConfigNotSQLServer = errors.New("config was not a sqlserver receiver config")

// newCache creates a new cache with the given size.
// newCache creates a new metricCache with the given size.
// If the size is less or equal to 0, it will be set to 1.
// It will never return an error.
func newCache(size int) *lru.Cache[string, int64] {
Expand Down Expand Up @@ -59,6 +60,7 @@
MaxQuerySampleCount: 1000,
TopQueryCount: 200,
CollectionInterval: time.Minute,
QueryPlanCacheTTL: time.Hour,
},
}
}
Expand Down Expand Up @@ -131,15 +133,17 @@
id := component.NewIDWithName(metadata.Type, fmt.Sprintf("query-%d: %s", i, query))

// lru only returns error when the size is less than 0
cache := newCache(1)
metricCache := newCache(1)
planCache := expirable.NewLRU[string, string](1, nil, time.Second)

sqlServerScraper := newSQLServerScraper(id, query,
sqlquery.TelemetryConfig{},
dbProviderFunc,
sqlquery.NewDbClient,
params,
cfg,
cache)
metricCache,
planCache)

scrapers = append(scrapers, sqlServerScraper)
}
Expand Down Expand Up @@ -172,6 +176,7 @@
id := component.NewIDWithName(metadata.Type, fmt.Sprintf("logs-query-%d: %s", i, query))

cache := newCache(1)
planCache := expirable.NewLRU[string, string](1, nil, cfg.TopQueryCollection.QueryPlanCacheTTL)

Check failure on line 179 in receiver/sqlserverreceiver/factory.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

QF1008: could remove embedded field "TopQueryCollection" from selector (staticcheck)

Check failure on line 179 in receiver/sqlserverreceiver/factory.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

QF1008: could remove embedded field "TopQueryCollection" from selector (staticcheck)

if query == getSQLServerQueryTextAndPlanQuery() {
// we have 8 metrics in this query and multiple 2 to allow to cache more queries.
Expand All @@ -188,7 +193,8 @@
sqlquery.NewDbClient,
params,
cfg,
cache)
cache,
planCache)

scrapers = append(scrapers, sqlServerScraper)
}
Expand Down
37 changes: 28 additions & 9 deletions receiver/sqlserverreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -61,7 +62,8 @@
db *sql.DB
mb *metadata.MetricsBuilder
lb *metadata.LogsBuilder
cache *lru.Cache[string, int64]
metricCache *lru.Cache[string, int64]
planCache *expirable.LRU[string, string]
lastExecutionTimestamp time.Time
obfuscator *obfuscator
serviceInstanceID string
Expand All @@ -79,7 +81,8 @@
clientProviderFunc sqlquery.ClientProviderFunc,
params receiver.Settings,
cfg *Config,
cache *lru.Cache[string, int64],
metricCache *lru.Cache[string, int64],
planCache *expirable.LRU[string, string],
) *sqlServerScraperHelper {
// Compute service instance ID
serviceInstanceID, err := computeServiceInstanceID(cfg)
Expand All @@ -98,7 +101,8 @@
clientProviderFunc: clientProviderFunc,
mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, params),
lb: metadata.NewLogsBuilder(cfg.LogsBuilderConfig, params),
cache: cache,
metricCache: metricCache,
planCache: planCache,
lastExecutionTimestamp: time.Unix(0, 0),
obfuscator: newObfuscator(),
serviceInstanceID: serviceInstanceID,
Expand Down Expand Up @@ -685,6 +689,7 @@
now := time.Now()
timestamp := pcommon.NewTimestampFromTime(now)
s.lastExecutionTimestamp = now
s.logger.Debug("Current planCache size: " + strconv.Itoa(s.planCache.Len()))
for i, row := range rows {
// skipping the rest of the rows as totalElapsedTimeDiffs is sorted in descending order
if totalElapsedTimeDiffsMicrosecond[i] == 0 {
Expand All @@ -696,6 +701,8 @@
queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

isNewPlan := s.cacheIfNewPlan(queryHashVal, queryPlanHashVal)

queryTextVal := s.retrieveValue(row, queryText, &errs, func(row sqlquery.StringMap, columnName string) (any, error) {
statement := row[columnName]
obfuscated, err := s.obfuscator.obfuscateSQLString(statement)
Expand Down Expand Up @@ -731,9 +738,12 @@
physicalReadsVal = int64(0)
}

queryPlanVal := s.retrieveValue(row, queryPlan, &errs, func(row sqlquery.StringMap, columnName string) (any, error) {
return s.obfuscator.obfuscateXMLPlan(row[columnName])
})
var queryPlanVal any = ""
if isNewPlan {
queryPlanVal = s.retrieveValue(row, queryPlan, &errs, func(row sqlquery.StringMap, columnName string) (any, error) {
return s.obfuscator.obfuscateXMLPlan(row[columnName])
})
}

rowsReturnedVal := s.retrieveValue(row, rowsReturned, &errs, retrieveInt)
cached, rowsReturnedVal = s.cacheAndDiff(queryHashVal, queryPlanHashVal, rowsReturned, rowsReturnedVal.(int64))
Expand Down Expand Up @@ -787,6 +797,15 @@
return resources, errors.Join(errs...)
}

func (s *sqlServerScraperHelper) cacheIfNewPlan(queryHashVal string, queryPlanHashVal string) bool {

Check failure on line 800 in receiver/sqlserverreceiver/scraper.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

paramTypeCombine: func(queryHashVal string, queryPlanHashVal string) bool could be replaced with func(queryHashVal, queryPlanHashVal string) bool (gocritic)

Check failure on line 800 in receiver/sqlserverreceiver/scraper.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

paramTypeCombine: func(queryHashVal string, queryPlanHashVal string) bool could be replaced with func(queryHashVal, queryPlanHashVal string) bool (gocritic)
cachedPlanHashValue, isPlanHashPresent := s.planCache.Get(queryHashVal)
if !isPlanHashPresent || cachedPlanHashValue != queryPlanHashVal {
s.planCache.Add(queryHashVal, queryPlanHashVal)
return true
}
return false
}

func (s *sqlServerScraperHelper) retrieveValue(
row sqlquery.StringMap,
column string,
Expand All @@ -812,14 +831,14 @@

key := queryHash + "-" + queryPlanHash + "-" + column

cached, ok := s.cache.Get(key)
cached, ok := s.metricCache.Get(key)
if !ok {
s.cache.Add(key, val)
s.metricCache.Add(key, val)
return false, val
}

if val > cached {
s.cache.Add(key, val)
s.metricCache.Add(key, val)
return true, val - cached
}

Expand Down
48 changes: 45 additions & 3 deletions receiver/sqlserverreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@
assert.NotNil(t, scrapers)

scraper := scrapers[0]
assert.NotNil(t, scraper.cache)
assert.NotNil(t, scraper.metricCache)

const totalElapsedTime = "total_elapsed_time"
const rowsReturned = "total_rows"
Expand Down Expand Up @@ -447,7 +447,7 @@
assert.NotNil(t, scrapers)

scraper := scrapers[0]
assert.NotNil(t, scraper.cache)
assert.NotNil(t, scraper.metricCache)

const totalElapsedTime = "total_elapsed_time"
const rowsReturned = "total_rows"
Expand Down Expand Up @@ -494,6 +494,48 @@
assert.NoError(t, errs)
}

func TestCacheIfNewPlan(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Username = "sa"
cfg.Password = "password"
cfg.Port = 1433
cfg.Server = "0.0.0.0"
cfg.MetricsBuilderConfig.ResourceAttributes.SqlserverInstanceName.Enabled = true
cfg.Events.DbServerTopQuery.Enabled = true
assert.NoError(t, cfg.Validate())

configureAllScraperMetricsAndEvents(cfg, false)
cfg.Events.DbServerTopQuery.Enabled = true
cfg.TopQueryCollection.CollectionInterval = cfg.ControllerConfig.CollectionInterval

scrapers := setupSQLServerLogsScrapers(receivertest.NewNopSettings(metadata.Type), cfg)
assert.NotNil(t, scrapers)

scraper := scrapers[0]
assert.NotNil(t, scraper.planCache)

scraper.planCache.Add("query-hash-1", "plan-hash-1")

//Test existing values.

Check failure on line 519 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 519 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

commentFormatting: put a space between `//` and comment text (gocritic)
isPlan1New := scraper.cacheIfNewPlan("query-hash-1", "plan-hash-1")
assert.False(t, isPlan1New, "Should be False because query-hash-1 already exists in the cache")
value1, _ := scraper.planCache.Get("query-hash-1")
assert.Equal(t, "plan-hash-1", value1, "The existing value in cache should be 'plan-hash-1'")

//Test adding new values.

Check failure on line 525 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 525 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

commentFormatting: put a space between `//` and comment text (gocritic)
isPlan2New := scraper.cacheIfNewPlan("query-hash-2", "plan-hash-2")
assert.True(t, isPlan2New, "Should be true because 'query-hash-2' does not already exist")
value2, _ := scraper.planCache.Get("query-hash-2")
assert.Equal(t, "plan-hash-2", value2, "The new 'plan-hash-2' should have added into the cache")

//Test updating existing values.

Check failure on line 531 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 531 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

commentFormatting: put a space between `//` and comment text (gocritic)
isPlan3New := scraper.cacheIfNewPlan("query-hash-1", "plan-hash-3")
assert.True(t, isPlan3New, "Should be true because plan hash 'plan-hash-3' is new for 'query-hash-1'")
value3, _ := scraper.planCache.Get("query-hash-1")
assert.Equal(t, "plan-hash-3", value3, "The value for key 'query-hash-1' should be now updated to 'plan-hash-3'")

Check failure on line 536 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

File is not properly formatted (gofumpt)

Check failure on line 536 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

File is not properly formatted (gofumpt)
}

Check failure on line 537 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-3)

unnecessary trailing newline (whitespace)

Check failure on line 537 in receiver/sqlserverreceiver/scraper_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-3)

unnecessary trailing newline (whitespace)

func TestRecordDatabaseSampleQuery(t *testing.T) {
tests := map[string]struct {
expectedFile string
Expand Down Expand Up @@ -543,7 +585,7 @@
assert.NotNil(t, scrapers)

scraper := scrapers[0]
assert.NotNil(t, scraper.cache)
assert.NotNil(t, scraper.metricCache)

scraper.client = tc.mockClient(scraper.instanceName, scraper.sqlQuery)

Expand Down
Loading