Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 1 addition & 131 deletions ejector/data_api_signing_rate_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,122 +51,14 @@ func (srl *dataApiSigningRateLookup) GetSigningRates(
) ([]*validator.ValidatorSigningRate, error) {
switch version {
case ProtocolVersionV1:
if !omitPerfectSigners {
srl.logger.Warn(
"omitPerfectSigners flag is ignored for ProtocolVersionV1, will never return perfect signers")
}
return srl.getV1SigningRates(timeSpan, quorums)
return nil, fmt.Errorf("eigenda v1 protocol is no longer supported")
case ProtocolVersionV2:
Comment on lines 52 to 55
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProtocolVersionV1 now returns an immediate error. Please add a small unit test asserting that v1 requests fail (and return a stable/expected error) so this behavior doesn’t regress silently if code paths are reintroduced later.

Copilot uses AI. Check for mistakes.
return srl.getV2SigningRates(timeSpan, quorums, omitPerfectSigners)
default:
return nil, fmt.Errorf("unsupported protocol version: %d", version)
}
}

// Look up signing rates for v1.
func (srl *dataApiSigningRateLookup) getV1SigningRates(
timeSpan time.Duration,
quorums []core.QuorumID,
) ([]*validator.ValidatorSigningRate, error) {

quorumSet := make(map[core.QuorumID]struct{})
for _, q := range quorums {
quorumSet[q] = struct{}{}
}

now := time.Now()

path := "api/v1/metrics/operator-nonsigning-percentage"
urlStr, err := url.JoinPath(srl.url, path)
if err != nil {
return nil, fmt.Errorf("error joining URL path with %s and %s: %w", srl.url, path, err)
}
url, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("error parsing URL: %w", err)
}
// add query parameters
q := url.Query()
q.Set("end", now.UTC().Format(time.RFC3339))
// interval: lookback window in seconds
q.Set("interval", strconv.Itoa(int(timeSpan.Seconds())))
url.RawQuery = q.Encode()
// Very verbose, enable for debugging if needed.
// srl.logger.Debug("making request to DataAPI", "url", url.String())

req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, fmt.Errorf("error creating HTTP request: %w", err)
}

resp, err := srl.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error sending HTTP request: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %w", err)
}
// Very verbose, enable for debugging if needed.
// srl.logger.Info("Received response", "responseBody", string(respBody))

if resp.StatusCode != http.StatusOK {
var errResp dataapi.ErrorResponse
err = json.Unmarshal(respBody, &errResp)
if err != nil {
return nil, fmt.Errorf("error parsing error response: %w", err)
}
return nil, fmt.Errorf(
"error response (%d) from dataapi: %s",
resp.StatusCode,
errResp.Error,
)
}

var response dataapi.OperatorsNonsigningPercentage
err = json.Unmarshal(respBody, &response)
if err != nil {
return nil, fmt.Errorf("error parsing response body: %w", err)
}

// Use a map to combine results from multiple quorums.
signingRateMap := make(map[core.OperatorID]*validator.ValidatorSigningRate)

for _, data := range response.Data {
// If quorumSet is empty, then we include all quorums.
if len(quorumSet) > 0 {
if _, ok := quorumSet[data.QuorumId]; !ok {
// This quorum is not in the requested set, skip it.
continue
}
}

signingRate, err := translateV1ToProto(data)
if err != nil {
return nil, fmt.Errorf("error translating dataapi rate to proto: %w", err)
}

signingRateMap[core.OperatorID(signingRate.GetValidatorId())], err =
combineSigningRates(
signingRateMap[core.OperatorID(signingRate.GetValidatorId())],
signingRate)
if err != nil {
return nil, fmt.Errorf("error combining signing rates: %w", err)
}
}

signingRates := make([]*validator.ValidatorSigningRate, 0, len(signingRateMap))
for _, rate := range signingRateMap {
signingRates = append(signingRates, rate)
}

return signingRates, nil
}

