Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions receiver/oracledbreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/url"
"strconv"
"time"

"go.opentelemetry.io/collector/scraper/scraperhelper"
"go.uber.org/multierr"
Expand All @@ -29,8 +30,9 @@ var (
)

type TopQueryCollection struct {
MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"`
TopQueryCount uint `mapstructure:"top_query_count"`
MaxQuerySampleCount uint `mapstructure:"max_query_sample_count"`
TopQueryCount uint `mapstructure:"top_query_count"`
CollectionInterval time.Duration `mapstructure:"collection_interval"`
}

type QuerySample struct {
Expand Down
2 changes: 1 addition & 1 deletion receiver/oracledbreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestValidateInvalidConfigs(t *testing.T) {

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.Equal(t, 10*time.Second, cfg.CollectionInterval)
assert.Equal(t, 10*time.Second, cfg.ControllerConfig.CollectionInterval)
}

func TestParseConfig(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions receiver/oracledbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func createDefaultConfig() component.Config {
TopQueryCollection: TopQueryCollection{
MaxQuerySampleCount: 1000,
TopQueryCount: 200,
CollectionInterval: time.Minute,
},
}
}
Expand Down
37 changes: 29 additions & 8 deletions receiver/oracledbreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"net"
"os"
"sort"
Expand Down Expand Up @@ -133,6 +134,7 @@ type oracleScraper struct {
obfuscator *obfuscator
querySampleCfg QuerySample
serviceInstanceID string
lastExecutionTimestamp time.Time
}

func newScraper(metricsBuilder *metadata.MetricsBuilder, metricsBuilderConfig metadata.MetricsBuilderConfig, scrapeCfg scraperhelper.ControllerConfig, logger *zap.Logger, providerFunc dbProviderFunc, clientProviderFunc clientProviderFunc, instanceName, hostName string) (scraper.Metrics, error) {
Expand Down Expand Up @@ -529,9 +531,16 @@ func (s *oracleScraper) scrapeLogs(ctx context.Context) (plog.Logs, error) {
var scrapeErrors []error

if s.logsBuilderConfig.Events.DbServerTopQuery.Enabled {
topNCollectionErrors := s.collectTopNMetricData(ctx, logs)
if topNCollectionErrors != nil {
scrapeErrors = append(scrapeErrors, topNCollectionErrors)
currentCollectionTime := time.Now()
lookbackTimeCounter := s.calculateLookbackSeconds()
if lookbackTimeCounter < int(s.topQueryCollectCfg.CollectionInterval.Seconds()) {
s.logger.Debug("Skipping the collection of top queries because collection interval has not yet elapsed.")
} else {
topNCollectionErrors := s.collectTopNMetricData(ctx, logs, currentCollectionTime, lookbackTimeCounter)
if topNCollectionErrors != nil {
scrapeErrors = append(scrapeErrors, topNCollectionErrors)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move s.lastExecutionTimestamp = collectionTime here? so the early returned collectTopNMetricData can still update the last execution time.

Currently, if the hits is zero, then the last execution time won't get updated.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats a good point. I had multiple thoughts about this too.

I did it this way to make the first collection happen as early as possible, otherwise it would happen only at the end of first 60 seconds. But it's nothing huge. Let me check if there could be some work around.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @XSAM updated as you suggested!

s.lastExecutionTimestamp = currentCollectionTime
}
}

Expand All @@ -545,13 +554,11 @@ func (s *oracleScraper) scrapeLogs(ctx context.Context) (plog.Logs, error) {
return logs, errors.Join(scrapeErrors...)
}

func (s *oracleScraper) collectTopNMetricData(ctx context.Context, logs plog.Logs) error {
func (s *oracleScraper) collectTopNMetricData(ctx context.Context, logs plog.Logs, collectionTime time.Time, lookbackTimeSeconds int) error {
var errs []error
// get metrics and query texts from DB
timestamp := pcommon.NewTimestampFromTime(time.Now())
intervalSeconds := int(s.scrapeCfg.CollectionInterval.Seconds())
s.oracleQueryMetricsClient = s.clientProviderFunc(s.db, oracleQueryMetricsSQL, s.logger)
metricRows, metricError := s.oracleQueryMetricsClient.metricRows(ctx, intervalSeconds, s.topQueryCollectCfg.MaxQuerySampleCount)
metricRows, metricError := s.oracleQueryMetricsClient.metricRows(ctx, lookbackTimeSeconds, s.topQueryCollectCfg.MaxQuerySampleCount)

if metricError != nil {
return fmt.Errorf("error executing oracleQueryMetricsSQL: %w", metricError)
Expand Down Expand Up @@ -646,7 +653,7 @@ func (s *oracleScraper) collectTopNMetricData(ctx context.Context, logs plog.Log
planString := string(planBytes)

s.lb.RecordDbServerTopQueryEvent(context.Background(),
timestamp,
pcommon.NewTimestampFromTime(collectionTime),
dbSystemNameVal,
s.hostName,
hit.queryText,
Expand Down Expand Up @@ -872,3 +879,17 @@ func constructInstanceID(host, port, service string) string {
}
return fmt.Sprintf("%s:%s", host, port)
}

func (s *oracleScraper) calculateLookbackSeconds() int {
if s.lastExecutionTimestamp.IsZero() {
return int(s.topQueryCollectCfg.CollectionInterval.Seconds())
}

// vsqlRefreshLagSec is the buffer to account for v$sql maximum refresh latency (5 seconds) + 5 seconds to offset any collection delays.
// PS: https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/V-SQL.html
const vsqlRefreshLagSec = 10 * time.Second

return int(math.Ceil(time.Now().
Add(vsqlRefreshLagSec).
Sub(s.lastExecutionTimestamp).Seconds()))
}
100 changes: 100 additions & 0 deletions receiver/oracledbreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -291,6 +292,7 @@ func TestScraper_ScrapeTopNLogs(t *testing.T) {
}()
require.NoError(t, err)
expectedQueryPlanFile := filepath.Join("testdata", "expectedQueryTextAndPlanQuery.yaml")
collectionTriggerTime := time.Now()

logs, err := scrpr.scrapeLogs(t.Context())

Expand All @@ -304,6 +306,8 @@ func TestScraper_ScrapeTopNLogs(t *testing.T) {
assert.NoError(t, errs)
assert.Equal(t, "db.server.top_query", logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).EventName())
assert.NoError(t, errs)

assert.True(t, scrpr.lastExecutionTimestamp.After(collectionTriggerTime), "lastExecutionTimestamp hasn't set after a successful collection")
}
})
}
Expand Down Expand Up @@ -426,6 +430,102 @@ func TestGetInstanceId(t *testing.T) {
assert.Equal(t, "unknown:1521", localInstanceID)
}

func TestScrapesTopNLogsOnlyWhenIntervalHasElapsed(t *testing.T) {
var metricRowData []metricRow
var logRowData []metricRow
tests := []struct {
name string
dbclientFn func(db *sql.DB, s string, logger *zap.Logger) dbClient
errWanted string
}{
{
name: "valid collection",
dbclientFn: func(_ *sql.DB, s string, _ *zap.Logger) dbClient {
if strings.Contains(s, "V$SQL_PLAN") {
metricRowFile := readFile("oracleQueryPlanData.txt")
unmarshalErr := json.Unmarshal(metricRowFile, &logRowData)
if unmarshalErr == nil {
return &fakeDbClient{
Responses: [][]metricRow{
logRowData,
},
}
}
} else {
metricRowFile := readFile("oracleQueryMetricsData.txt")
unmarshalErr := json.Unmarshal(metricRowFile, &metricRowData)
if unmarshalErr == nil {
return &fakeDbClient{
Responses: [][]metricRow{
metricRowData,
},
}
}
}
return nil
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logsCfg := metadata.DefaultLogsBuilderConfig()
logsCfg.Events.DbServerTopQuery.Enabled = true
metricsCfg := metadata.DefaultMetricsBuilderConfig()
lruCache, _ := lru.New[string, map[string]int64](500)
lruCache.Add("fxk8aq3nds8aw:0", cacheValue)

scrpr := oracleScraper{
logger: zap.NewNop(),
mb: metadata.NewMetricsBuilder(metricsCfg, receivertest.NewNopSettings(metadata.Type)),
lb: metadata.NewLogsBuilder(logsCfg, receivertest.NewNopSettings(metadata.Type)),
dbProviderFunc: func() (*sql.DB, error) {
return nil, nil
},
clientProviderFunc: test.dbclientFn,
metricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
logsBuilderConfig: metadata.DefaultLogsBuilderConfig(),
metricCache: lruCache,
topQueryCollectCfg: TopQueryCollection{MaxQuerySampleCount: 5000, TopQueryCount: 200},
obfuscator: newObfuscator(),
}

scrpr.logsBuilderConfig.Events.DbServerTopQuery.Enabled = true
scrpr.topQueryCollectCfg.CollectionInterval = 1 * time.Minute

err := scrpr.start(t.Context(), componenttest.NewNopHost())
defer func() {
assert.NoError(t, scrpr.shutdown(t.Context()))
}()
require.NoError(t, err)

assert.True(t, scrpr.lastExecutionTimestamp.IsZero(), "No value should be set for lastExecutionTimestamp before a successful collection")
logsCol1, _ := scrpr.scrapeLogs(t.Context())
assert.Equal(t, 1, logsCol1.ResourceLogs().At(0).ScopeLogs().Len(), "Collection should run when lastExecutionTimestamp is not available")
assert.False(t, scrpr.lastExecutionTimestamp.IsZero(), "A value should be set for lastExecutionTimestamp after a successful collection")

scrpr.lastExecutionTimestamp = scrpr.lastExecutionTimestamp.Add(-10 * time.Second)
logsCol2, err := scrpr.scrapeLogs(t.Context())
assert.Equal(t, 0, logsCol2.ResourceLogs().Len(), "top_query should not be collected until %s elapsed.", scrpr.topQueryCollectCfg.CollectionInterval.String())
require.NoError(t, err)
})
}
}

func TestCalculateLookbackSeconds(t *testing.T) {
collectionInterval := 20 * time.Second
vsqlRefreshLagSec := 10 * time.Second
expectedMinimumLookbackTime := int((collectionInterval + vsqlRefreshLagSec).Seconds())
currentCollectionTime := time.Now()

scrpr := oracleScraper{
lastExecutionTimestamp: currentCollectionTime.Add(-collectionInterval),
}
lookbackTime := scrpr.calculateLookbackSeconds()

assert.LessOrEqual(t, expectedMinimumLookbackTime, lookbackTime, "`lookbackTime` should be minimum %d", expectedMinimumLookbackTime)
}

func readFile(fname string) []byte {
file, err := os.ReadFile(filepath.Join("testdata", fname))
if err != nil {
Expand Down
Loading