Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions protocol/relaycore/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ type MetricsInterface interface {
SetProtocolErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string)
}

// QoSAvailabilityDegrader interface for QoS management
type QoSAvailabilityDegrader interface {
DegradeAvailability(epoch uint64, sessionId int64)
}

// ChainIdAndApiInterfaceGetter interface
type ChainIdAndApiInterfaceGetter interface {
GetChainIdAndApiInterface() (string, string)
Expand Down
26 changes: 0 additions & 26 deletions protocol/relaycore/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type RelayProcessor struct {
relayRetriesManager *lavaprotocol.RelayRetriesManager
ResultsManager
RelayStateMachine
availabilityDegrader QoSAvailabilityDegrader
crossValidationMap map[[32]byte]int
currentCrossValidationEqualResults int
statefulRelayTargets []string // stores all providers that received a stateful relay
Expand All @@ -50,7 +49,6 @@ func NewRelayProcessor(
chainIdAndApiInterfaceGetter ChainIdAndApiInterfaceGetter,
relayRetriesManager *lavaprotocol.RelayRetriesManager,
relayStateMachine RelayStateMachine,
availabilityDegrader QoSAvailabilityDegrader,
) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
if crossValidationParams.Min <= 0 {
Expand All @@ -69,7 +67,6 @@ func NewRelayProcessor(
RelayStateMachine: relayStateMachine,
selection: relayStateMachine.GetSelection(),
usedProviders: relayStateMachine.GetUsedProviders(),
availabilityDegrader: availabilityDegrader,
crossValidationMap: make(map[[32]byte]int),
currentCrossValidationEqualResults: 0,
}
Expand Down Expand Up @@ -550,33 +547,13 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult

// this must be here before the lock because this function locks
allProvidersAddresses := rp.GetUsedProviders().AllUnwantedAddresses()
shouldDegradeAvailability := false

rp.lock.RLock()
defer rp.lock.RUnlock()

isSpecialApi := rp.GetProtocolMessage().IsDefaultApi() || rp.GetProtocolMessage().GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS
successResults, nodeErrors, protocolErrors := rp.GetResultsData()
successResultsCount, nodeErrorCount, protocolErrorCount := len(successResults), len(nodeErrors), len(protocolErrors)

defer func() {
if shouldDegradeAvailability {
if rp.availabilityDegrader == nil {
utils.LavaFormatWarning("Availability degrader is nil, skipping availability degradation", nil)
return
}
for _, result := range nodeErrors {
session := result.Request.RelaySession
utils.LavaFormatDebug("Degrading availability for provider",
utils.LogAttr("provider", result.ProviderInfo.ProviderAddress),
utils.LogAttr("epoch", session.Epoch),
utils.LogAttr("sessionId", session.SessionId),
)
rp.availabilityDegrader.DegradeAvailability(uint64(session.Epoch), int64(session.SessionId))
}
}
}()

// Calculate the required cross-validation size using the unified function
requiredCrossValidationSize := rp.getRequiredCrossValidationSize(successResultsCount)

Expand All @@ -602,9 +579,6 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult
result, err := rp.responsesCrossValidation(successResults, requiredCrossValidationSize)
if err == nil {
// Successes formed cross-validation
if len(nodeErrors) > 0 && !isSpecialApi { // if we have node errors and it's not a default api, we should degrade availability
shouldDegradeAvailability = true
}
return result, nil
}

Expand Down
29 changes: 7 additions & 22 deletions protocol/relaycore/relay_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/lavanet/lava/v5/protocol/common"
"github.com/lavanet/lava/v5/protocol/lavaprotocol"
"github.com/lavanet/lava/v5/protocol/lavasession"
"github.com/lavanet/lava/v5/protocol/qos"
pairingtypes "github.com/lavanet/lava/v5/x/pairing/types"
spectypes "github.com/lavanet/lava/v5/x/spec/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -95,7 +94,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) {
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp)
consistency := NewConsistency(specId)
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, consistency, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, consistency, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -151,7 +150,7 @@ func TestRelayProcessorTimeout(t *testing.T) {
require.NoError(t, err)
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -204,7 +203,7 @@ func TestRelayProcessorRetry(t *testing.T) {
require.NoError(t, err)
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -249,7 +248,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) {
require.NoError(t, err)
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -297,7 +296,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) {
require.NoError(t, err)
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
canUse := usedProviders.TryLockSelection(ctx)
Expand Down Expand Up @@ -352,7 +351,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) {
require.NoError(t, err)
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
canUse := usedProviders.TryLockSelection(ctx)
Expand Down Expand Up @@ -400,7 +399,7 @@ func TestRelayProcessorLatest(t *testing.T) {
require.NoError(t, err)
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders), qos.NewQoSManager())
relayProcessor := NewRelayProcessor(ctx, common.DefaultCrossValidationParams, nil, RelayProcessorMetrics, RelayProcessorMetrics, RelayRetriesManagerInstance, newMockRelayStateMachine(protocolMessage, usedProviders))
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
canUse := usedProviders.TryLockSelection(ctx)
Expand Down Expand Up @@ -551,7 +550,6 @@ func TestHasRequiredNodeResultsCrossValidationScenarios(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

// Set the batch size so WaitForResults knows how many responses to expect
Expand Down Expand Up @@ -637,7 +635,6 @@ func TestNodeErrorCrossValidationMet(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -700,7 +697,6 @@ func TestNodeErrorCrossValidationNotMet(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -763,7 +759,6 @@ func TestNodeErrorCrossValidationWithProtocolErrors(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -829,7 +824,6 @@ func TestNodeErrorPrioritizedOverProtocolErrors(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachine(protocolMessage, usedProviders),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -897,7 +891,6 @@ func TestNodeErrorPrioritizedOverProtocolErrors(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -973,7 +966,6 @@ func TestSuccessCrossValidationTakesPriorityOverNodeError(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -1045,7 +1037,6 @@ func TestNodeErrorCrossValidationWhenSuccessInsufficient(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -1123,7 +1114,6 @@ func TestSuccessCrossValidationIgnoresNodeErrors(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -1202,7 +1192,6 @@ func TestSuccessCrossValidationFailsWhenCrossValidationDisabled(t *testing.T) {
RelayProcessorMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateful), // Stateful = cross-validation disabled
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -1502,7 +1491,6 @@ func TestNodeErrorsRecoveryMetricWithCrossValidation(t *testing.T) {
mockMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

// Add providers
Expand Down Expand Up @@ -1605,7 +1593,6 @@ func TestProtocolErrorsRecoveryMetricWithCrossValidation(t *testing.T) {
mockMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

// Add providers
Expand Down Expand Up @@ -1694,7 +1681,6 @@ func TestBothErrorTypesRecoveryMetricsWithCrossValidation(t *testing.T) {
mockMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

// Add providers
Expand Down Expand Up @@ -1794,7 +1780,6 @@ func TestNoRecoveryMetricsWhenCrossValidationNotMet(t *testing.T) {
mockMetrics,
RelayRetriesManagerInstance,
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Stateless),
qos.NewQoSManager(),
)

// Add providers
Expand Down
6 changes: 2 additions & 4 deletions protocol/rpcconsumer/consumer_relay_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
common "github.com/lavanet/lava/v5/protocol/common"
"github.com/lavanet/lava/v5/protocol/lavaprotocol"
lavasession "github.com/lavanet/lava/v5/protocol/lavasession"
"github.com/lavanet/lava/v5/protocol/qos"
"github.com/lavanet/lava/v5/protocol/relaycore"
"github.com/lavanet/lava/v5/protocol/relaycoretest"
"github.com/lavanet/lava/v5/utils"
Expand Down Expand Up @@ -123,7 +122,7 @@ func TestConsumerStateMachineHappyFlow(t *testing.T) {
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp)
consistency := relaycore.NewConsistency(specId)
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := relaycore.NewRelayProcessor(ctx, common.DefaultCrossValidationParams, consistency, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &ConsumerRelaySenderMock{retValue: nil}, protocolMessage, nil, false, relaycoretest.RelayProcessorMetrics), qos.NewQoSManager())
relayProcessor := relaycore.NewRelayProcessor(ctx, common.DefaultCrossValidationParams, consistency, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &ConsumerRelaySenderMock{retValue: nil}, protocolMessage, nil, false, relaycoretest.RelayProcessorMetrics))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -194,7 +193,7 @@ func TestConsumerStateMachineExhaustRetries(t *testing.T) {
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp)
consistency := relaycore.NewConsistency(specId)
usedProviders := lavasession.NewUsedProviders(nil)
relayProcessor := relaycore.NewRelayProcessor(ctx, common.DefaultCrossValidationParams, consistency, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &ConsumerRelaySenderMock{retValue: nil, tickerValue: 10 * time.Second}, protocolMessage, nil, false, relaycoretest.RelayProcessorMetrics), qos.NewQoSManager())
relayProcessor := relaycore.NewRelayProcessor(ctx, common.DefaultCrossValidationParams, consistency, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayProcessorMetrics, relaycoretest.RelayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &ConsumerRelaySenderMock{retValue: nil, tickerValue: 10 * time.Second}, protocolMessage, nil, false, relaycoretest.RelayProcessorMetrics))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -278,7 +277,6 @@ func TestConsumerStateMachineArchiveRetry(t *testing.T) {
false,
relaycoretest.RelayProcessorMetrics,
),
qos.NewQoSManager(),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down
2 changes: 0 additions & 2 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retrie
rpccs,
rpccs.relayRetriesManager,
NewRelayStateMachine(ctx, usedProviders, rpccs, protocolMessage, nil, rpccs.debugRelays, rpccs.rpcConsumerLogs),
rpccs.consumerSessionManager.GetQoSManager(),
)
usedProvidersResets := 1
for i := 0; i < retries; i++ {
Expand Down Expand Up @@ -516,7 +515,6 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe
rpccs,
rpccs.relayRetriesManager,
NewRelayStateMachine(ctx, usedProviders, rpccs, protocolMessage, analytics, rpccs.debugRelays, rpccs.rpcConsumerLogs),
rpccs.consumerSessionManager.GetQoSManager(),
)

relayTaskChannel, err := relayProcessor.GetRelayTaskChannel()
Expand Down
2 changes: 0 additions & 2 deletions protocol/rpcsmartrouter/rpcsmartrouter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ func (rpcss *RPCSmartRouterServer) sendRelayWithRetries(ctx context.Context, ret
rpcss,
rpcss.relayRetriesManager,
NewSmartRouterRelayStateMachine(ctx, usedProviders, rpcss, protocolMessage, nil, rpcss.debugRelays, rpcss.rpcSmartRouterLogs),
rpcss.sessionManager.GetQoSManager(),
)
usedProvidersResets := 1
for i := 0; i < retries; i++ {
Expand Down Expand Up @@ -504,7 +503,6 @@ func (rpcss *RPCSmartRouterServer) ProcessRelaySend(ctx context.Context, protoco
rpcss,
rpcss.relayRetriesManager,
NewSmartRouterRelayStateMachine(ctx, usedProviders, rpcss, protocolMessage, analytics, rpcss.debugRelays, rpcss.rpcSmartRouterLogs),
rpcss.sessionManager.GetQoSManager(),
)

relayTaskChannel, err := relayProcessor.GetRelayTaskChannel()
Expand Down
Loading
Loading