Skip to content

Conversation

@danidomi
Copy link
Contributor

@danidomi danidomi commented Dec 29, 2025

Summary by CodeRabbit

  • New Features

    • Chainlink Data Streams support added for oracle feeds; example Chainlink configs included.
  • Breaking Changes

    • HTTP API, Swagger UI, health and probe endpoints removed; "api" and "probe" CLI commands removed.
    • Dynamic pipeline/task execution system and related pipeline-based feed runner removed; feed config format and feed handling changed.
  • Chores

    • Dependency updates and refactor to a multi-provider oracle architecture (Stork + Chainlink).

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 29, 2025

📝 Walkthrough

Walkthrough

Removes the legacy pipeline and HTTP/Swagger API surfaces; refactors the oracle to provider-agnostic public types; adds Chainlink Data Streams fetcher and Chainlink/Stork integrations; updates CLI options and examples; and upgrades dependencies.

Changes

Cohort / File(s) Summary
Pipeline framework removal
pipeline/*, pipeline/null/*, pipeline/task.*.go, pipeline/LICENSE.txt
Entire pipeline subsystem deleted: core types, task implementations, runner, scheduler, graph parsing, params, vars, models, null wrappers, and pipeline license removed.
API, health & swagger removal
cmd/injective-price-oracle/api.go, cmd/injective-price-oracle/probe.go, internal/service/health/health.go, internal/service/oracle/api.go, internal/service/swagger/swagger.go, swagger/*, swagger/index.html, swagger/oauth2-redirect.html, api/design/*
HTTP API server, Probe endpoint, health service, Goa API designs, Swagger UI/spec and related constructors removed; CLI api and probe commands deleted.
Oracle core refactor
internal/service/oracle/service.go, internal/service/oracle/models.go, internal/service/oracle/feed_dynamic.go
Legacy dynamic pipeline-driven feeds and PriceData removed; service rewritten to use new public types package and provider-specific fetchers (stork/chainlink); NewService signature and internal flows updated to public types.
New shared oracle types
internal/service/oracle/types/types.go
Adds public abstractions: Fetcher, PricePuller, PriceData, FeedConfig, Ticker type, FeedProvider constants and helpers to unify provider integrations.
Stork package reorg
internal/service/oracle/stork/*
Stork code reorganized: package rename, new StorkPriceData model, parse/constructor signatures updated to use types, NewFetcher rename and Fetcher interface introduced.
Chainlink Data Streams integration
internal/service/oracle/chainlink/*
New Chainlink data-streams SDK integration: client wiring, Chainlink fetcher with subscription/start, report decoding, cached latest reports, Chainlink feed parser, Chainlink price feed implementation, models and types.
Oracle command wiring & options
cmd/injective-price-oracle/oracle.go, cmd/injective-price-oracle/options.go, cmd/injective-price-oracle/main.go
Replaces grpc-web API flags with Chainlink Data Streams CLI flags (WsURL, APIKey, APISecret); dynamic feed loading supports Stork & Chainlink; probe/api CLI commands removed; Chainlink fetcher wired and started when configured.
WebSocket utils rename & examples
internal/service/oracle/utils/ws.go, examples/*.toml
Package rename for WS helpers (pipeline → utils); adds Chainlink example TOML configs.
Dependency updates & build
go.mod, Dockerfile
Upgrades Injective SDK and cosmos-sdk, adds smartcontractkit/data-streams-sdk/go, updates indirect deps; Dockerfile stops copying swagger artifacts into final image.
Miscellaneous deletions
api/design/*, swagger/* (duplicated above)
Goa DSL API design and assets removed (probe, health, swagger definitions).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant CLI as CLI / Service Init
  participant Streams as Data-Streams SDK
  participant CLFetcher as chainlink.Fetcher
  participant Oracle as oracle.Service
  Note over CLI,Streams: Startup with Chainlink options
  CLI->>Streams: streams.New(config{WsURL, APIKey, APISecret})
  Streams-->>CLI: streams.Client
  CLI->>CLFetcher: NewFetcher(client, feedIDs)
  CLFetcher-->>CLI: Chainlink fetcher
  CLI->>CLFetcher: Start(ctx)
  alt subscription success
    CLFetcher->>Streams: Subscribe(streamID)
    Streams-->>CLFetcher: Stream / Report(payload)
    loop streaming reports
      Streams-->>CLFetcher: Report(payload)
      CLFetcher->>CLFetcher: decode → update latest cache
      Oracle->>CLFetcher: (on PullPrice) request latest report
      CLFetcher-->>Oracle: cached report
    end
  else subscription error
    CLFetcher-->>CLI: error (retry/backoff)
  end
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

🐇 I hopped through code both old and new,

Pipelines cleared so streams could cue.
Chainlink hums and Stork keeps time,
New feeds sing in tidy rhyme.
A carrot patch of logs — hooray! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 38.46% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly indicates a major architectural shift: replacing the API server and pipeline infrastructure with direct Chainlink integration, which is the primary focus of this large changeset.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
internal/service/oracle/stork/feed_stork.go (2)

52-55: Bug: Wrapping nil error and inconsistent message.

Two issues here:

  1. After successful ParseDuration, err is nil, so errors.Wrapf(err, ...) returns nil. The function returns (nil, nil), silently accepting invalid configuration.
  2. The error message says "minimum interval = 30s" but the check is for < time.Second (1s).
🔎 Proposed fix
 		if interval < time.Second {
-			err = errors.Wrapf(err, "failed to parse pull interval: %s (minimum interval = 30s)", cfg.PullInterval)
-			return nil, err
+			return nil, fmt.Errorf("invalid pull interval: %s (minimum interval = 1s)", cfg.PullInterval)
 		}

109-111: OracleType() ignores the configured value.

Same issue as in Chainlink feed: the method returns hardcoded OracleType_Stork instead of using f.oracleType set during construction.

🔎 Proposed fix
 func (f *storkPriceFeed) OracleType() oracletypes.OracleType {
-	return oracletypes.OracleType_Stork
+	return f.oracleType
 }
internal/service/oracle/service.go (2)

104-111: Same incorrect package reference for provider constants in Start method.

Consistent with the issue in NewService, the provider constants are referenced from the wrong package.

🔎 Proposed fix
 		for ticker, pricePuller := range s.pricePullers {
 			switch pricePuller.Provider() {
-			case types.FeedProviderStork, types.FeedProviderChainlink:
+			case stork.FeedProviderStork, chainlink.FeedProviderChainlink:
 				go s.processSetPriceFeed(ticker, pricePuller, dataC)
 			default:
 				s.logger.WithField("provider", pricePuller.Provider()).Warningln("unsupported price feed provider")
 			}
 		}

128-166: Context leak: defer cancelFn() inside infinite loop.

The defer cancelFn() on line 132 is inside an infinite for loop. Deferred functions only execute when the surrounding function returns, so cancelFn will never be called during normal operation. This leaks a new context on every iteration, eventually exhausting resources.

🔎 Proposed fix - call cancelFn explicitly at iteration end
 	for {
 		select {
 		case <-t.C:
 			ctx, cancelFn := context.WithTimeout(context.Background(), maxRespTime)
-			defer cancelFn()

 			result, err := pricePuller.PullPrice(ctx)

 			if err != nil {
 				metrics.ReportFuncError(s.svcTags)
 				feedLogger.WithError(err).Warningln("retrying PullPrice after error")

 				for i := 0; i < maxRetriesPerInterval; i++ {
 					if result, err = pricePuller.PullPrice(ctx); err != nil {
 						time.Sleep(time.Second)
 						continue
 					}
 					break
 				}

 				if err != nil {
 					metrics.ReportFuncCallAndTimingWithErr(s.svcTags)(&err)
 					feedLogger.WithFields(log.Fields{
 						"symbol":  symbol,
 						"retries": maxRetriesPerInterval,
 					}).WithError(err).Errorln("failed to fetch price")

+					cancelFn()
 					t.Reset(pricePuller.Interval())
 					continue
 				}
 			}

 			if result != nil {
 				dataC <- result
 			}

+			cancelFn()
 			t.Reset(pricePuller.Interval())
 		}
 	}
♻️ Duplicate comments (1)
internal/service/oracle/service.go (1)

316-316: Same oracle type mismatch concern as in composeChainlinkOracleMsgs.

This validation also checks for OracleType_Chainlink, which may not match the default OracleType_ChainlinkDataStreams set by the Chainlink feed constructor.

🧹 Nitpick comments (9)
cmd/injective-price-oracle/oracle.go (2)

373-392: Context cancellation may be delayed during retry sleep.

The time.Sleep(5 * time.Second) at line 388 blocks for the full duration even if the context is cancelled. Consider using a pattern similar to the Stork fetcher goroutine above (lines 351-370) that uses time.After with a select statement.

🔎 Proposed fix for responsive context cancellation
 		err := chainlinkFetcher.Start(ctx)
 		if err != nil {
 			log.WithError(err).Errorln("chainlink fetcher failed, retrying in 5 seconds")
-			time.Sleep(5 * time.Second)
-			continue
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(5 * time.Second):
+				continue
+			}
 		}

286-295: Consider using stork.Fetcher interface type instead of concrete stork.StorkFetcher.

Line 286 declares storkFetcher with concrete type stork.StorkFetcher, but there's a stork.Fetcher interface defined in internal/service/oracle/stork/types.go. Using the interface would be more idiomatic and consistent with how chainlink.ChainLinkFetcher is used (which is also an interface). However, both approaches work since NewFetcher returns *storkFetcher.

internal/service/oracle/chainlink/chainlink_fetcher.go (2)

55-57: Unused method logPrintf.

This method wraps f.logger.Infof but is never called within this file. It may be dead code or intended for future use.


116-124: Address TODO: Report decoding may need to be per-feedID.

The TODO comment indicates uncertainty about whether decoding should vary by feedID configuration. Currently, all reports are decoded as v3.Data. If different feeds require different report versions, this could cause decoding failures.

Do you want me to help investigate whether Chainlink Data Streams uses different report formats per feed, or open an issue to track this?

internal/service/oracle/stork/feed_stork_test.go (1)

18-32: Duplicate test case names reduce clarity.

Three consecutive test cases (lines 18-32) share the identical name "Convert nanoseconds to seconds". While this doesn't affect test execution, distinct names would improve test output readability and debugging.

🔎 Proposed fix
 		{
-			name:      "Convert nanoseconds to seconds",
+			name:      "Convert nanoseconds to seconds - case 1",
 			timestamp: 1738013700767706647, // nanoseconds
 			expected:  1738013700,
 		},
 		{
-			name:      "Convert nanoseconds to seconds",
+			name:      "Convert nanoseconds to seconds - case 2",
 			timestamp: 1738013701044503470, // nanoseconds
 			expected:  1738013701,
 		},
 		{
-			name:      "Convert nanoseconds to seconds",
+			name:      "Convert nanoseconds to seconds - rounding",
 			timestamp: 1738013701534503470, // nanoseconds
 			expected:  1738013701,
 		},
internal/service/oracle/chainlink/models.go (1)

13-13: Duplicate constant definition.

FeedProviderChainlink is also defined in internal/service/oracle/types/types.go (line 46). Consider using the one from the types package to maintain a single source of truth.

🔎 Proposed fix

Remove the local constant and import from types:

-const FeedProviderChainlink types.FeedProvider = "chainlink"

Then use types.FeedProviderChainlink where needed.

internal/service/oracle/chainlink/feed_chainlink.go (1)

110-110: Typo in variable name.

chainlinkRepot should be chainlinkReport.

🔎 Proposed fix
-	chainlinkRepot := f.chainlinkFetcher.ChainlinkReport(f.feedID)
-	if chainlinkRepot == nil {
+	chainlinkReport := f.chainlinkFetcher.ChainlinkReport(f.feedID)
+	if chainlinkReport == nil {
 		return nil, nil
 	}

 	chainlinkData := &ChainlinkPriceData{
 		Ticker:          f.ticker,
 		ProviderName:    f.providerName,
 		Symbol:          f.ticker,
 		Timestamp:       ts,
 		OracleType:      f.OracleType(),
-		ChainlinkReport: chainlinkRepot,
+		ChainlinkReport: chainlinkReport,
 	}
internal/service/oracle/stork/models.go (1)

13-13: Duplicate constant definition.

FeedProviderStork is also defined in internal/service/oracle/types/types.go (line 45). Consider using the one from the types package to maintain a single source of truth, similar to the recommendation for FeedProviderChainlink.

internal/service/oracle/types/types.go (1)

74-78: WsConfig missing TOML tags.

FeedConfig has TOML tags but WsConfig does not. If WsConfig is intended to be parsed from TOML, add appropriate tags for consistency.

🔎 Proposed fix
 type WsConfig struct {
-	WebsocketUrl    string
-	WebsocketHeader string
-	Message         string
+	WebsocketUrl    string `toml:"websocketUrl"`
+	WebsocketHeader string `toml:"websocketHeader"`
+	Message         string `toml:"message"`
 }
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 45a6dea and e7d8fb1.

⛔ Files ignored due to path filters (6)
  • api/gen/docs.json is excluded by !**/gen/**
  • api/gen/http/openapi.json is excluded by !**/gen/**
  • api/gen/http/openapi.yaml is excluded by !**/gen/**
  • api/gen/http/openapi3.json is excluded by !**/gen/**
  • api/gen/http/openapi3.yaml is excluded by !**/gen/**
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (24)
  • api/design/api_server.go
  • cmd/injective-price-oracle/api.go
  • cmd/injective-price-oracle/main.go
  • cmd/injective-price-oracle/options.go
  • cmd/injective-price-oracle/oracle.go
  • cmd/injective-price-oracle/probe.go
  • examples/chainlink_eth_usd.toml
  • go.mod
  • internal/service/health/health.go
  • internal/service/oracle/api.go
  • internal/service/oracle/chainlink/chainlink_fetcher.go
  • internal/service/oracle/chainlink/feed_chainlink.go
  • internal/service/oracle/chainlink/models.go
  • internal/service/oracle/chainlink/types.go
  • internal/service/oracle/feed_dynamic.go
  • internal/service/oracle/models.go
  • internal/service/oracle/service.go
  • internal/service/oracle/stork/feed_stork.go
  • internal/service/oracle/stork/feed_stork_test.go
  • internal/service/oracle/stork/models.go
  • internal/service/oracle/stork/stork_fetcher.go
  • internal/service/oracle/stork/types.go
  • internal/service/oracle/types/types.go
  • swagger/swagger.json
💤 Files with no reviewable changes (7)
  • cmd/injective-price-oracle/main.go
  • internal/service/oracle/feed_dynamic.go
  • internal/service/oracle/models.go
  • internal/service/health/health.go
  • internal/service/oracle/api.go
  • cmd/injective-price-oracle/probe.go
  • cmd/injective-price-oracle/api.go
🧰 Additional context used
🧬 Code graph analysis (6)
internal/service/oracle/stork/stork_fetcher.go (1)
internal/service/oracle/chainlink/chainlink_fetcher.go (1)
  • NewFetcher (36-53)
internal/service/oracle/stork/models.go (1)
internal/service/oracle/types/types.go (2)
  • FeedProvider (38-38)
  • Ticker (50-50)
internal/service/oracle/chainlink/models.go (1)
internal/service/oracle/types/types.go (2)
  • FeedProvider (38-38)
  • Ticker (50-50)
cmd/injective-price-oracle/oracle.go (7)
internal/service/oracle/types/types.go (2)
  • FeedConfig (65-72)
  • Ticker (50-50)
internal/service/oracle/stork/feed_stork.go (1)
  • ParseStorkFeedConfig (32-40)
internal/service/oracle/chainlink/feed_chainlink.go (1)
  • ParseChainlinkFeedConfig (28-35)
internal/service/oracle/stork/stork_fetcher.go (1)
  • StorkFetcher (27-30)
internal/service/oracle/chainlink/chainlink_fetcher.go (1)
  • ChainLinkFetcher (19-22)
internal/service/oracle/chainlink/types.go (1)
  • Config (14-19)
internal/service/oracle/service.go (1)
  • NewService (52-94)
internal/service/oracle/service.go (7)
internal/service/oracle/types/types.go (8)
  • PricePuller (28-35)
  • FeedProvider (38-38)
  • FeedConfig (65-72)
  • Fetcher (13-15)
  • FeedProviderStork (45-45)
  • Ticker (50-50)
  • FeedProviderChainlink (46-46)
  • PriceData (18-25)
internal/service/oracle/chainlink/types.go (1)
  • Fetcher (9-12)
internal/service/oracle/stork/types.go (1)
  • Fetcher (10-13)
internal/service/oracle/stork/models.go (2)
  • FeedProviderStork (13-13)
  • StorkPriceData (16-24)
internal/service/oracle/stork/feed_stork.go (1)
  • NewStorkPriceFeed (43-91)
internal/service/oracle/chainlink/models.go (2)
  • FeedProviderChainlink (13-13)
  • ChainlinkPriceData (16-24)
internal/service/oracle/chainlink/feed_chainlink.go (1)
  • NewChainlinkPriceFeed (38-82)
internal/service/oracle/chainlink/chainlink_fetcher.go (1)
internal/service/oracle/stork/stork_fetcher.go (2)
  • NewFetcher (50-67)
  • Data (215-222)
🪛 Checkov (3.2.334)
swagger/swagger.json

[high] 1: Ensure that the global security field has rules defined

(CKV_OPENAPI_4)


[high] 1: Ensure that security operations is not empty.

(CKV_OPENAPI_5)

🪛 OSV Scanner (2.3.1)
go.mod

[HIGH] 88-88: github.com/consensys/gnark-crypto 0.18.0: Unchecked memory allocation during vector deserialization in github.com/consensys/gnark-crypto

(GO-2025-4087)


[HIGH] 88-88: github.com/consensys/gnark-crypto 0.18.0: gnark-crypto allows unchecked memory allocation during vector deserialization

(GHSA-fj2x-735w-74vq)

🔇 Additional comments (20)
examples/chainlink_eth_usd.toml (1)

1-4: LGTM!

The example configuration correctly demonstrates the Chainlink feed setup with the required fields: provider, ticker, feedId, and oracleType. The feedId format matches the expected Chainlink Data Streams hex identifier format.

cmd/injective-price-oracle/options.go (1)

254-276: LGTM!

The Chainlink Data Streams options follow the established patterns for CLI option initialization. The default WebSocket URL pointing to testnet is appropriate for development, and API credentials correctly have no default values.

api/design/api_server.go (1)

14-21: Verify the testnet URL consistency.

The mainnet host was updated to include .eu in the subdomain, but the testnet host at line 19 still references mainnet.staging (not testnet). Is this intentional, or should it be updated to a corresponding pattern?

swagger/swagger.json (1)

1-1: Server URL update is consistent with API design.

The OpenAPI server URL correctly reflects the change made in api/design/api_server.go.

Regarding the static analysis hints about global security rules (CKV_OPENAPI_4, CKV_OPENAPI_5): these appear to be pre-existing issues in the OpenAPI spec, not introduced by this PR. Individual endpoints like /api/price-oracle/v1/probe do have security defined, but there's no global security field. Consider addressing this in a follow-up if stricter API security posture is desired.

internal/service/oracle/stork/feed_stork_test.go (1)

1-1: Package declaration correctly updated to match directory.

The package name stork now correctly aligns with the file's location in the stork subdirectory.

internal/service/oracle/stork/types.go (1)

10-13: LGTM!

The Fetcher interface provides a clean abstraction for the Stork data fetcher. The design appropriately differs from ChainLinkFetcher by accepting an external WebSocket connection in Start, which aligns with how Stork connections are managed externally in oracle.go.

internal/service/oracle/chainlink/chainlink_fetcher.go (2)

146-152: No action required. common.Hex2Bytes from go-ethereum accepts both 0x-prefixed and non-prefixed hex strings, so the code at line 148 correctly handles reportResponse.FeedID.String(), which returns a 0x-prefixed hex string.

Likely an incorrect or invalid review comment.


126-133: Latency metric type mismatch — verify with metrics package documentation.

Line 128 computes latency as now.Sub(ts) / time.Millisecond, which yields int64. Line 131 passes this to s.Timing(), which typically expects time.Duration. Confirm whether github.com/InjectiveLabs/metrics v0.0.10's Timing method signature accepts int64 or if the value should be cast to time.Duration.

internal/service/oracle/chainlink/types.go (1)

9-19: Clean interface and config definitions.

The Fetcher interface is well-designed with a clear contract. The Config struct appropriately captures the necessary Chainlink Data Streams configuration.

Ensure that APIKey and APISecret are never logged or exposed in error messages when using this config.

internal/service/oracle/stork/stork_fetcher.go (1)

49-50: LGTM!

The renaming to NewFetcher is consistent with the Chainlink package convention and improves the modular package structure.

go.mod (2)

8-8: Using a release candidate version in production.

sdk-go v1.60.0-rc1 is a release candidate. Verify this is intentional and consider upgrading to a stable release when available.


23-23: Data Streams SDK dependency is up-to-date.

Dependency github.com/smartcontractkit/data-streams-sdk/go v1.2.0 is the latest stable release (November 21, 2025) with no known public security vulnerabilities. The repository is actively maintained.

internal/service/oracle/chainlink/models.go (1)

16-32: LGTM!

The ChainlinkPriceData struct and its getter methods correctly implement the PriceData interface from the types package.

internal/service/oracle/stork/models.go (1)

16-32: LGTM!

The StorkPriceData struct and its getter methods correctly implement the PriceData interface.

internal/service/oracle/stork/feed_stork.go (1)

113-115: Method renamed for interface consistency.

Renaming from AssetPair() to GetAssetPair() aligns with the getter pattern used in PriceData interface methods.

internal/service/oracle/types/types.go (1)

12-35: Well-designed interface abstractions.

The Fetcher, PriceData, and PricePuller interfaces provide a clean, provider-agnostic contract for the oracle feed system.

internal/service/oracle/chainlink/feed_chainlink.go (1)

17-26: No action needed. The type ChainLinkFetcher is properly defined in chainlink_fetcher.go at line 19 as an interface within the same package. The struct field correctly references this type, and no undefined type issue exists.

Likely an incorrect or invalid review comment.

internal/service/oracle/service.go (3)

14-21: LGTM!

The import reorganization properly includes the new chainlink, stork, and types packages required for the multi-provider architecture.


28-50: LGTM!

The struct and type updates correctly reference the centralized types package definitions.


238-242: LGTM!

The aggregator function correctly combines messages from both provider-specific composers.

Comment on lines +257 to +263
feedCfg, err := chainlink.ParseChainlinkFeedConfig(cfgBody)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Incorrect error message: says "stork" but parsing Chainlink config.

The error message at line 261 refers to "stork feed config" but this code block handles Chainlink feeds.

🔎 Proposed fix
 				feedCfg, err := chainlink.ParseChainlinkFeedConfig(cfgBody)
 				if err != nil {
 					log.WithError(err).WithFields(log.Fields{
 						"filename": d.Name(),
-					}).Errorln("failed to parse stork feed config")
+					}).Errorln("failed to parse chainlink feed config")
 					return nil
 				}
🤖 Prompt for AI Agents
In cmd/injective-price-oracle/oracle.go around lines 257 to 263 the error log
incorrectly says "failed to parse stork feed config" while parsing a Chainlink
feed; update the log message to accurately reflect Chainlink (e.g., "failed to
parse chainlink feed config") and keep the existing WithError(err) and filename
fields so the error details and context are preserved.

Comment on lines +47 to +50
if interval < 1*time.Second {
err = errors.Wrapf(err, "failed to parse pull interval: %s (minimum interval = 1s)", cfg.PullInterval)
return nil, err
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Bug: Wrapping nil error.

When interval < 1*time.Second, the err variable is nil (from the successful ParseDuration call), so errors.Wrapf(err, ...) will return nil, not an error. The function will return (nil, nil), silently accepting an invalid configuration.

🔎 Proposed fix
 		if interval < 1*time.Second {
-			err = errors.Wrapf(err, "failed to parse pull interval: %s (minimum interval = 1s)", cfg.PullInterval)
-			return nil, err
+			return nil, fmt.Errorf("invalid pull interval: %s (minimum interval = 1s)", cfg.PullInterval)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if interval < 1*time.Second {
err = errors.Wrapf(err, "failed to parse pull interval: %s (minimum interval = 1s)", cfg.PullInterval)
return nil, err
}
if interval < 1*time.Second {
return nil, fmt.Errorf("invalid pull interval: %s (minimum interval = 1s)", cfg.PullInterval)
}
🤖 Prompt for AI Agents
In internal/service/oracle/chainlink/feed_chainlink.go around lines 47 to 50,
the code wraps a nil err when interval < 1*time.Second which yields nil and
causes the function to return (nil, nil); instead, construct and return a new
error describing the invalid pull interval (include the cfg.PullInterval and
minimum value), e.g. create a fresh error via errors.Errorf or errors.New with
formatting and return that (or assign it to err before wrapping), ensuring the
function returns a non-nil error when the interval is too small.

Comment on lines +95 to +97
func (f *chainlinkPriceFeed) OracleType() oracletypes.OracleType {
return oracletypes.OracleType_ChainlinkDataStreams
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

OracleType() ignores the configured value.

The method returns a hardcoded oracletypes.OracleType_ChainlinkDataStreams instead of using the f.oracleType field that was set during construction. This makes the OracleType configuration in FeedConfig ineffective.

🔎 Proposed fix
 func (f *chainlinkPriceFeed) OracleType() oracletypes.OracleType {
-	return oracletypes.OracleType_ChainlinkDataStreams
+	return f.oracleType
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (f *chainlinkPriceFeed) OracleType() oracletypes.OracleType {
return oracletypes.OracleType_ChainlinkDataStreams
}
func (f *chainlinkPriceFeed) OracleType() oracletypes.OracleType {
return f.oracleType
}
🤖 Prompt for AI Agents
In internal/service/oracle/chainlink/feed_chainlink.go around lines 95 to 97,
the OracleType() method currently returns the hardcoded
oracletypes.OracleType_ChainlinkDataStreams which ignores the instance
configuration; change the implementation to return the instance field (return
f.oracleType) so the value set during construction from FeedConfig is respected,
and ensure f.oracleType is properly initialized in the constructor (or handle a
sensible default if it may be zero).

Comment on lines +189 to 192
if chainlinkData, ok := pData.(*stork.StorkPriceData); ok {
assetPair := chainlinkData.AssetPair
assetPairs = append(assetPairs, assetPair)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Misleading variable name: chainlinkData should be storkData.

The variable is named chainlinkData but it holds stork.StorkPriceData. This appears to be a copy-paste error that hurts readability.

🔎 Proposed fix
-		if chainlinkData, ok := pData.(*stork.StorkPriceData); ok {
-			assetPair := chainlinkData.AssetPair
+		if storkData, ok := pData.(*stork.StorkPriceData); ok {
+			assetPair := storkData.AssetPair
 			assetPairs = append(assetPairs, assetPair)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if chainlinkData, ok := pData.(*stork.StorkPriceData); ok {
assetPair := chainlinkData.AssetPair
assetPairs = append(assetPairs, assetPair)
}
if storkData, ok := pData.(*stork.StorkPriceData); ok {
assetPair := storkData.AssetPair
assetPairs = append(assetPairs, assetPair)
}
🤖 Prompt for AI Agents
In internal/service/oracle/service.go around lines 189 to 192, the local
variable is misleadingly named `chainlinkData` even though it holds a
*stork.StorkPriceData; rename the variable to `storkData` (or another
stork-appropriate name) and update any references in that block to improve
readability and avoid confusion.

Comment on lines 316 to 325
} else if priceData.GetOracleType() == oracletypes.OracleType_Chainlink {
if chainlinkData, ok := priceData.(*chainlink.ChainlinkPriceData); ok {
if chainlinkData.ChainlinkReport.FeedId == nil || chainlinkData.ChainlinkReport == nil {
s.logger.WithFields(log.Fields{
"ticker": priceData.GetTicker(),
"provider": priceData.GetProviderName(),
}).Debugln("got invalid chainlink report data, skipping")
continue
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Nil pointer dereference: check ChainlinkReport before accessing FeedId.

On line 318, chainlinkData.ChainlinkReport.FeedId is accessed before checking if ChainlinkReport is nil. If ChainlinkReport is nil, this will cause a panic.

🔎 Proposed fix - reorder nil checks
 			} else if priceData.GetOracleType() == oracletypes.OracleType_Chainlink {
 				if chainlinkData, ok := priceData.(*chainlink.ChainlinkPriceData); ok {
-					if chainlinkData.ChainlinkReport.FeedId == nil || chainlinkData.ChainlinkReport == nil {
+					if chainlinkData.ChainlinkReport == nil || chainlinkData.ChainlinkReport.FeedId == nil {
 						s.logger.WithFields(log.Fields{
 							"ticker":   priceData.GetTicker(),
 							"provider": priceData.GetProviderName(),
 						}).Debugln("got invalid chainlink report data, skipping")
 						continue
 					}
 				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} else if priceData.GetOracleType() == oracletypes.OracleType_Chainlink {
if chainlinkData, ok := priceData.(*chainlink.ChainlinkPriceData); ok {
if chainlinkData.ChainlinkReport.FeedId == nil || chainlinkData.ChainlinkReport == nil {
s.logger.WithFields(log.Fields{
"ticker": priceData.GetTicker(),
"provider": priceData.GetProviderName(),
}).Debugln("got invalid chainlink report data, skipping")
continue
}
}
} else if priceData.GetOracleType() == oracletypes.OracleType_Chainlink {
if chainlinkData, ok := priceData.(*chainlink.ChainlinkPriceData); ok {
if chainlinkData.ChainlinkReport == nil || chainlinkData.ChainlinkReport.FeedId == nil {
s.logger.WithFields(log.Fields{
"ticker": priceData.GetTicker(),
"provider": priceData.GetProviderName(),
}).Debugln("got invalid chainlink report data, skipping")
continue
}
}
🤖 Prompt for AI Agents
In internal/service/oracle/service.go around lines 316 to 325, the code accesses
chainlinkData.ChainlinkReport.FeedId before confirming ChainlinkReport is
non-nil which can cause a panic; reorder the nil checks so you first verify
chainlinkData.ChainlinkReport != nil and then check FeedId (e.g. if
chainlinkData.ChainlinkReport == nil || chainlinkData.ChainlinkReport.FeedId ==
nil) and keep the existing logging and continue behavior when either is nil,
ensuring the type assertion remains unchanged.

Comment on lines +52 to +58
func (t Ticker) Base() string {
return strings.Split(string(t), "/")[0]
}

func (t Ticker) Quote() string {
return strings.Split(string(t), "/")[1]
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential panic in Quote() if ticker format is invalid.

If the ticker string doesn't contain a "/" separator, strings.Split returns a single-element slice, and accessing index [1] will cause a panic.

🔎 Proposed fix with safe handling
 func (t Ticker) Base() string {
-	return strings.Split(string(t), "/")[0]
+	parts := strings.Split(string(t), "/")
+	if len(parts) > 0 {
+		return parts[0]
+	}
+	return ""
 }

 func (t Ticker) Quote() string {
-	return strings.Split(string(t), "/")[1]
+	parts := strings.Split(string(t), "/")
+	if len(parts) > 1 {
+		return parts[1]
+	}
+	return ""
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (t Ticker) Base() string {
return strings.Split(string(t), "/")[0]
}
func (t Ticker) Quote() string {
return strings.Split(string(t), "/")[1]
}
func (t Ticker) Base() string {
parts := strings.Split(string(t), "/")
if len(parts) > 0 {
return parts[0]
}
return ""
}
func (t Ticker) Quote() string {
parts := strings.Split(string(t), "/")
if len(parts) > 1 {
return parts[1]
}
return ""
}
🤖 Prompt for AI Agents
In internal/service/oracle/types/types.go around lines 52 to 58, Quote()
currently assumes the ticker contains a "/" and will panic if it doesn't; change
it to safely parse the quote by using strings.SplitN(string(t), "/", 2) (or
check for strings.Index), then if the result has two parts return the second
part, otherwise return an empty string (or a sensible default) to avoid indexing
out of range.

@danidomi danidomi changed the title F/chainlink integration fea(ILO-703): chainlink integration Dec 29, 2025
# Conflicts:
#	cmd/injective-price-oracle/api.go
#	swagger/swagger.json
@danidomi danidomi changed the title fea(ILO-703): chainlink integration feat(ILO-703): chainlink integration Dec 29, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
internal/service/oracle/utils/ws.go (3)

16-20: Fix inconsistent error return value.

Line 19 returns &websocket.Conn{} on error, which is inconsistent with other error returns in this function (lines 33, 39, 44) that correctly return nil. This could mislead callers into thinking they have a valid connection.

🔎 Proposed fix
 func ConnectWebSocket(ctx context.Context, websocketUrl, urlHeader string, maxRetries int) (conn *websocket.Conn, err error) {
 	u, err := url.Parse(websocketUrl)
 	if err != nil {
-		return &websocket.Conn{}, errors.Wrapf(err, "can not parse WS url %s: %v", websocketUrl, err)
+		return nil, errors.Wrapf(err, "can not parse WS url %s: %v", websocketUrl, err)
 	}

27-31: Use the configured dialer instead of DefaultDialer.

The dialer is configured with compression enabled on lines 27-28, but line 31 uses websocket.DefaultDialer instead, causing the compression setting to be ignored.

🔎 Proposed fix
 	dialer := websocket.DefaultDialer
 	dialer.EnableCompression = true
 	retries := 0
 	for {
-		conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), header)
+		conn, _, err = dialer.DialContext(ctx, u.String(), header)
 		if ctx.Err() != nil {

42-46: Prevent timer leak on context cancellation.

The timer created on line 45 is not stopped if the context is cancelled, potentially leaking resources. Timers should be explicitly stopped when they're no longer needed.

🔎 Proposed fix
 			select {
 			case <-ctx.Done():
 				return nil, ctx.Err()
-			case <-time.NewTimer(5 * time.Second).C:
+			case <-time.After(5 * time.Second):
 			}

Or alternatively, use a timer with explicit stop:

 			select {
 			case <-ctx.Done():
 				return nil, ctx.Err()
-			case <-time.NewTimer(5 * time.Second).C:
+			default:
+				timer := time.NewTimer(5 * time.Second)
+				select {
+				case <-ctx.Done():
+					timer.Stop()
+					return nil, ctx.Err()
+				case <-timer.C:
+				}
 			}
♻️ Duplicate comments (2)
cmd/injective-price-oracle/oracle.go (2)

257-263: Duplicate: Error message says "stork" but parsing Chainlink config.

This issue has already been flagged in previous review comments.


313-317: Duplicate: API secrets being logged in plain text.

This critical security issue has already been flagged in previous review comments.

🧹 Nitpick comments (2)
internal/service/oracle/utils/ws.go (1)

41-41: Simplify string formatting.

The format string uses %s with fmt.Sprint(retries), which is unnecessarily verbose. Consider using %d directly.

🔎 Proposed fix
-			log.Infof("Retrying connect %sth in 5s...", fmt.Sprint(retries))
+			log.Infof("Retrying connect %dth in 5s...", retries)
cmd/injective-price-oracle/oracle.go (1)

228-232: Enhance error message with file path.

Including the file path in the error message would make debugging easier when feed config files fail to load.

🔎 Proposed enhancement
 cfgBody, err := os.ReadFile(path)
 if err != nil {
-	err = errors.Wrapf(err, "failed to read feed config")
+	err = errors.Wrapf(err, "failed to read feed config from %s", path)
 	return err
 }
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e7d8fb1 and 2f81423.

⛔ Files ignored due to path filters (4)
  • gen/Dockerfile is excluded by !**/gen/**
  • gen/gen-goa.sh is excluded by !**/gen/**
  • swagger/favicon-16x16.png is excluded by !**/*.png
  • swagger/favicon-32x32.png is excluded by !**/*.png
📒 Files selected for processing (47)
  • cmd/injective-price-oracle/oracle.go
  • internal/service/oracle/utils/ws.go
  • internal/service/swagger/swagger.go
  • pipeline/LICENSE.txt
  • pipeline/common.go
  • pipeline/common_eth.go
  • pipeline/common_http.go
  • pipeline/graph.go
  • pipeline/http.go
  • pipeline/http_allowed_ips.go
  • pipeline/models.go
  • pipeline/null/int64.go
  • pipeline/null/uint32.go
  • pipeline/runner.go
  • pipeline/scheduler.go
  • pipeline/task.any.go
  • pipeline/task.base.go
  • pipeline/task.divide.go
  • pipeline/task.eth_abi_decode.go
  • pipeline/task.eth_abi_decode_log.go
  • pipeline/task.eth_abi_encode.go
  • pipeline/task.eth_abi_encode_2.go
  • pipeline/task.fail.go
  • pipeline/task.http.go
  • pipeline/task.jsonparse.go
  • pipeline/task.lowercase.go
  • pipeline/task.mean.go
  • pipeline/task.median.go
  • pipeline/task.memo.go
  • pipeline/task.merge.go
  • pipeline/task.mode.go
  • pipeline/task.multiply.go
  • pipeline/task.panic.go
  • pipeline/task.sum.go
  • pipeline/task.uppercase.go
  • pipeline/task_object_params.go
  • pipeline/task_params.go
  • pipeline/variables.go
  • swagger/index.html
  • swagger/oauth2-redirect.html
  • swagger/swagger-ui-bundle.js
  • swagger/swagger-ui-es-bundle-core.js
  • swagger/swagger-ui-es-bundle.js
  • swagger/swagger-ui-standalone-preset.js
  • swagger/swagger-ui.css
  • swagger/swagger-ui.js
  • swagger/swagger.json
💤 Files with no reviewable changes (39)
  • pipeline/task.eth_abi_encode.go
  • internal/service/swagger/swagger.go
  • pipeline/http.go
  • pipeline/task.memo.go
  • pipeline/http_allowed_ips.go
  • pipeline/task.sum.go
  • pipeline/task.eth_abi_decode_log.go
  • pipeline/task.lowercase.go
  • pipeline/variables.go
  • pipeline/task.any.go
  • pipeline/task.multiply.go
  • pipeline/task.eth_abi_decode.go
  • pipeline/task.eth_abi_encode_2.go
  • pipeline/task.mean.go
  • pipeline/task_object_params.go
  • pipeline/scheduler.go
  • pipeline/task.uppercase.go
  • swagger/index.html
  • pipeline/task.divide.go
  • pipeline/runner.go
  • pipeline/null/int64.go
  • pipeline/null/uint32.go
  • pipeline/common_http.go
  • pipeline/LICENSE.txt
  • pipeline/task.panic.go
  • pipeline/models.go
  • pipeline/graph.go
  • pipeline/task.jsonparse.go
  • swagger/oauth2-redirect.html
  • pipeline/task.median.go
  • swagger/swagger.json
  • pipeline/task_params.go
  • pipeline/task.mode.go
  • pipeline/task.fail.go
  • pipeline/task.base.go
  • pipeline/task.merge.go
  • pipeline/common_eth.go
  • pipeline/common.go
  • pipeline/task.http.go
🧰 Additional context used
🧬 Code graph analysis (1)
cmd/injective-price-oracle/oracle.go (5)
internal/service/oracle/types/types.go (2)
  • FeedConfig (65-72)
  • Ticker (50-50)
internal/service/oracle/stork/feed_stork.go (1)
  • ParseStorkFeedConfig (32-40)
internal/service/oracle/chainlink/feed_chainlink.go (1)
  • ParseChainlinkFeedConfig (28-35)
internal/service/oracle/service.go (2)
  • NewService (52-94)
  • MaxRetriesReConnectWebSocket (42-42)
internal/service/oracle/utils/ws.go (1)
  • ConnectWebSocket (16-52)
🔇 Additional comments (9)
internal/service/oracle/utils/ws.go (1)

1-1: All imports have been correctly updated for the package rename.

The package is now correctly named utils at line 1, and the only file importing it (cmd/injective-price-oracle/oracle.go:28) uses the correct path internal/service/oracle/utils. No lingering imports of the old package path remain.

cmd/injective-price-oracle/oracle.go (8)

19-29: LGTM! Clean multi-provider setup.

The import additions and CLI parameter declarations properly support the Chainlink integration alongside Stork. The structure follows existing patterns and enables provider-agnostic feed handling.

Also applies to: 81-85, 133-139


210-216: LGTM! Proper provider state tracking.

The provider-specific maps and enable flags cleanly support dynamic multi-provider initialization. The migration to types.FeedConfig aligns with the centralized types package.


234-272: LGTM! Excellent provider-agnostic feed loading design.

The two-phase parsing approach is well-structured:

  1. Generic parse to detect provider type from the provider field
  2. Delegation to provider-specific parsers (stork.ParseStorkFeedConfig, chainlink.ParseChainlinkFeedConfig)

This pattern cleanly supports multiple providers and makes adding new providers straightforward.


286-295: LGTM! Clean Stork fetcher initialization.

The conditional initialization and ticker collection from the map is straightforward and correct.


305-311: LGTM! Proper Chainlink client and fetcher initialization.

The Chainlink Data Streams SDK configuration and initialization sequence is well-structured with appropriate error handling. The client and fetcher are created with proper error checks and fatal logging on failure.

Note: Lines 316-317 contain a critical security issue (logging secrets) that has been flagged in previous review comments.

Also applies to: 319-329


332-345: LGTM! Service initialization properly wired.

The service creation correctly passes both Stork and Chainlink fetchers, with appropriate error handling and cleanup registration. The service layer handles nil fetchers correctly when providers are not enabled.


347-371: LGTM! Robust Stork fetcher lifecycle management.

The goroutine properly handles:

  • Nil check to skip when Stork is not enabled
  • WebSocket connection with retry logic via utils.ConnectWebSocket
  • Reconnection on fetcher failure
  • Context cancellation

373-392: LGTM! Appropriate Chainlink fetcher lifecycle management.

The goroutine correctly implements:

  • Nil check to skip when Chainlink is not enabled
  • Retry loop with 5-second delay on failure
  • Context cancellation handling

The pattern differs from Stork appropriately, as Chainlink manages its own connection internally via the Data Streams SDK, while Stork requires external WebSocket management.

@socket-security
Copy link

socket-security bot commented Dec 29, 2025

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Updatedgithub.com/​InjectiveLabs/​sdk-go@​v1.58.0 ⏵ v1.60.0-rc185100100100100
Addedgithub.com/​smartcontractkit/​data-streams-sdk/​go@​v1.2.091100100100100

View full report

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
internal/service/oracle/chainlink/chainlink_fetcher.go (1)

141-152: Close() logic issue persists: cleanup code unreachable when stream exists.

This issue was flagged in a previous review. The cleanup of latestPrices and the log message are skipped when f.stream != nil due to the early return. The cleanup should occur regardless of whether the stream exists.

cmd/injective-price-oracle/oracle.go (1)

257-263: Incorrect error message: says "stork" but parsing Chainlink config.

The error message at line 261 refers to "stork feed config" but this code block handles Chainlink feeds. This issue was flagged in a previous review.

🧹 Nitpick comments (3)
internal/service/oracle/chainlink/chainlink_fetcher.go (2)

32-50: Consider simplifying NewFetcher return signature.

The function always returns nil for the error. If no validation is needed at construction time, consider returning only *chainlinkFetcher for a simpler API.

🔎 Proposed simplification
-func NewFetcher(client streams.Client, feedIds []string) (*chainlinkFetcher, error) {
+func NewFetcher(client streams.Client, feedIds []string) *chainlinkFetcher {
 	fetcher := &chainlinkFetcher{
 		latestPrices: make(map[string]*oracletypes.ChainlinkReport),
 		logger: log.WithFields(log.Fields{
 			"svc":      "oracle",
 			"dynamic":  true,
 			"provider": "chainlinkFetcher",
 		}),
 		client:  client,
 		feedIDs: feedIds,

 		svcTags: metrics.Tags{
 			"provider": "chainlinkFetcher",
 		},
 	}

-	return fetcher, nil
+	return fetcher
 }

This would require updating the call site in oracle.go accordingly.


52-54: Remove unused logPrintf helper.

This method is defined but never called within the file.

🔎 Proposed fix
-func (f *chainlinkFetcher) logPrintf(format string, args ...interface{}) {
-	f.logger.Infof(format, args...)
-}
-
cmd/injective-price-oracle/oracle.go (1)

371-390: Consider adding exponential backoff for retry logic.

The current fixed 5-second retry interval is functional but could benefit from exponential backoff to avoid hammering the service during extended outages. However, this is consistent with the Stork fetcher pattern and acceptable for the initial implementation.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2f81423 and df42074.

📒 Files selected for processing (5)
  • cmd/injective-price-oracle/oracle.go
  • examples/chainlink_btc_v3.toml
  • examples/chainlink_rwa_v8.toml
  • internal/service/oracle/chainlink/chainlink_fetcher.go
  • internal/service/oracle/chainlink/models.go
✅ Files skipped from review due to trivial changes (1)
  • examples/chainlink_rwa_v8.toml
🧰 Additional context used
🧬 Code graph analysis (3)
internal/service/oracle/chainlink/models.go (1)
internal/service/oracle/types/types.go (2)
  • FeedProvider (38-38)
  • Ticker (50-50)
internal/service/oracle/chainlink/chainlink_fetcher.go (1)
internal/service/oracle/stork/stork_fetcher.go (1)
  • NewFetcher (50-67)
cmd/injective-price-oracle/oracle.go (8)
internal/service/oracle/types/types.go (2)
  • FeedConfig (65-72)
  • Ticker (50-50)
internal/service/oracle/stork/feed_stork.go (1)
  • ParseStorkFeedConfig (32-40)
internal/service/oracle/chainlink/models.go (1)
  • FeedProviderChainlink (13-13)
internal/service/oracle/chainlink/feed_chainlink.go (1)
  • ParseChainlinkFeedConfig (28-35)
internal/service/oracle/stork/stork_fetcher.go (1)
  • StorkFetcher (27-30)
internal/service/oracle/chainlink/chainlink_fetcher.go (2)
  • NewFetcher (33-50)
  • ChainLinkFetcher (16-19)
internal/service/oracle/chainlink/types.go (1)
  • Config (14-19)
internal/service/oracle/utils/ws.go (1)
  • ConnectWebSocket (16-52)
🔇 Additional comments (9)
examples/chainlink_btc_v3.toml (1)

1-7: LGTM!

The example configuration is complete and well-documented. The comment clarifies that v3 is for crypto assets, and the configuration fields align with the expected Chainlink Data Streams format.

internal/service/oracle/chainlink/models.go (2)

16-31: LGTM!

The ChainlinkPriceData struct and its getter methods are well-designed, implementing a clean accessor pattern. The fields cover all necessary data for Chainlink price reporting including the raw report, timestamp, and oracle type.


33-52: LGTM!

The ReportVersion type provides a clean enum-like pattern with proper validation. The distinction between v3 (crypto assets) and v8 (RWA) is well-documented.

internal/service/oracle/chainlink/chainlink_fetcher.go (3)

56-61: LGTM!

Thread-safe read access with RLock is correctly implemented for accessing the latest prices map.


63-91: LGTM!

The Start method properly validates inputs, parses feed IDs with appropriate error handling, and establishes the stream subscription before beginning to read reports.


122-128: No issue found — common.Hex2Bytes correctly handles optional "0x" prefixes.

The go-ethereum common.Hex2Bytes function accepts hex strings with or without the "0x" prefix and handles both cases correctly. The code at line 124 is working as intended regardless of whether feedIDStr (from reportResponse.FeedID.String()) includes the prefix or not. No fix is required.

Likely an incorrect or invalid review comment.

cmd/injective-price-oracle/oracle.go (3)

80-84: LGTM!

The Chainlink Data Streams parameters are appropriately defined alongside the existing Stork parameters, following the same pattern.


306-328: LGTM!

The Chainlink client and fetcher initialization is properly structured. Sensitive credentials (ApiKey, ApiSecret) are correctly passed to the config without being logged.


330-336: LGTM!

The service wiring properly integrates both Stork and Chainlink fetchers, allowing for flexible provider configuration where either or both can be enabled.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
examples/chainlink_btc_v3.toml (1)

1-5: Pull interval and feed ID handling are working as designed; clarify with comments where needed.

The configuration follows the established pattern, with two observations:

  1. Pull interval: The pullInterval = "1s" appears in this example and chainlink_rwa_v8.toml, but not in chainlink_eth_usd.toml, which relies on the application's default. If "1s" is intentional for Chainlink Data Streams examples, consider adding a comment to the example file explaining that this should be adjusted based on rate limits and requirements.

  2. Feed ID and ticker relationship: The code does not validate that the feedId corresponds to the specified ticker (BTC/USD). This is user responsibility when configuring feeds. No action required in code, but users should verify feed ID accuracy against Chainlink's Data Streams documentation.

The version indicator ("v3") in the filename is separate from the configuration structure and appears intentional for differentiating example files.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df42074 and a7f1589.

📒 Files selected for processing (2)
  • examples/chainlink_btc_v3.toml
  • examples/chainlink_rwa_v8.toml
🔇 Additional comments (1)
examples/chainlink_rwa_v8.toml (1)

1-5: The pull interval configuration is correct as designed.

The pullInterval = "1s" value is actually the minimum allowed by the system's design, as documented in the README: pullInterval "Cannot be negative or less than '1s'". The 1-second interval is therefore an acceptable and architectural minimum, not an aggressive setting that would trigger rate limits or performance issues. This example configuration is valid and follows the documented specification.

Likely an incorrect or invalid review comment.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
internal/service/oracle/service.go (4)

69-89: This issue was already flagged in a previous review.

The provider constants are being referenced from the types package, but according to the previous review, FeedProviderStork is defined in the stork package and FeedProviderChainlink is defined in the chainlink package. This should be addressed as indicated in the earlier review.


189-192: This issue was already flagged in a previous review.

The variable is misleadingly named chainlinkData when it actually holds *stork.StorkPriceData. As indicated in the earlier review, this should be renamed to storkData for clarity.


213-223: This issue was already flagged in a previous review.

The function filters for oracletypes.OracleType_Chainlink, but according to the previous review, ChainlinkPriceFeed returns oracletypes.OracleType_ChainlinkDataStreams. This mismatch would cause all Chainlink price data to be silently skipped. The filter should be updated as indicated in the earlier review.


316-325: This issue was already flagged in a previous review.

Line 318 accesses ChainlinkReport.FeedId before verifying that ChainlinkReport is non-nil, which could cause a panic. As indicated in the earlier review, the nil checks should be reordered.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a7f1589 and 6b8fe01.

📒 Files selected for processing (5)
  • examples/mainnet_chainlink_btc_usd.toml
  • examples/mainnet_chainlink_googl_usd.toml
  • examples/testnet_chainlink_btc_usd.toml
  • examples/testnet_chainlink_tsla_usd.toml
  • internal/service/oracle/service.go
✅ Files skipped from review due to trivial changes (1)
  • examples/mainnet_chainlink_btc_usd.toml
🔇 Additional comments (11)
internal/service/oracle/service.go (9)

1-21: LGTM!

Package declaration and imports are properly organized for the new modular architecture.


23-37: LGTM!

The struct properly uses the new types.PricePuller interface, supporting the provider-agnostic architecture.


45-50: LGTM!

The config struct properly uses the new public types from the types package.


96-117: LGTM!

The Start method properly initializes the price data channel and starts processing feeds. The flow structure is sound.


119-167: LGTM!

The method signature is correctly updated to use the new types, and the retry logic with error handling is properly implemented.


238-242: LGTM!

The function properly aggregates messages from both Stork and Chainlink providers.


337-349: LGTM!

The batching logic with composite keys and size/time limits is well-implemented.


351-399: LGTM!

The broadcast method properly handles timeouts, errors, and metrics reporting.


401-416: LGTM!

Panic recovery and close methods are appropriately implemented.

examples/testnet_chainlink_btc_usd.toml (1)

1-5: Verify feedId and pullInterval against Chainlink documentation.

The configuration follows the established pattern. Please verify:

  1. The feedId matches the BTC/USD feed identifier in Chainlink's testnet Data Streams.
  2. The pullInterval of "1s" aligns with Chainlink's rate limits and your requirements. While BTC/USD typically has higher update frequency than equity pairs, confirm this polling rate is supported and cost-effective.
examples/mainnet_chainlink_googl_usd.toml (1)

1-5: Configuration is appropriate for Chainlink Data Streams mainnet. The feedId correctly corresponds to the GOOGL/USD feed on Chainlink mainnet, and the 1s pullInterval is suitable for this streaming implementation. The SDK uses WebSocket streaming (the Chainlink-recommended approach), so the pullInterval controls local cache polling frequency rather than API call frequency to Chainlink, making it consistent with other Chainlink configurations in the repository.

Comment on lines +1 to +5
provider = "chainlink"
feedId = "0x000890039996f52efbfa66e193d077fd4aa6fa12009688f7f6606a64cc0c25ad"
ticker = "TSLA/USD"
pullInterval = "1s"
oracleType = "Chainlink"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Chainlink Data Streams feed IDs testnet TSLA/USD

💡 Result:

TSLA / USD Data Streams feed ID (truncated on page): 0x0008...17fa. [1]

You can stream it from the testnet Data Streams WebSocket endpoint (example usage shown in the TS SDK docs). [2]

Sources:

  • Chainlink Data Streams — TSLA / USD stream (feed ID shown). [1]
  • Chainlink Data Streams docs — testnet streaming example (WebSocket/TS SDK). [2]

🌐 Web query:

Chainlink Data Streams rate limits pricing per-second polling

💡 Result:

Short answer — Chainlink Data Streams doesn't publish a simple per‑second polling price/RPS cap publicly. It offers two billing models: subscription (contact sales for plan limits and RPS) or pay‑per‑report verification at $0.35/report (10% surcharge if paid in non‑LINK). For high‑throughput / sub‑second use, Chainlink recommends using the WebSocket/SDK streaming options and enterprise/subscription plans (talk to an expert for per‑second rate limits and pricing). [1][2]

Sources:

  • Data Streams Billing (pay‑per‑report $0.35; subscription contact). [1]
  • Data Streams overview (sub‑second latency, REST/WebSocket/SDKs, high‑throughput use cases). [2]

Use WebSocket streaming instead of per-second polling due to cost and architectural constraints.

The feedId 0x000890039996f52efbfa66e193d077fd4aa6fa12009688f7f6606a64cc0c25ad is valid for testnet TSLA/USD. However, the pullInterval of "1s" creates a critical issue:

  • At 1-second polling with Chainlink's pay-per-report model ($0.35/report), continuous polling costs ~$30,240/day—prohibitively expensive.
  • Chainlink Data Streams explicitly recommends WebSocket/SDK streaming for high-frequency updates instead of polling.
  • Subscription/enterprise plans exist but require sales contact to confirm per-second RPS limits and pricing.

Refactor to use WebSocket streaming or establish a subscription plan if continuous per-second polling is required.

# Conflicts:
#	api/design/api_server.go
#	api/gen/docs.json
#	api/gen/http/openapi3.json
#	api/gen/http/openapi3.yaml
#	cmd/injective-price-oracle/api.go
#	go.mod
#	go.sum
#	internal/service/oracle/service.go
#	swagger/swagger.json
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In @go.mod:
- Line 78: The go.mod currently pins github.com/consensys/gnark-crypto to
v0.18.0 which contains a HIGH severity vulnerability; update the module version
for github.com/consensys/gnark-crypto to a patched release (at minimum v0.18.1
or v0.19.2+ if compatible), then run `go mod tidy` and `go mod vendor` (if you
vendor) and rebuild/run the test suite to verify no compatibility issues; if
compilation errors appear, adjust imports/usages referencing gnark-crypto APIs
to match the updated version or choose the compatible patched minor (v0.19.x)
and re-run the tidy/vendor/test steps.

In @internal/service/oracle/service.go:
- Around line 325-334: The nil-check order is wrong and can panic: in the branch
where priceData.GetOracleType() == oracletypes.OracleType_ChainlinkDataStreams
and you cast to chainlink.ChainlinkPriceData, check
chainlinkData.ChainlinkReport for nil before accessing its FeedId; change the
condition to verify chainlinkData.ChainlinkReport == nil ||
chainlinkData.ChainlinkReport.FeedId == nil (or equivalent) and keep the same
logging/continue behavior in the block that currently references ChainlinkReport
and FeedId.
- Around line 69-89: The switch is comparing feedCfg.ProviderName against
constants from the wrong package; replace references to
types.FeedProviderStork.String() and types.FeedProviderChainlink.String() with
the provider-specific constants (stork.FeedProviderStork.String() and
chainlink.FeedProviderChainlink.String()) in the switch inside the init logic
that creates price pullers (the block that calls stork.NewStorkPriceFeed and
chainlink.NewChainlinkPriceFeed), and ensure the stork and chainlink packages
are imported so the comparisons compile.
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 131df79 and a86aafd.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (2)
  • go.mod
  • internal/service/oracle/service.go
🧰 Additional context used
🪛 OSV Scanner (2.3.1)
go.mod

[HIGH] 78-78: github.com/consensys/gnark-crypto 0.18.0: Unchecked memory allocation during vector deserialization in github.com/consensys/gnark-crypto

(GO-2025-4087)


[HIGH] 78-78: github.com/consensys/gnark-crypto 0.18.0: gnark-crypto allows unchecked memory allocation during vector deserialization

(GHSA-fj2x-735w-74vq)

🔇 Additional comments (4)
go.mod (2)

8-8: Verify sdk-go RC release is appropriate for production.

Line 8 pins github.com/InjectiveLabs/sdk-go at v1.60.0-rc1, which is a release candidate. Please confirm this pre-release version is intentional and stable enough for your use case, or consider upgrading to a stable release if available.


18-18: Verify data-streams-sdk/go version and security.

Line 18 adds a new direct dependency github.com/smartcontractkit/data-streams-sdk/go v1.2.0 for Chainlink integration. Please verify that this version is the latest stable release and free of known vulnerabilities.

internal/service/oracle/service.go (2)

215-225: Oracle type filter is now correct.

The previous review flagged that this function was filtering for OracleType_Chainlink instead of OracleType_ChainlinkDataStreams. This has been corrected on line 217, and Chainlink price data will now be properly included in the reports.


191-194: Misleading variable name: chainlinkData should be storkData.

This issue was previously flagged: the variable is named chainlinkData but it holds *stork.StorkPriceData. This is a copy-paste error that significantly hurts readability and should be renamed to storkData.

Likely an incorrect or invalid review comment.

github.com/cometbft/cometbft/api v1.0.0 // indirect
github.com/consensys/bavard v0.1.30 // indirect
github.com/consensys/gnark-crypto v0.17.0 // indirect
github.com/consensys/gnark-crypto v0.18.0 // indirect
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Upgrade gnark-crypto to patch the HIGH severity vulnerability.

The dependency gnark-crypto v0.18.0 contains GHSA-fj2x-735w-74vq (unchecked memory allocation during vector deserialization), which has been flagged in a previous review. This is a HIGH severity issue and should be resolved before merging.

Patched versions are available at v0.18.1 (minimal patch) or v0.19.2+ (if compatible with your codebase).

🔧 How to fix

Update line 78 in go.mod:

-	github.com/consensys/gnark-crypto v0.18.0 // indirect
+	github.com/consensys/gnark-crypto v0.18.1 // indirect

Then run:

go mod tidy
go mod vendor  # if using vendor directory

Test the build and run the test suite to ensure compatibility.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
github.com/consensys/gnark-crypto v0.18.0 // indirect
github.com/consensys/gnark-crypto v0.18.1 // indirect
🧰 Tools
🪛 OSV Scanner (2.3.1)

[HIGH] 78-78: github.com/consensys/gnark-crypto 0.18.0: Unchecked memory allocation during vector deserialization in github.com/consensys/gnark-crypto

(GO-2025-4087)


[HIGH] 78-78: github.com/consensys/gnark-crypto 0.18.0: gnark-crypto allows unchecked memory allocation during vector deserialization

(GHSA-fj2x-735w-74vq)

🤖 Prompt for AI Agents
In @go.mod at line 78, The go.mod currently pins
github.com/consensys/gnark-crypto to v0.18.0 which contains a HIGH severity
vulnerability; update the module version for github.com/consensys/gnark-crypto
to a patched release (at minimum v0.18.1 or v0.19.2+ if compatible), then run
`go mod tidy` and `go mod vendor` (if you vendor) and rebuild/run the test suite
to verify no compatibility issues; if compilation errors appear, adjust
imports/usages referencing gnark-crypto APIs to match the updated version or
choose the compatible patched minor (v0.19.x) and re-run the tidy/vendor/test
steps.

Comment on lines 69 to 89
switch feedCfg.ProviderName {
case FeedProviderStork.String():
case types.FeedProviderStork.String():
ticker := feedCfg.Ticker
pricePuller, err := NewStorkPriceFeed(storkFetcher, feedCfg)
pricePuller, err := stork.NewStorkPriceFeed(storkFetcher, feedCfg)
if err != nil {
err = errors.Wrapf(err, "failed to init stork price feed for ticker %s", ticker)
return nil, err
}
svc.pricePullers[ticker] = pricePuller
default: // TODO this should be replaced with correct providers
case types.FeedProviderChainlink.String():
ticker := feedCfg.Ticker
pricePuller, err := NewDynamicPriceFeed(feedCfg)
pricePuller, err := chainlink.NewChainlinkPriceFeed(chainlinkFetcher, feedCfg)
if err != nil {
err = errors.Wrapf(err, "failed to init dynamic price feed for ticker %s", ticker)
err = errors.Wrapf(err, "failed to init chainlink price feed for ticker %s", ticker)
return nil, err
}
svc.pricePullers[ticker] = pricePuller
default:
// Unsupported provider
svc.logger.WithField("provider", feedCfg.ProviderName).Warningln("unsupported feed provider, skipping")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Incorrect package reference for provider constants.

This issue was previously flagged: FeedProviderStork and FeedProviderChainlink should be referenced from their respective provider packages (stork and chainlink), not from the types package, as this will cause compilation errors.

🤖 Prompt for AI Agents
In @internal/service/oracle/service.go around lines 69 - 89, The switch is
comparing feedCfg.ProviderName against constants from the wrong package; replace
references to types.FeedProviderStork.String() and
types.FeedProviderChainlink.String() with the provider-specific constants
(stork.FeedProviderStork.String() and chainlink.FeedProviderChainlink.String())
in the switch inside the init logic that creates price pullers (the block that
calls stork.NewStorkPriceFeed and chainlink.NewChainlinkPriceFeed), and ensure
the stork and chainlink packages are imported so the comparisons compile.

Comment on lines +325 to 334
} else if priceData.GetOracleType() == oracletypes.OracleType_ChainlinkDataStreams {
if chainlinkData, ok := priceData.(*chainlink.ChainlinkPriceData); ok {
if chainlinkData.ChainlinkReport.FeedId == nil || chainlinkData.ChainlinkReport == nil {
s.logger.WithFields(log.Fields{
"ticker": priceData.GetTicker(),
"provider": priceData.GetProviderName(),
}).Debugln("got invalid chainlink report data, skipping")
continue
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Nil pointer dereference: check ChainlinkReport before accessing FeedId.

This issue was previously flagged: on line 327, chainlinkData.ChainlinkReport.FeedId is accessed before checking if ChainlinkReport is nil. The nil checks must be reordered to: if chainlinkData.ChainlinkReport == nil || chainlinkData.ChainlinkReport.FeedId == nil to prevent a panic.

🤖 Prompt for AI Agents
In @internal/service/oracle/service.go around lines 325 - 334, The nil-check
order is wrong and can panic: in the branch where priceData.GetOracleType() ==
oracletypes.OracleType_ChainlinkDataStreams and you cast to
chainlink.ChainlinkPriceData, check chainlinkData.ChainlinkReport for nil before
accessing its FeedId; change the condition to verify
chainlinkData.ChainlinkReport == nil || chainlinkData.ChainlinkReport.FeedId ==
nil (or equivalent) and keep the same logging/continue behavior in the block
that currently references ChainlinkReport and FeedId.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In @internal/service/oracle/chainlink/chainlink_fetcher.go:
- Around line 32-49: The NewFetcher constructor currently never returns a
non-nil error and does not validate its client argument; update NewFetcher to
either (a) validate that the client parameter is non-nil and return a
descriptive error (e.g., "nil client") so callers fail early (this will prevent
panics when Start() calls f.client.Stream()), or (b) if nil-checking is
unnecessary, remove the error return from NewFetcher’s signature and callers
accordingly; locate and modify the NewFetcher function and any call sites to
match the chosen approach, referencing the chainlinkFetcher type and
Start()/f.client.Stream() usage to ensure consistency.
- Around line 139-150: Close() unsafely accesses f.stream causing races with
startReadingReports(); make Close() and startReadingReports() coordinate via
f.mu: acquire f.mu before reading or closing f.stream, set and check a protected
boolean flag (e.g., f.closed) to make Close() idempotent and to let
startReadingReports() exit if closed, always clear f.latestPrices while holding
f.mu regardless of stream state or errors, and return the stream.Close() error
but still perform cleanup; update Close(), startReadingReports(), and any
Start() caller to use these synchronized checks.
- Around line 92-137: The loop in startReadingReports calls f.stream.Read(ctx)
and on error immediately continues, which can create a tight error loop;
implement an exponential backoff (e.g., start at ~100ms, double on each
consecutive error up to a cap like 5s) that is reset to the initial delay on a
successful Read; use a context-aware wait (select on ctx.Done() and time.After)
so sleep respects cancellation, and log the backoff duration with f.logger (and
keep emitting the existing read_error metric) before retrying; reference
startReadingReports, f.stream.Read, f.logger and the error handling block to
locate where to add the backoff logic.
- Around line 15-18: The ChainLinkFetcher interface lacks Close(), causing the
chainlink stream not to be cleaned up; add Close() error to the ChainLinkFetcher
interface definition, ensure the concrete chainlinkFetcher implementation's
Close() signature matches, and update the service's Close() method to call
fetcher.Close() (handle/return any error) so the goroutine started by Start(ctx)
is properly terminated and resources released; also verify Start/Close lifecycle
so the fetcher goroutine exits and stream is closed on Close().
🧹 Nitpick comments (2)
internal/service/oracle/chainlink/chainlink_fetcher.go (2)

51-53: Consider removing the unused helper method.

The logPrintf helper simply wraps f.logger.Infof without adding functionality, and it doesn't appear to be used anywhere in the codebase. Consider removing it to reduce unnecessary abstraction.

♻️ Proposed refactor
-func (f *chainlinkFetcher) logPrintf(format string, args ...interface{}) {
-	f.logger.Infof(format, args...)
-}
-
 func (f *chainlinkFetcher) ChainlinkReport(feedID string) *oracletypes.ChainlinkReport {

62-90: Document that Start() blocks indefinitely.

The Start() method calls startReadingReports(), which runs an infinite loop and only returns on context cancellation or error. This means Start() will block the calling goroutine. Callers need to know they must invoke this in a separate goroutine if they want concurrent operation.

Add documentation to clarify the blocking behavior:

📝 Proposed documentation
-// NewFetcher returns a new Fetcher instance.
+// NewFetcher returns a new Fetcher instance. Call Start() to begin streaming reports.
 func NewFetcher(client streams.Client, feedIds []string) (*chainlinkFetcher, error) {

...

+// Start subscribes to Chainlink Data Streams and begins reading reports.
+// This method blocks until the context is cancelled or an error occurs.
+// Callers should typically run this in a separate goroutine.
 func (f *chainlinkFetcher) Start(ctx context.Context) error {
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a86aafd and bcbca49.

📒 Files selected for processing (1)
  • internal/service/oracle/chainlink/chainlink_fetcher.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/service/oracle/chainlink/chainlink_fetcher.go (1)
internal/service/oracle/stork/stork_fetcher.go (1)
  • NewFetcher (50-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: multi-release
🔇 Additional comments (3)
internal/service/oracle/chainlink/chainlink_fetcher.go (3)

55-60: LGTM: Thread-safe accessor with proper locking.

The ChainlinkReport accessor correctly uses RLock for thread-safe read access and appropriately returns nil when the feed ID is not found.


67-84: LGTM: Proper feed ID validation and subscription.

Feed ID parsing includes proper error handling with wrapped errors, and the subscription logic appropriately logs the feed IDs being subscribed to.


121-131: LGTM: Thread-safe report storage with proper locking.

The report processing correctly updates latestPrices under a write lock and converts the Chainlink SDK response to the internal oracletypes.ChainlinkReport type. The FeedID[:] slice conversion on Line 122 is valid—FeedID is a [32]byte array in the data-streams-sdk, and the slice conversion correctly produces the expected []byte type for the protobuf field.

Comment on lines +15 to +18
type ChainLinkFetcher interface {
Start(ctx context.Context) error
ChainlinkReport(feedID string) *oracletypes.ChainlinkReport
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for ChainLinkFetcher interface usage
echo "=== ChainLinkFetcher interface usage ==="
rg -n "ChainLinkFetcher" --type go -C 2

echo ""
echo "=== Close() calls on chainlink-related code ==="
rg -n "\.Close\(\)" --type go -B 3 -A 1 | grep -A 4 -B 3 "chainlink\|oracle"

Repository: InjectiveLabs/injective-price-oracle

Length of output: 3671


🏁 Script executed:

#!/bin/bash
# Check how chainlinkFetcher is initialized in oracle.go
echo "=== chainlinkFetcher initialization in oracle.go ==="
rg -n "chainlinkFetcher" --type go -B 5 -A 10 cmd/injective-price-oracle/oracle.go

echo ""
echo "=== How chainlinkFetcher is assigned a concrete value ==="
rg -n "chainlinkFetcher\s*=" --type go -B 2 -A 5

echo ""
echo "=== Check if there's a Close method on the main service ==="
rg -n "func.*Close\(\)" --type go -B 2 -A 8 | grep -A 8 "service\|Service"

Repository: InjectiveLabs/injective-price-oracle

Length of output: 3825


Close() method missing from interface causes resource leak.

The ChainLinkFetcher interface does not include a Close() method, but the concrete chainlinkFetcher type implements one (line 139). This creates a resource leak: the Chainlink stream is never closed because (1) the interface doesn't expose Close(), and (2) the service's Close() method is empty (line 427-429 in service.go). The fetcher goroutine at line 372-390 runs indefinitely and exits on context cancellation without cleanup. Add Close() error to the interface and call it in the service's lifecycle management.

🤖 Prompt for AI Agents
In @internal/service/oracle/chainlink/chainlink_fetcher.go around lines 15 - 18,
The ChainLinkFetcher interface lacks Close(), causing the chainlink stream not
to be cleaned up; add Close() error to the ChainLinkFetcher interface
definition, ensure the concrete chainlinkFetcher implementation's Close()
signature matches, and update the service's Close() method to call
fetcher.Close() (handle/return any error) so the goroutine started by Start(ctx)
is properly terminated and resources released; also verify Start/Close lifecycle
so the fetcher goroutine exits and stream is closed on Close().

Comment on lines +32 to +49
func NewFetcher(client streams.Client, feedIds []string) (*chainlinkFetcher, error) {
fetcher := &chainlinkFetcher{
latestPrices: make(map[string]*oracletypes.ChainlinkReport),
logger: log.WithFields(log.Fields{
"svc": "oracle",
"dynamic": true,
"provider": "chainlinkFetcher",
}),
client: client,
feedIDs: feedIds,

svcTags: metrics.Tags{
"provider": "chainlinkFetcher",
},
}

return fetcher, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate the client parameter and reconsider the error return.

The constructor signature includes an error return but never returns a non-nil error, which is misleading. Additionally, the client parameter is not validated for nil, which would cause a panic later when Start() calls f.client.Stream().

🛡️ Proposed fix
 func NewFetcher(client streams.Client, feedIds []string) (*chainlinkFetcher, error) {
+	if client == nil {
+		return nil, errors.New("client cannot be nil")
+	}
+
 	fetcher := &chainlinkFetcher{
 		latestPrices: make(map[string]*oracletypes.ChainlinkReport),
 		logger: log.WithFields(log.Fields{
 			"svc":      "oracle",
 			"dynamic":  true,
 			"provider": "chainlinkFetcher",
 		}),
 		client:  client,
 		feedIDs: feedIds,
 
 		svcTags: metrics.Tags{
 			"provider": "chainlinkFetcher",
 		},
 	}
 
 	return fetcher, nil
 }

Alternatively, if no validation is needed, consider removing the error return to simplify the API.

🤖 Prompt for AI Agents
In @internal/service/oracle/chainlink/chainlink_fetcher.go around lines 32 - 49,
The NewFetcher constructor currently never returns a non-nil error and does not
validate its client argument; update NewFetcher to either (a) validate that the
client parameter is non-nil and return a descriptive error (e.g., "nil client")
so callers fail early (this will prevent panics when Start() calls
f.client.Stream()), or (b) if nil-checking is unnecessary, remove the error
return from NewFetcher’s signature and callers accordingly; locate and modify
the NewFetcher function and any call sites to match the chosen approach,
referencing the chainlinkFetcher type and Start()/f.client.Stream() usage to
ensure consistency.

Comment on lines +92 to +137
func (f *chainlinkFetcher) startReadingReports(ctx context.Context) error {
for {
select {
case <-ctx.Done():
f.logger.Infoln("context cancelled, stopping Chainlink fetcher")
return ctx.Err()
default:
}

reportResponse, err := f.stream.Read(ctx)
if err != nil {
metrics.CustomReport(func(s metrics.Statter, tagSpec []string) {
s.Count("feed_provider.chainlink.read_error.count", 1, tagSpec, 1)
}, f.svcTags)
f.logger.WithError(err).Warningln("error reading from Chainlink stream")
continue
}

feedIDStr := reportResponse.FeedID.String()

metrics.CustomReport(func(s metrics.Statter, tagSpec []string) {
s.Count("feed_provider.chainlink.price_receive.count", 1, tagSpec, 1)
}, f.svcTags)

// Log the decoded report
f.logger.WithFields(log.Fields{
"feedID": feedIDStr,
}).Debugln("received Chainlink report")

priceData := &oracletypes.ChainlinkReport{
FeedId: reportResponse.FeedID[:],
FullReport: reportResponse.FullReport,
ValidFromTimestamp: reportResponse.ValidFromTimestamp,
ObservationsTimestamp: reportResponse.ObservationsTimestamp,
}

// Update the latest prices
f.mu.Lock()
f.latestPrices[feedIDStr] = priceData
f.mu.Unlock()

metrics.CustomReport(func(s metrics.Statter, tagSpec []string) {
s.Count("feed_provider.chainlink.latest_pairs_update.count", 1, tagSpec, 1)
}, f.svcTags)
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add backoff/delay after read errors to prevent tight error loops.

When f.stream.Read(ctx) returns an error on Line 102, the loop continues immediately without any delay. If the stream encounters persistent errors (e.g., network issues, stream closure), this will create a tight loop that consumes excessive CPU and floods logs.

⏱️ Proposed fix with exponential backoff
+import (
+	"time"
+)
+
 func (f *chainlinkFetcher) startReadingReports(ctx context.Context) error {
+	const (
+		baseDelay = 100 * time.Millisecond
+		maxDelay  = 30 * time.Second
+	)
+	currentDelay := baseDelay
+
 	for {
 		select {
 		case <-ctx.Done():
 			f.logger.Infoln("context cancelled, stopping Chainlink fetcher")
 			return ctx.Err()
 		default:
 		}
 
 		reportResponse, err := f.stream.Read(ctx)
 		if err != nil {
 			metrics.CustomReport(func(s metrics.Statter, tagSpec []string) {
 				s.Count("feed_provider.chainlink.read_error.count", 1, tagSpec, 1)
 			}, f.svcTags)
 			f.logger.WithError(err).Warningln("error reading from Chainlink stream")
+			
+			// Backoff before retrying
+			select {
+			case <-time.After(currentDelay):
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+			
+			// Exponential backoff
+			currentDelay = currentDelay * 2
+			if currentDelay > maxDelay {
+				currentDelay = maxDelay
+			}
 			continue
 		}
+		
+		// Reset delay on successful read
+		currentDelay = baseDelay
 
 		feedIDStr := reportResponse.FeedID.String()

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In @internal/service/oracle/chainlink/chainlink_fetcher.go around lines 92 -
137, The loop in startReadingReports calls f.stream.Read(ctx) and on error
immediately continues, which can create a tight error loop; implement an
exponential backoff (e.g., start at ~100ms, double on each consecutive error up
to a cap like 5s) that is reset to the initial delay on a successful Read; use a
context-aware wait (select on ctx.Done() and time.After) so sleep respects
cancellation, and log the backoff duration with f.logger (and keep emitting the
existing read_error metric) before retrying; reference startReadingReports,
f.stream.Read, f.logger and the error handling block to locate where to add the
backoff logic.

Comment on lines +139 to +150
func (f *chainlinkFetcher) Close() error {
if f.stream != nil {
return f.stream.Close()
}

f.mu.Lock()
f.latestPrices = make(map[string]*oracletypes.ChainlinkReport)
f.mu.Unlock()
f.logger.Infoln("Chainlink fetcher closed")

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix race condition between Close() and startReadingReports().

The Close() method accesses and potentially closes f.stream without synchronization, while startReadingReports() also accesses f.stream concurrently. This creates a race condition where:

  1. Close() could close the stream while startReadingReports() is calling f.stream.Read(ctx)
  2. Multiple goroutines could call Close() concurrently

Additionally, the cleanup logic is inconsistent: if stream.Close() returns an error, latestPrices is not cleared, but if stream is nil, it is cleared.

🔒 Proposed fix with proper synchronization
 func (f *chainlinkFetcher) Close() error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	
 	if f.stream != nil {
-		return f.stream.Close()
+		if err := f.stream.Close(); err != nil {
+			f.logger.WithError(err).Errorln("error closing Chainlink stream")
+			return err
+		}
+		f.stream = nil
 	}
 
-	f.mu.Lock()
 	f.latestPrices = make(map[string]*oracletypes.ChainlinkReport)
-	f.mu.Unlock()
 	f.logger.Infoln("Chainlink fetcher closed")
 
 	return nil
 }

Note: You may also want to add a closed flag to prevent Start() from being called after Close(), and check this flag in startReadingReports() to exit gracefully.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (f *chainlinkFetcher) Close() error {
if f.stream != nil {
return f.stream.Close()
}
f.mu.Lock()
f.latestPrices = make(map[string]*oracletypes.ChainlinkReport)
f.mu.Unlock()
f.logger.Infoln("Chainlink fetcher closed")
return nil
}
func (f *chainlinkFetcher) Close() error {
f.mu.Lock()
defer f.mu.Unlock()
if f.stream != nil {
if err := f.stream.Close(); err != nil {
f.logger.WithError(err).Errorln("error closing Chainlink stream")
return err
}
f.stream = nil
}
f.latestPrices = make(map[string]*oracletypes.ChainlinkReport)
f.logger.Infoln("Chainlink fetcher closed")
return nil
}
🤖 Prompt for AI Agents
In @internal/service/oracle/chainlink/chainlink_fetcher.go around lines 139 -
150, Close() unsafely accesses f.stream causing races with
startReadingReports(); make Close() and startReadingReports() coordinate via
f.mu: acquire f.mu before reading or closing f.stream, set and check a protected
boolean flag (e.g., f.closed) to make Close() idempotent and to let
startReadingReports() exit if closed, always clear f.latestPrices while holding
f.mu regardless of stream state or errors, and return the stream.Close() error
but still perform cleanup; update Close(), startReadingReports(), and any
Start() caller to use these synchronized checks.

@danidomi danidomi changed the title feat(ILO-703): chainlink integration ⚠️ DO NOT MERGE feat(ILO-703): chainlink integration Jan 13, 2026
@danidomi danidomi marked this pull request as draft January 13, 2026 22:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants