diff --git a/receiver/oracledbreceiver/config.go b/receiver/oracledbreceiver/config.go
index cb59804bf5ec5..389d638766361 100644
--- a/receiver/oracledbreceiver/config.go
+++ b/receiver/oracledbreceiver/config.go
@@ -9,6 +9,7 @@ import (
 	"net"
 	"net/url"
 	"strconv"
+	"time"
 
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
 	"go.uber.org/multierr"
@@ -29,8 +30,11 @@ var (
 )
 
 type TopQueryCollection struct {
-	MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"`
-	TopQueryCount       uint `mapstructure:"top_query_count"`
+	MaxQuerySampleCount uint          `mapstructure:"max_query_sample_count"`
+	TopQueryCount       uint          `mapstructure:"top_query_count"`
+	// CollectionInterval controls how often top-query events are collected; scrapes that
+	// arrive before it has elapsed skip the top-query collection. Defaults to one minute.
+	CollectionInterval  time.Duration `mapstructure:"collection_interval"`
 }
 
 type QuerySample struct {
diff --git a/receiver/oracledbreceiver/config_test.go b/receiver/oracledbreceiver/config_test.go
index 6573094d67603..215c02f9845d2 100644
--- a/receiver/oracledbreceiver/config_test.go
+++ b/receiver/oracledbreceiver/config_test.go
@@ -117,7 +117,8 @@ func TestValidateInvalidConfigs(t *testing.T) {
 
 func TestCreateDefaultConfig(t *testing.T) {
 	cfg := createDefaultConfig().(*Config)
-	assert.Equal(t, 10*time.Second, cfg.CollectionInterval)
+	assert.Equal(t, 10*time.Second, cfg.ControllerConfig.CollectionInterval)
+	assert.Equal(t, time.Minute, cfg.TopQueryCollection.CollectionInterval)
 }
 
 func TestParseConfig(t *testing.T) {
diff --git a/receiver/oracledbreceiver/factory.go b/receiver/oracledbreceiver/factory.go
index 6302b10345248..4437e2273e0da 100644
--- a/receiver/oracledbreceiver/factory.go
+++ b/receiver/oracledbreceiver/factory.go
@@ -50,6 +50,7 @@ func createDefaultConfig() component.Config {
 		TopQueryCollection: TopQueryCollection{
 			MaxQuerySampleCount: 1000,
 			TopQueryCount:       200,
+			CollectionInterval:  time.Minute,
 		},
 	}
 }
diff --git a/receiver/oracledbreceiver/scraper.go b/receiver/oracledbreceiver/scraper.go
index 926e723e1ea8f..071a04a5e6489 100644
--- a/receiver/oracledbreceiver/scraper.go
+++ b/receiver/oracledbreceiver/scraper.go
@@ -11,6 +11,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"net"
 	"os"
 	"sort"
@@ -133,6 +134,7 @@ type oracleScraper struct {
 	obfuscator               *obfuscator
 	querySampleCfg           QuerySample
 	serviceInstanceID        string
+	lastExecutionTimestamp   time.Time
 }
 
 func newScraper(metricsBuilder *metadata.MetricsBuilder, metricsBuilderConfig metadata.MetricsBuilderConfig, scrapeCfg scraperhelper.ControllerConfig, logger *zap.Logger, providerFunc dbProviderFunc, clientProviderFunc clientProviderFunc, instanceName, hostName string) (scraper.Metrics, error) {
@@ -529,9 +531,18 @@ func (s *oracleScraper) scrapeLogs(ctx context.Context) (plog.Logs, error) {
 	var scrapeErrors []error
 
 	if s.logsBuilderConfig.Events.DbServerTopQuery.Enabled {
-		topNCollectionErrors := s.collectTopNMetricData(ctx, logs)
-		if topNCollectionErrors != nil {
-			scrapeErrors = append(scrapeErrors, topNCollectionErrors)
+		currentCollectionTime := time.Now()
+		// After the first collection the lookback already includes the v$sql lag buffer
+		// added by calculateLookbackSeconds, so this gate opens slightly before a full interval.
+		lookbackTimeCounter := s.calculateLookbackSeconds()
+		if lookbackTimeCounter < int(s.topQueryCollectCfg.CollectionInterval.Seconds()) {
+			s.logger.Debug("Skipping the collection of top queries because collection interval has not yet elapsed.")
+		} else {
+			topNCollectionErrors := s.collectTopNMetricData(ctx, logs, currentCollectionTime, lookbackTimeCounter)
+			if topNCollectionErrors != nil {
+				scrapeErrors = append(scrapeErrors, topNCollectionErrors)
+			}
+			s.lastExecutionTimestamp = currentCollectionTime
 		}
 	}
 
@@ -545,13 +556,14 @@ func (s *oracleScraper) scrapeLogs(ctx context.Context) (plog.Logs, error) {
 	return logs, errors.Join(scrapeErrors...)
 }
 
-func (s *oracleScraper) collectTopNMetricData(ctx context.Context, logs plog.Logs) error {
+// collectTopNMetricData queries top-query metrics covering the last lookbackTimeSeconds seconds
+// and records db.server.top_query events stamped with collectionTime.
+func (s *oracleScraper) collectTopNMetricData(ctx context.Context, logs plog.Logs, collectionTime time.Time, lookbackTimeSeconds int) error {
 	var errs []error
 	// get metrics and query texts from DB
-	timestamp := pcommon.NewTimestampFromTime(time.Now())
-	intervalSeconds := int(s.scrapeCfg.CollectionInterval.Seconds())
+	timestamp := pcommon.NewTimestampFromTime(collectionTime)
 	s.oracleQueryMetricsClient = s.clientProviderFunc(s.db, oracleQueryMetricsSQL, s.logger)
-	metricRows, metricError := s.oracleQueryMetricsClient.metricRows(ctx, intervalSeconds, s.topQueryCollectCfg.MaxQuerySampleCount)
+	metricRows, metricError := s.oracleQueryMetricsClient.metricRows(ctx, lookbackTimeSeconds, s.topQueryCollectCfg.MaxQuerySampleCount)
 
 	if metricError != nil {
 		return fmt.Errorf("error executing oracleQueryMetricsSQL: %w", metricError)
@@ -872,3 +884,21 @@ func constructInstanceID(host, port, service string) string {
 	}
 	return fmt.Sprintf("%s:%s", host, port)
 }
+
+// calculateLookbackSeconds returns the number of seconds of query history the next
+// top-query collection should look back over. Before the first collection it is the
+// configured top-query collection interval; afterwards it is the time elapsed since
+// the last collection plus a fixed lag buffer, rounded up to a whole second.
+func (s *oracleScraper) calculateLookbackSeconds() int {
+	if s.lastExecutionTimestamp.IsZero() {
+		return int(s.topQueryCollectCfg.CollectionInterval.Seconds())
+	}
+
+	// vsqlRefreshLag covers the maximum v$sql refresh latency (5 seconds) plus another
+	// 5 seconds to offset any collection delays.
+	// See https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/V-SQL.html
+	const vsqlRefreshLag = 10 * time.Second
+
+	lookback := time.Since(s.lastExecutionTimestamp) + vsqlRefreshLag
+	return int(math.Ceil(lookback.Seconds()))
+}
diff --git a/receiver/oracledbreceiver/scraper_test.go b/receiver/oracledbreceiver/scraper_test.go
index 4300a1fc512f0..11b1f3dc99fb0 100644
--- a/receiver/oracledbreceiver/scraper_test.go
+++ b/receiver/oracledbreceiver/scraper_test.go
@@ -12,6 +12,7 @@ import (
 	"path/filepath"
 	"strings"
 	"testing"
+	"time"
 
 	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/stretchr/testify/assert"
@@ -291,6 +292,7 @@ func TestScraper_ScrapeTopNLogs(t *testing.T) {
 			}()
 			require.NoError(t, err)
 			expectedQueryPlanFile := filepath.Join("testdata", "expectedQueryTextAndPlanQuery.yaml")
+			collectionTriggerTime := time.Now()
 
 			logs, err := scrpr.scrapeLogs(t.Context())
 
@@ -304,6 +306,8 @@ func TestScraper_ScrapeTopNLogs(t *testing.T) {
 				assert.NoError(t, errs)
 				assert.Equal(t, "db.server.top_query", logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).EventName())
 				assert.NoError(t, errs)
+
+				assert.False(t, scrpr.lastExecutionTimestamp.Before(collectionTriggerTime), "lastExecutionTimestamp wasn't set after a successful collection")
 			}
 		})
 	}
@@ -426,6 +430,111 @@ func TestGetInstanceId(t *testing.T) {
 	assert.Equal(t, "unknown:1521", localInstanceID)
 }
 
+// TestScrapesTopNLogsOnlyWhenIntervalHasElapsed verifies that top queries are collected on the
+// first scrape and skipped on a later scrape that arrives before the collection interval elapsed.
+func TestScrapesTopNLogsOnlyWhenIntervalHasElapsed(t *testing.T) {
+	var metricRowData []metricRow
+	var logRowData []metricRow
+	tests := []struct {
+		name       string
+		dbclientFn func(db *sql.DB, s string, logger *zap.Logger) dbClient
+	}{
+		{
+			name: "valid collection",
+			dbclientFn: func(_ *sql.DB, s string, _ *zap.Logger) dbClient {
+				if strings.Contains(s, "V$SQL_PLAN") {
+					metricRowFile := readFile("oracleQueryPlanData.txt")
+					unmarshalErr := json.Unmarshal(metricRowFile, &logRowData)
+					if unmarshalErr == nil {
+						return &fakeDbClient{
+							Responses: [][]metricRow{
+								logRowData,
+							},
+						}
+					}
+				} else {
+					metricRowFile := readFile("oracleQueryMetricsData.txt")
+					unmarshalErr := json.Unmarshal(metricRowFile, &metricRowData)
+					if unmarshalErr == nil {
+						return &fakeDbClient{
+							Responses: [][]metricRow{
+								metricRowData,
+							},
+						}
+					}
+				}
+				return nil
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			logsCfg := metadata.DefaultLogsBuilderConfig()
+			logsCfg.Events.DbServerTopQuery.Enabled = true
+			metricsCfg := metadata.DefaultMetricsBuilderConfig()
+			lruCache, _ := lru.New[string, map[string]int64](500)
+			lruCache.Add("fxk8aq3nds8aw:0", cacheValue)
+
+			scrpr := oracleScraper{
+				logger: zap.NewNop(),
+				mb:     metadata.NewMetricsBuilder(metricsCfg, receivertest.NewNopSettings(metadata.Type)),
+				lb:     metadata.NewLogsBuilder(logsCfg, receivertest.NewNopSettings(metadata.Type)),
+				dbProviderFunc: func() (*sql.DB, error) {
+					return nil, nil
+				},
+				clientProviderFunc:   test.dbclientFn,
+				metricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
+				logsBuilderConfig:    metadata.DefaultLogsBuilderConfig(),
+				metricCache:          lruCache,
+				topQueryCollectCfg:   TopQueryCollection{MaxQuerySampleCount: 5000, TopQueryCount: 200},
+				obfuscator:           newObfuscator(),
+			}
+
+			scrpr.logsBuilderConfig.Events.DbServerTopQuery.Enabled = true
+			scrpr.topQueryCollectCfg.CollectionInterval = 1 * time.Minute
+
+			err := scrpr.start(t.Context(), componenttest.NewNopHost())
+			defer func() {
+				assert.NoError(t, scrpr.shutdown(t.Context()))
+			}()
+			require.NoError(t, err)
+
+			assert.True(t, scrpr.lastExecutionTimestamp.IsZero(), "No value should be set for lastExecutionTimestamp before a successful collection")
+			logsCol1, err := scrpr.scrapeLogs(t.Context())
+			require.NoError(t, err)
+			require.Equal(t, 1, logsCol1.ResourceLogs().Len(), "Collection should run when lastExecutionTimestamp is not available")
+			assert.Equal(t, 1, logsCol1.ResourceLogs().At(0).ScopeLogs().Len(), "Collection should run when lastExecutionTimestamp is not available")
+			assert.False(t, scrpr.lastExecutionTimestamp.IsZero(), "A value should be set for lastExecutionTimestamp after a successful collection")
+
+			scrpr.lastExecutionTimestamp = scrpr.lastExecutionTimestamp.Add(-10 * time.Second)
+			logsCol2, err := scrpr.scrapeLogs(t.Context())
+			require.NoError(t, err)
+			assert.Equal(t, 0, logsCol2.ResourceLogs().Len(), "top_query should not be collected until %s elapsed.", scrpr.topQueryCollectCfg.CollectionInterval.String())
+		})
+	}
+}
+
+// TestCalculateLookbackSeconds checks the lookback window both before any collection has run
+// and after a previous collection has been recorded.
+func TestCalculateLookbackSeconds(t *testing.T) {
+	collectionInterval := 20 * time.Second
+	vsqlRefreshLag := 10 * time.Second
+	expectedMinimumLookbackTime := int((collectionInterval + vsqlRefreshLag).Seconds())
+
+	firstRun := oracleScraper{
+		topQueryCollectCfg: TopQueryCollection{CollectionInterval: collectionInterval},
+	}
+	assert.Equal(t, int(collectionInterval.Seconds()), firstRun.calculateLookbackSeconds(), "first collection should look back exactly one collection interval")
+
+	scrpr := oracleScraper{
+		lastExecutionTimestamp: time.Now().Add(-collectionInterval),
+	}
+	lookbackTime := scrpr.calculateLookbackSeconds()
+
+	assert.LessOrEqual(t, expectedMinimumLookbackTime, lookbackTime, "`lookbackTime` should be at least %d", expectedMinimumLookbackTime)
+}
+
 func readFile(fname string) []byte {
 	file, err := os.ReadFile(filepath.Join("testdata", fname))
 	if err != nil {