// Look up signing rates for v2.
func (srl *dataApiSigningRateLookup) getV2SigningRates(
timeSpan time.Duration,
Expand Down Expand Up @@ -274,28 +166,6 @@ func (srl *dataApiSigningRateLookup) getV2SigningRates(
return signingRates, nil
}

// Translates a single DataAPI OperatorNonsigningPercentageMetrics to a ValidatorSigningRate protobuf.
func translateV1ToProto(data *dataapi.OperatorNonsigningPercentageMetrics) (*validator.ValidatorSigningRate, error) {
validatorID, err := core.OperatorIDFromHex(data.OperatorId)
if err != nil {
return nil, fmt.Errorf("error parsing operator ID %s: %w", data.OperatorId, err)
}

signedBatches := data.TotalBatches - data.TotalUnsignedBatches
unsignedBatches := data.TotalUnsignedBatches

signingRate := &validator.ValidatorSigningRate{
ValidatorId: validatorID[:],
SignedBatches: uint64(signedBatches),
UnsignedBatches: uint64(unsignedBatches),
SignedBytes: uint64(signedBatches), // Not accurate, but we don't have byte info from DataAPI.
UnsignedBytes: uint64(unsignedBatches), // Not accurate, but we don't have byte info from DataAPI.
SigningLatency: 0, // Not available from DataAPI.
}

return signingRate, nil
}

// Translates a single DataAPI v2 OperatorSigningInfo to a ValidatorSigningRate protobuf.
func translateV2ToProto(data *dataapiv2.OperatorSigningInfo) (*validator.ValidatorSigningRate, error) {
validatorID, err := core.OperatorIDFromHex(data.OperatorId)
Expand Down
23 changes: 1 addition & 22 deletions ejector/ejector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ type Ejector struct {
// Responsible for executing ejections and managing the ejection lifecycle.
ejectionManager *ThreadedEjectionManager

// Used for looking up signing rates for V1.
// TODO(cody.littley): remove this after V1 sunset
signingRateLookupV1 SigningRateLookup

// Used for looking up signing rates for V2.
signingRateLookupV2 SigningRateLookup

Expand Down Expand Up @@ -52,7 +48,6 @@ func NewEjector(
logger logging.Logger,
config *EjectorConfig,
ejectionManager *ThreadedEjectionManager,
signingRateLookupV1 SigningRateLookup,
signingRateLookupV2 SigningRateLookup,
validatorIDToAddressCache eth.ValidatorIDToAddressConverter,
referenceBlockProvider eth.ReferenceBlockProvider,
Expand All @@ -63,7 +58,6 @@ func NewEjector(
ctx: ctx,
logger: logger,
ejectionManager: ejectionManager,
signingRateLookupV1: signingRateLookupV1,
signingRateLookupV2: signingRateLookupV2,
period: config.EjectionPeriod,
ejectionCriteriaTimeWindow: config.EjectionCriteriaTimeWindow,
Expand Down Expand Up @@ -104,17 +98,7 @@ func (e *Ejector) evaluateValidators() error {

e.logger.Debug("evaluating validators for ejection")

v1SigningRates, err := e.signingRateLookupV1.GetSigningRates(
e.ejectionCriteriaTimeWindow,
nil, // all quorums
ProtocolVersionV1,
true, // omit perfect signers if possible (data API has inconsistent behavior across v1 and v2)
)
if err != nil {
return fmt.Errorf("error looking up v1 signing rates: %w", err)
}

v2SigningRates, err := e.signingRateLookupV2.GetSigningRates(
signingRates, err := e.signingRateLookupV2.GetSigningRates(
e.ejectionCriteriaTimeWindow,
nil, // all quorums
ProtocolVersionV2,
Expand All @@ -124,11 +108,6 @@ func (e *Ejector) evaluateValidators() error {
return fmt.Errorf("error looking up v2 signing rates: %w", err)
}

// Combine data from v1 and v2 lookups, since the validator is likely to cancel ejection if it is active in either.
signingRates, err := combineSigningRateSlices(v1SigningRates, v2SigningRates)
if err != nil {
return fmt.Errorf("error combining signing rates: %w", err)
}
sortByUnsignedBytesDescending(signingRates)

for _, signingRate := range signingRates {
Expand Down
3 changes: 1 addition & 2 deletions ejector/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func run(ctx context.Context) error {

threadedEjectionManager := ejector.NewThreadedEjectionManager(ctx, logger, ejectionManager, ejectorConfig)

// Currently used for both v1 and v2 signing rate lookups. Eventually, v2 will poll the controller for this info.
// Currently used for v2 signing rate lookups. Eventually, v2 will poll the controller for this info.
dataApiSigningRateLookup := ejector.NewDataApiSigningRateLookup(
logger,
ejectorConfig.DataApiUrl,
Expand Down Expand Up @@ -185,7 +185,6 @@ func run(ctx context.Context) error {
ejectorConfig,
threadedEjectionManager,
dataApiSigningRateLookup,
dataApiSigningRateLookup,
validatorIDToAddressConverter,
referenceBlockProvider,
validatorQuorumLookup,
Expand Down
Loading