diff --git a/go.mod b/go.mod index 08f9e121c9..5b1f12f840 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/smartcontractkit/chainlink-common v0.7.1-0.20250521190241-65a9b738252b github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250522110034-65c54665034a github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250616180023-87b70c08d7c0 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250522110034-65c54665034a + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250618135814-7e3f79ab707e github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250522110034-65c54665034a github.com/smartcontractkit/chainlink-protos/svr v1.1.0 github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250528121202-292529af39df diff --git a/go.sum b/go.sum index 5f23597a89..204c4ec10b 100644 --- a/go.sum +++ b/go.sum @@ -642,8 +642,8 @@ github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-202505221100 github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250522110034-65c54665034a/go.mod h1:NVoJQoPYr6BorpaXTusoIH1IYTySCmanQ8Q1yv3mNh4= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250616180023-87b70c08d7c0 h1:OpFlG2f+LXsDp3cejSQju2rmoNsxBlhgrwXMIsk72IA= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250616180023-87b70c08d7c0/go.mod h1:X+a4k2a+2G2/yeAaRQMCTLmlhNdQYAeN6v+ZpLzRZww= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250522110034-65c54665034a h1:bFYBcW0cmhq0G8NSjPxSFfL/fVODuhEGluyWOxJTqqk= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250522110034-65c54665034a/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250618135814-7e3f79ab707e h1:LRT+PltY99+hxZAJn+4nyTfqGVNEM1S6FJ675B9BtJo= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250618135814-7e3f79ab707e/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250522110034-65c54665034a h1:O28vgyHM7QF1YLg1BwkQSIbOYA+t0RiH9+b+k90GPG8= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250522110034-65c54665034a/go.mod h1:zYqPBBRUXUQ/L+aD4Q7phnYsfVeC5rDBXtPt1VYwtws= github.com/smartcontractkit/chainlink-protos/svr v1.1.0 h1:79Z9N9dMbMVRGaLoDPAQ+vOwbM+Hnx8tIN2xCPG8H4o= diff --git a/pkg/logpoller/observability.go b/pkg/logpoller/observability.go index cbfb4590e6..0cdd040df0 100644 --- a/pkg/logpoller/observability.go +++ b/pkg/logpoller/observability.go @@ -20,12 +20,13 @@ import ( // It doesn't change internal logic, because all calls are delegated to the origin ORM type ObservedORM struct { ORM - metrics metrics.GenericLogPollerMetrics - queryDuration *prometheus.HistogramVec - datasetSize *prometheus.GaugeVec - logsInserted *prometheus.CounterVec - blocksInserted *prometheus.CounterVec - chainID string + metrics metrics.GenericLogPollerMetrics + queryDuration *prometheus.HistogramVec + datasetSize *prometheus.GaugeVec + logsInserted *prometheus.CounterVec + blocksInserted *prometheus.CounterVec + discoveryLatency *prometheus.HistogramVec + chainID string } // NewObservedORM creates an observed version of log poller's ORM created by NewORM @@ -36,226 +37,229 @@ func NewObservedORM(chainID *big.Int, ds sqlutil.DataSource, lggr logger.Logger) return nil, err } return &ObservedORM{ - ORM: NewORM(chainID, ds, lggr), - metrics: lpMetrics, - queryDuration: metrics.PromLpQueryDuration, - datasetSize: metrics.PromLpQueryDataSets, - logsInserted: metrics.PromLpLogsInserted, - blocksInserted: metrics.PromLpBlocksInserted, - chainID: chainID.String(), + ORM: NewORM(chainID, ds, lggr), + metrics: lpMetrics, + queryDuration: metrics.PromLpQueryDuration, + datasetSize: metrics.PromLpQueryDataSets, + logsInserted: metrics.PromLpLogsInserted, + blocksInserted: metrics.PromLpBlocksInserted, + discoveryLatency: metrics.PromLpQueryDuration, + chainID: chainID.String(), }, nil } func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error { - err := withObservedExec(o, "InsertLogs", metrics.Create, func() error { + err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error { return o.ORM.InsertLogs(ctx, logs) }) - trackInsertedLogsAndBlock(o, logs, nil, err) + trackInsertedLogsAndBlock(ctx, o, logs, nil, err) + trackInsertedBlockLatency(ctx, o, logs, err) return err } func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error { - err := withObservedExec(o, "InsertLogsWithBlock", metrics.Create, func() error { + err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error { return o.ORM.InsertLogsWithBlock(ctx, logs, block) }) - trackInsertedLogsAndBlock(o, logs, &block, err) + trackInsertedLogsAndBlock(ctx, o, logs, &block, err) + trackInsertedBlockLatency(ctx, o, logs, err) return err } func (o *ObservedORM) InsertFilter(ctx context.Context, filter Filter) error { - return withObservedExec(o, "InsertFilter", metrics.Create, func() error { + return withObservedExec(ctx, o, "InsertFilter", metrics.Create, func() error { return o.ORM.InsertFilter(ctx, filter) }) } func (o *ObservedORM) LoadFilters(ctx context.Context) (map[string]Filter, error) { - return withObservedQuery(o, "LoadFilters", func() (map[string]Filter, error) { + return withObservedQuery(ctx, o, "LoadFilters", func() (map[string]Filter, error) { return o.ORM.LoadFilters(ctx) }) } func (o *ObservedORM) DeleteFilter(ctx context.Context, name string) error { - return withObservedExec(o, "DeleteFilter", metrics.Del, func() error { + return withObservedExec(ctx, o, "DeleteFilter", metrics.Del, func() error { return o.ORM.DeleteFilter(ctx, name) }) } func (o *ObservedORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) { - return withObservedExecAndRowsAffected(o, "DeleteBlocksBefore", metrics.Del, func() (int64, error) { + return withObservedExecAndRowsAffected(ctx, o, "DeleteBlocksBefore", metrics.Del, func() (int64, error) { return o.ORM.DeleteBlocksBefore(ctx, end, limit) }) } func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { - return withObservedExec(o, "DeleteLogsAndBlocksAfter", metrics.Del, func() error { + return withObservedExec(ctx, o, "DeleteLogsAndBlocksAfter", metrics.Del, func() error { return o.ORM.DeleteLogsAndBlocksAfter(ctx, start) }) } func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { - return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", metrics.Del, func() (int64, error) { + return withObservedExecAndRowsAffected(ctx, o, "DeleteExpiredLogs", metrics.Del, func() (int64, error) { return o.ORM.DeleteExpiredLogs(ctx, limit) }) } func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { - return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIDs", func() ([]uint64, error) { + return withObservedQueryAndResults[uint64](ctx, o, "SelectUnmatchedLogIDs", func() ([]uint64, error) { return o.ORM.SelectUnmatchedLogIDs(ctx, limit) }) } func (o *ObservedORM) SelectExcessLogIDs(ctx context.Context, limit int64) ([]uint64, error) { - return withObservedQueryAndResults[uint64](o, "SelectExcessLogIDs", func() ([]uint64, error) { + return withObservedQueryAndResults[uint64](ctx, o, "SelectExcessLogIDs", func() ([]uint64, error) { return o.ORM.SelectExcessLogIDs(ctx, limit) }) } func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { - return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", metrics.Del, func() (int64, error) { + return withObservedExecAndRowsAffected(ctx, o, "DeleteLogsByRowID", metrics.Del, func() (int64, error) { return o.ORM.DeleteLogsByRowID(ctx, rowIDs) }) } func (o *ObservedORM) SelectBlockByNumber(ctx context.Context, n int64) (*Block, error) { - return withObservedQuery(o, "SelectBlockByNumber", func() (*Block, error) { + return withObservedQuery(ctx, o, "SelectBlockByNumber", func() (*Block, error) { return o.ORM.SelectBlockByNumber(ctx, n) }) } func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*Block, error) { - return withObservedQuery(o, "SelectLatestBlock", func() (*Block, error) { + return withObservedQuery(ctx, o, "SelectLatestBlock", func() (*Block, error) { return o.ORM.SelectLatestBlock(ctx) }) } func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error) { - return withObservedQuery(o, "SelectOldestBlock", func() (*Block, error) { + return withObservedQuery(ctx, o, "SelectOldestBlock", func() (*Block, error) { return o.ORM.SelectOldestBlock(ctx, minAllowedBlockNumber) }) } func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) { - return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { + return withObservedQuery(ctx, o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs) }) } func (o *ObservedORM) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLogsWithSigs", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLogsWithSigs", func() ([]Log, error) { return o.ORM.SelectLogsWithSigs(ctx, start, end, address, eventSigs) }) } func (o *ObservedORM) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLogsCreatedAfter", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLogsCreatedAfter", func() ([]Log, error) { return o.ORM.SelectLogsCreatedAfter(ctx, address, eventSig, after, confs) }) } func (o *ObservedORM) SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogs", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogs", func() ([]Log, error) { return o.ORM.SelectIndexedLogs(ctx, address, eventSig, topicIndex, topicValues, confs) }) } func (o *ObservedORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogsByBlockRange", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogsByBlockRange", func() ([]Log, error) { return o.ORM.SelectIndexedLogsByBlockRange(ctx, start, end, address, eventSig, topicIndex, topicValues) }) } func (o *ObservedORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogsCreatedAfter", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogsCreatedAfter", func() ([]Log, error) { return o.ORM.SelectIndexedLogsCreatedAfter(ctx, address, eventSig, topicIndex, topicValues, after, confs) }) } func (o *ObservedORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogsWithSigsExcluding", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogsWithSigsExcluding", func() ([]Log, error) { return o.ORM.SelectIndexedLogsWithSigsExcluding(ctx, sigA, sigB, topicIndex, address, startBlock, endBlock, confs) }) } func (o *ObservedORM) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLogs", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLogs", func() ([]Log, error) { return o.ORM.SelectLogs(ctx, start, end, address, eventSig) }) } func (o *ObservedORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, txHash common.Hash) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogsByTxHash", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogsByTxHash", func() ([]Log, error) { return o.ORM.SelectIndexedLogsByTxHash(ctx, address, eventSig, txHash) }) } func (o *ObservedORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]Block, error) { - return withObservedQueryAndResults(o, "GetBlocksRange", func() ([]Block, error) { + return withObservedQueryAndResults(ctx, o, "GetBlocksRange", func() ([]Block, error) { return o.ORM.GetBlocksRange(ctx, start, end) }) } func (o *ObservedORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLatestLogEventSigsAddrsWithConfs", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLatestLogEventSigsAddrsWithConfs", func() ([]Log, error) { return o.ORM.SelectLatestLogEventSigsAddrsWithConfs(ctx, fromBlock, addresses, eventSigs, confs) }) } func (o *ObservedORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error) { - return withObservedQuery(o, "SelectLatestBlockByEventSigsAddrsWithConfs", func() (int64, error) { + return withObservedQuery(ctx, o, "SelectLatestBlockByEventSigsAddrsWithConfs", func() (int64, error) { return o.ORM.SelectLatestBlockByEventSigsAddrsWithConfs(ctx, fromBlock, eventSigs, addresses, confs) }) } func (o *ObservedORM) SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLogsDataWordRange", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLogsDataWordRange", func() ([]Log, error) { return o.ORM.SelectLogsDataWordRange(ctx, address, eventSig, wordIndex, wordValueMin, wordValueMax, confs) }) } func (o *ObservedORM) SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLogsDataWordGreaterThan", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLogsDataWordGreaterThan", func() ([]Log, error) { return o.ORM.SelectLogsDataWordGreaterThan(ctx, address, eventSig, wordIndex, wordValueMin, confs) }) } func (o *ObservedORM) SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectLogsDataWordBetween", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectLogsDataWordBetween", func() ([]Log, error) { return o.ORM.SelectLogsDataWordBetween(ctx, address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs) }) } func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) { return o.ORM.SelectIndexedLogsTopicGreaterThan(ctx, address, eventSig, topicIndex, topicValueMin, confs) }) } func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error) { - return withObservedQueryAndResults(o, "SelectIndexedLogsTopicRange", func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, "SelectIndexedLogsTopicRange", func() ([]Log, error) { return o.ORM.SelectIndexedLogsTopicRange(ctx, address, eventSig, topicIndex, topicValueMin, topicValueMax, confs) }) } func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { - return withObservedQueryAndResults(o, queryName, func() ([]Log, error) { + return withObservedQueryAndResults(ctx, o, queryName, func() ([]Log, error) { return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName) }) } -func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) { - results, err := withObservedQuery(o, queryName, query) +func withObservedQueryAndResults[T any](ctx context.Context, o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) { + results, err := withObservedQuery(ctx, o, queryName, query) if err == nil { - ctx, cancel := context.WithTimeout(context.Background(), client.QueryTimeout) + ctx2, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() - o.metrics.RecordQueryDatasetSize(ctx, queryName, metrics.Read, int64(len(results))) + o.metrics.RecordQueryDatasetSize(ctx2, queryName, metrics.Read, int64(len(results))) } return results, err } -func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType metrics.QueryType, exec func() (int64, error)) (int64, error) { +func withObservedExecAndRowsAffected(ctx context.Context, o *ObservedORM, queryName string, queryType metrics.QueryType, exec func() (int64, error)) (int64, error) { queryStarted := time.Now() rowsAffected, err := exec() - ctx, cancel := context.WithTimeout(context.Background(), client.QueryTimeout) + ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() duration := float64(time.Since(queryStarted)) o.metrics.RecordQueryDuration(ctx, queryName, queryType, duration) @@ -266,34 +270,48 @@ func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType return rowsAffected, err } -func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) { +func withObservedQuery[T any](ctx context.Context, o *ObservedORM, queryName string, query func() (T, error)) (T, error) { queryStarted := time.Now() defer func() { - ctx, cancel := context.WithTimeout(context.Background(), client.QueryTimeout) + ctx2, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() - o.metrics.RecordQueryDuration(ctx, queryName, metrics.Read, float64(time.Since(queryStarted))) + o.metrics.RecordQueryDuration(ctx2, queryName, metrics.Read, float64(time.Since(queryStarted))) }() return query() } -func withObservedExec(o *ObservedORM, query string, queryType metrics.QueryType, exec func() error) error { +func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryType metrics.QueryType, exec func() error) error { queryStarted := time.Now() defer func() { - ctx, cancel := context.WithTimeout(context.Background(), client.QueryTimeout) + ctx2, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() - o.metrics.RecordQueryDuration(ctx, query, queryType, float64(time.Since(queryStarted))) + o.metrics.RecordQueryDuration(ctx2, query, queryType, float64(time.Since(queryStarted))) }() return exec() } -func trackInsertedLogsAndBlock(o *ObservedORM, logs []Log, block *Block, err error) { +func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) { if err != nil { return } - ctx, cancel := context.WithTimeout(context.Background(), client.QueryTimeout) + ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() o.metrics.IncrementLogsInserted(ctx, int64(len(logs))) if block != nil { o.metrics.IncrementBlocksInserted(ctx, 1) } } + +func trackInsertedBlockLatency(ctx context.Context, o *ObservedORM, logs []Log, err error) { + if err != nil { + return + } + + if len(logs) == 0 { + return + } + ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout) + defer cancel() + + o.metrics.RecordLogDiscoveryLatency(ctx, float64(time.Since(logs[0].BlockTimestamp))) +} diff --git a/pkg/logpoller/observability_test.go b/pkg/logpoller/observability_test.go index 1bd042c0fc..5cdedeae47 100644 --- a/pkg/logpoller/observability_test.go +++ b/pkg/logpoller/observability_test.go @@ -71,7 +71,7 @@ func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) { expectedSize := 2 for i := 0; i < expectedCount; i++ { - _, err := withObservedQueryAndResults(orm, "query", func() ([]string, error) { return []string{"value1", "value2"}, nil }) + _, err := withObservedQueryAndResults(t.Context(), orm, "query", func() ([]string, error) { return []string{"value1", "value2"}, nil }) require.NoError(t, err) } @@ -88,7 +88,7 @@ func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) { func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) { orm := createObservedORM(t, 420) - _, err := withObservedQueryAndResults(orm, "errorQuery", func() ([]string, error) { return nil, errors.New("error") }) + _, err := withObservedQueryAndResults(t.Context(), orm, "errorQuery", func() ([]string, error) { return nil, errors.New("error") }) require.Error(t, err) require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, network, "420", "errorQuery", "read")) @@ -97,8 +97,8 @@ func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) { func TestMetricsAreProperlyPopulatedForWrites(t *testing.T) { orm := createObservedORM(t, 420) - require.NoError(t, withObservedExec(orm, "execQuery", metrics.Create, func() error { return nil })) - require.Error(t, withObservedExec(orm, "execQuery", metrics.Create, func() error { return errors.New("error") })) + require.NoError(t, withObservedExec(t.Context(), orm, "execQuery", metrics.Create, func() error { return nil })) + require.Error(t, withObservedExec(t.Context(), orm, "execQuery", metrics.Create, func() error { return errors.New("error") })) require.Equal(t, 2, counterFromHistogramByLabels(t, orm.queryDuration, network, "420", "execQuery", "create")) } @@ -108,10 +108,11 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { orm := createObservedORM(t, 420) logs := generateRandomLogs(420, 20) + assert.Equal(t, 0, testutil.CollectAndCount(orm.discoveryLatency)) // First insert 10 logs require.NoError(t, orm.InsertLogs(ctx, logs[:10])) assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) - + assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency)) // Insert 5 more logs with block require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{ BlockHash: utils.RandomBytes32(), @@ -177,13 +178,14 @@ func NewTestObservedORM(chainID *big.Int, ds sqlutil.DataSource, lggr logger.Log return nil, err } return &ObservedORM{ - ORM: NewORM(chainID, ds, lggr), - metrics: lpMetrics, - queryDuration: metrics.PromLpQueryDuration, - datasetSize: metrics.PromLpQueryDataSets, - logsInserted: metrics.PromLpLogsInserted, - blocksInserted: metrics.PromLpBlocksInserted, - chainID: chainID.String(), + ORM: NewORM(chainID, ds, lggr), + metrics: lpMetrics, + queryDuration: metrics.PromLpQueryDuration, + datasetSize: metrics.PromLpQueryDataSets, + logsInserted: metrics.PromLpLogsInserted, + blocksInserted: metrics.PromLpBlocksInserted, + discoveryLatency: metrics.PromLpDiscoveryLatency, + chainID: chainID.String(), }, nil } @@ -200,6 +202,7 @@ func resetMetrics(lp ObservedORM) { lp.datasetSize.Reset() lp.logsInserted.Reset() lp.blocksInserted.Reset() + lp.discoveryLatency.Reset() } func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int {