Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions indexer/pkg/worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)
Expand Down Expand Up @@ -124,15 +123,6 @@ func (s *Scheduler) Enqueue(ctx context.Context, t *Task) error {
shouldEnqueue, delay := s.shouldEnqueue(t)
if !shouldEnqueue {
s.dlq <- t
lastErrStr := ""
if t.lastErr != nil {
lastErrStr = t.lastErr.Error()
}

if err := t.SetMessageStatus(ctx, common.MessageTimeout, lastErrStr); err != nil {
return errors.New("unable to update message status to timeout. message is already in dlq")
}

return errors.New("unable to enqueue, max attempts reached. sending to dlq")
}

Expand Down
23 changes: 4 additions & 19 deletions indexer/pkg/worker/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/config"
"github.com/smartcontractkit/chainlink-ccv/internal/mocks"
"github.com/smartcontractkit/chainlink-ccv/protocol/common/logging"
Expand Down Expand Up @@ -76,10 +74,8 @@ func TestScheduler_EnqueueImmediateAndDelayed(t *testing.T) {
}

// TestScheduler_DLQOnTTLExpired verifies that tasks whose TTL has already
// expired are sent to the scheduler's DLQ and that Enqueue calls
// UpdateMessageStatus(...) on the storage with MessageTimeout. The test
// asserts the task is delivered to the DLQ and that the storage interaction
// occurs (via the mock expectation).
// expired are sent to the scheduler's DLQ. The scheduler itself does not
// persist message status; that responsibility belongs to the DLQ consumer.
func TestScheduler_DLQOnTTLExpired(t *testing.T) {
lggr, err := logger.NewWith(logging.DevelopmentConfig(zapcore.DebugLevel))
require.NoError(t, err)
Expand All @@ -88,25 +84,18 @@ func TestScheduler_DLQOnTTLExpired(t *testing.T) {
s, err := NewScheduler(lggr, cfg)
require.NoError(t, err)

ms := mocks.NewMockIndexerStorage(t)
// Expect UpdateMessageStatus to be called when task is sent to DLQ
ms.On("UpdateMessageStatus", mock.Anything, mock.Anything, common.MessageTimeout, mock.Anything).Return(nil)

expired := &Task{ttl: time.Now().Add(-time.Minute), attempt: 10, storage: ms}
expired := &Task{ttl: time.Now().Add(-time.Minute), attempt: 10}

ctx := context.Background()
err = s.Enqueue(ctx, expired)
require.Error(t, err)

// DLQ channel should receive the task
select {
case got := <-s.DLQ():
require.Equal(t, expired, got)
case <-time.After(100 * time.Millisecond):
t.Fatalf("timed out waiting for DLQ")
}

// mock expectations will be asserted on test cleanup
}

// TestScheduler_Backoff_NegativeAttempt validates backoff calculation lower-bounds
Expand All @@ -131,11 +120,7 @@ func TestScheduler_Enqueue_TTLExpired_DLQ(t *testing.T) {
s, err := NewScheduler(lggr, scfg)
require.NoError(t, err)

ms := mocks.NewMockIndexerStorage(t)
tsk := &Task{ttl: time.Now().Add(-time.Minute), storage: ms}

// expect UpdateMessageStatus to be called when sending to DLQ
ms.On("UpdateMessageStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
tsk := &Task{ttl: time.Now().Add(-time.Minute)}

err = s.Enqueue(context.Background(), tsk)
require.Error(t, err)
Expand Down
11 changes: 10 additions & 1 deletion indexer/pkg/worker/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,17 @@ func (p *Pool) handleDLQ(ctx context.Context) {
p.logger.Error("DLQ channel closed; exiting handleDLQ")
return
}
p.logger.Warnf("Message %s entered DLQ. Partial verifications may have been recieved", task.messageID.String())
// TODO: DLQ Logic here..
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this TODO be moved back to the end of the code block?

lastErrStr := ""
if task.lastErr != nil {
lastErrStr = task.lastErr.Error()
}

if err := task.SetMessageStatus(ctx, common.MessageTimeout, lastErrStr); err != nil {
p.logger.Errorf("Unable to update message status to timeout for message %s", task.messageID.String())
}

p.logger.Warnf("Message %s entered DLQ. Partial verifications may have been received", task.messageID.String())
}
}
}
39 changes: 18 additions & 21 deletions indexer/pkg/worker/worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,53 +404,50 @@ func TestRun_PoolFull_EnqueuesTask(t *testing.T) {
p.Stop()
}

func TestRun_PoolFull_EnqueueToDLQOnTTLExpired(t *testing.T) {
func TestHandleDLQ_SetsMessageTimeoutOnExpiredTask(t *testing.T) {
lggr := logger.Test(t)

// Create a scheduler with ready channel
s := &Scheduler{
lggr: lggr,
ready: make(chan *Task, 1),
dlq: make(chan *Task, 1),
config: config.SchedulerConfig{TickerInterval: 50, BaseDelay: 0, MaxDelay: 0, VerificationVisibilityWindow: 60},
}

// DLQ behavior: directly enqueue an expired task and ensure scheduler sends it to DLQ
storageMock := mocks.NewMockIndexerStorage(t)
p2 := NewWorkerPool(lggr, config.PoolConfig{ConcurrentWorkers: 1, WorkerTimeout: 1}, nil, s, registry.NewVerifierRegistry(), storageMock)
p := NewWorkerPool(lggr, config.PoolConfig{ConcurrentWorkers: 1, WorkerTimeout: 1}, nil, s, registry.NewVerifierRegistry(), storageMock)

msg := protocol.VerifierResult{}
task, err := NewTask(lggr, msg, p2.registry, p2.storage, time.Second)
task, err := NewTask(lggr, msg, p.registry, p.storage, time.Second)
require.NoError(t, err)
task.ttl = time.Now().Add(-time.Minute)
task.lastErr = errors.New("simulated failure")

// Expect UpdateMessageStatus called when sent to DLQ
dlqCalled := make(chan struct{})
storageMock.On("UpdateMessageStatus", mock.Anything, mock.Anything, common.MessageTimeout, mock.Anything).Run(func(args mock.Arguments) {
storageMock.On("UpdateMessageStatus", mock.Anything, mock.Anything, common.MessageTimeout, "simulated failure").Run(func(args mock.Arguments) {
select {
case <-dlqCalled:
default:
close(dlqCalled)
}
}).Return(nil)

err = s.Enqueue(context.Background(), task)
require.Error(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// task should be on DLQ channel
select {
case dl := <-s.DLQ():
require.Equal(t, task, dl)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for task in DLQ")
}
p.wg.Add(1)
go p.handleDLQ(ctx)

err = s.Enqueue(ctx, task)
require.Error(t, err)

select {
case <-dlqCalled:
// ok
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for UpdateMessageStatus for DLQ task")
// handleDLQ consumed the task and called UpdateMessageStatus
case <-time.After(500 * time.Millisecond):
t.Fatalf("timed out waiting for UpdateMessageStatus from handleDLQ")
}
// stop p2 if started (it's unused)
_ = p2

cancel()
p.wg.Wait()
}
Loading