Skip to content

Commit 317c0a2

Browse files
committed
kvserver: add TestBaseQueueCallback
This commit adds TestBaseQueueCallbackOnEnqueueResult and TestBaseQueueCallbackOnProcessResult to verify that callbacks are correctly invoked with both enqueue and process results.
1 parent d75b08e commit 317c0a2

File tree

2 files changed

+268
-0
lines changed

2 files changed

+268
-0
lines changed

pkg/kv/kvserver/queue_helpers_testutil.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ func (bq *baseQueue) testingAdd(
2121
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback)
2222
}
2323

24+
// testingAddWithCallback is the same as testingAdd, but allows the caller to
25+
// register a process callback that will be invoked when the replica is enqueued
26+
// or processed.
27+
func (bq *baseQueue) testingAddWithCallback(
28+
ctx context.Context, repl replicaInQueue, priority float64, callback processCallback,
29+
) (bool, error) {
30+
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, callback)
31+
}
32+
2433
func forceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) error {
2534
// Check that the system config is available. It is needed by many queues. If
2635
// it's not available, some queues silently fail to process any replicas,

pkg/kv/kvserver/queue_test.go

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,265 @@ func TestBaseQueueDisable(t *testing.T) {
12921292
}
12931293
}
12941294

1295+
// TestBaseQueueCallbackOnEnqueueResult tests the callback onEnqueueResult for
1296+
// 1. successful case: the replica is successfully enqueued.
1297+
// 2. priority update: updates the priority of the replica and not enqueuing
1298+
// again.
1299+
// 3. disabled: queue is disabled and the replica is not enqueued.
1300+
// 4. stopped: queue is stopped and the replica is not enqueued.
1301+
// 5. already queued: the replica is already in the queue and not enqueued
1302+
// again.
1303+
// 6. purgatory: the replica is in purgatory and not enqueued again.
1304+
// 7. processing: the replica is already being processed and not enqueued again.
1305+
// 8. full queue: the queue is full and the replica is not enqueued again.
1306+
func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1307+
defer leaktest.AfterTest(t)()
1308+
defer log.Scope(t).Close(t)
1309+
tc := testContext{}
1310+
stopper := stop.NewStopper()
1311+
ctx := context.Background()
1312+
defer stopper.Stop(ctx)
1313+
tc.Start(ctx, t, stopper)
1314+
1315+
t.Run("successfuladd", func(t *testing.T) {
1316+
testQueue := &testQueueImpl{}
1317+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 1})
1318+
r, err := tc.store.GetReplica(1)
1319+
require.NoError(t, err)
1320+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1321+
onEnqueueResult: func(indexOnHeap int, err error) {
1322+
require.Equal(t, 0, indexOnHeap)
1323+
require.NoError(t, err)
1324+
},
1325+
onProcessResult: func(err error) {
1326+
t.Fatal("unexpected call to onProcessResult")
1327+
},
1328+
})
1329+
require.True(t, queued)
1330+
})
1331+
1332+
t.Run("priority", func(t *testing.T) {
1333+
testQueue := &testQueueImpl{}
1334+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 5})
1335+
r, err := tc.store.GetReplica(1)
1336+
require.NoError(t, err)
1337+
priorities := []float64{5.0, 4.0, 8.0, 1.0, 3.0}
1338+
expectedIndices := []int{0, 1, 0, 3, 4}
1339+
// When inserting 5, [5], index 0.
1340+
// When inserting 4, [5, 4], index 1.
1341+
// When inserting 8, [8, 4, 5], index 0.
1342+
// When inserting 1, [8, 4, 5, 1], index 3.
1343+
// When inserting 3, [8, 4, 5, 1, 3], index 4.
1344+
for i, priority := range priorities {
1345+
r.Desc().RangeID = roachpb.RangeID(i + 1)
1346+
queued, _ := bq.testingAddWithCallback(ctx, r, priority, processCallback{
1347+
onEnqueueResult: func(indexOnHeap int, err error) {
1348+
require.Equal(t, expectedIndices[i], indexOnHeap)
1349+
require.NoError(t, err)
1350+
},
1351+
onProcessResult: func(err error) {
1352+
t.Fatal("unexpected call to onProcessResult")
1353+
},
1354+
})
1355+
require.True(t, queued)
1356+
}
1357+
// Set range id back to 1.
1358+
r.Desc().RangeID = 1
1359+
})
1360+
t.Run("disabled", func(t *testing.T) {
1361+
testQueue := &testQueueImpl{}
1362+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1363+
bq.SetDisabled(true)
1364+
r, err := tc.store.GetReplica(1)
1365+
require.NoError(t, err)
1366+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1367+
onEnqueueResult: func(indexOnHeap int, err error) {
1368+
require.Equal(t, -1, indexOnHeap)
1369+
require.ErrorIs(t, err, errQueueDisabled)
1370+
},
1371+
onProcessResult: func(err error) {
1372+
t.Fatal("unexpected call to onProcessResult")
1373+
},
1374+
})
1375+
require.False(t, queued)
1376+
})
1377+
t.Run("stopped", func(t *testing.T) {
1378+
testQueue := &testQueueImpl{}
1379+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1380+
bq.mu.stopped = true
1381+
r, err := tc.store.GetReplica(1)
1382+
require.NoError(t, err)
1383+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1384+
onEnqueueResult: func(indexOnHeap int, err error) {
1385+
require.Equal(t, -1, indexOnHeap)
1386+
require.ErrorIs(t, err, errQueueStopped)
1387+
},
1388+
onProcessResult: func(err error) {
1389+
t.Fatal("unexpected call to onProcessResult")
1390+
},
1391+
})
1392+
require.False(t, queued)
1393+
})
1394+
1395+
t.Run("alreadyqueued", func(t *testing.T) {
1396+
testQueue := &testQueueImpl{}
1397+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1398+
r, err := tc.store.GetReplica(1)
1399+
require.NoError(t, err)
1400+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1401+
onEnqueueResult: func(indexOnHeap int, err error) {
1402+
require.Equal(t, 0, indexOnHeap)
1403+
require.NoError(t, err)
1404+
},
1405+
onProcessResult: func(err error) {
1406+
t.Fatal("unexpected call to onProcessResult")
1407+
},
1408+
})
1409+
require.True(t, queued)
1410+
1411+
// Inserting again on the same range id should fail.
1412+
queued, _ = bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1413+
onEnqueueResult: func(indexOnHeap int, err error) {
1414+
require.Equal(t, -1, indexOnHeap)
1415+
require.ErrorIs(t, err, errReplicaAlreadyInQueue)
1416+
},
1417+
onProcessResult: func(err error) {
1418+
t.Fatal("unexpected call to onProcessResult")
1419+
},
1420+
})
1421+
require.False(t, queued)
1422+
})
1423+
1424+
t.Run("purgatory", func(t *testing.T) {
1425+
testQueue := &testQueueImpl{
1426+
pChan: make(chan time.Time, 1),
1427+
}
1428+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1429+
r, err := tc.store.GetReplica(1)
1430+
require.NoError(t, err)
1431+
bq.mu.Lock()
1432+
bq.addToPurgatoryLocked(ctx, stopper, r, &testPurgatoryError{}, 1.0, nil)
1433+
bq.mu.Unlock()
1434+
// Inserting a range in purgatory should not enqueue again.
1435+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1436+
onEnqueueResult: func(indexOnHeap int, err error) {
1437+
require.Equal(t, -1, indexOnHeap)
1438+
require.ErrorIs(t, err, errReplicaAlreadyInPurgatory)
1439+
},
1440+
onProcessResult: func(err error) {
1441+
t.Fatal("unexpected call to onProcessResult")
1442+
},
1443+
})
1444+
require.False(t, queued)
1445+
})
1446+
1447+
t.Run("processing", func(t *testing.T) {
1448+
testQueue := &testQueueImpl{}
1449+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1450+
r, err := tc.store.GetReplica(1)
1451+
require.NoError(t, err)
1452+
item := &replicaItem{rangeID: r.Desc().RangeID, replicaID: r.ReplicaID(), index: -1}
1453+
item.setProcessing()
1454+
bq.addLocked(item)
1455+
// Inserting a range that is already being processed should not enqueue again.
1456+
requeued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1457+
onEnqueueResult: func(indexOnHeap int, err error) {
1458+
require.Equal(t, -1, indexOnHeap)
1459+
require.ErrorIs(t, err, errReplicaAlreadyProcessing)
1460+
},
1461+
onProcessResult: func(err error) {
1462+
t.Fatal("unexpected call to onProcessResult")
1463+
},
1464+
})
1465+
require.True(t, requeued)
1466+
})
1467+
t.Run("fullqueue", func(t *testing.T) {
1468+
testQueue := &testQueueImpl{}
1469+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 0})
1470+
r, err := tc.store.GetReplica(1)
1471+
require.NoError(t, err)
1472+
// Max size is 0, so the replica should not be enqueued.
1473+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1474+
onEnqueueResult: func(indexOnHeap int, err error) {
1475+
// It may be called with err = nil.
1476+
if err != nil {
1477+
require.ErrorIs(t, err, errDroppedDueToFullQueueSize)
1478+
}
1479+
},
1480+
onProcessResult: func(err error) {
1481+
t.Fatal("unexpected call to onProcessResult")
1482+
},
1483+
})
1484+
require.True(t, queued)
1485+
})
1486+
}
1487+
1488+
// TestBaseQueueCallbackOnProcessResult tests that the processCallback is
1489+
// invoked when the replica is processed and will be invoked again if the
1490+
// replica ends up in the purgatory queue and being processed again.
1491+
func TestBaseQueueCallbackOnProcessResult(t *testing.T) {
1492+
defer leaktest.AfterTest(t)()
1493+
defer log.Scope(t).Close(t)
1494+
tc := testContext{}
1495+
stopper := stop.NewStopper()
1496+
ctx := context.Background()
1497+
defer stopper.Stop(ctx)
1498+
tsc := TestStoreConfig(nil)
1499+
tc.StartWithStoreConfig(ctx, t, stopper, tsc)
1500+
1501+
testQueue := &testQueueImpl{
1502+
duration: time.Nanosecond,
1503+
pChan: make(chan time.Time, 1),
1504+
err: &testPurgatoryError{},
1505+
}
1506+
1507+
const replicaCount = 10
1508+
repls := createReplicas(t, &tc, replicaCount)
1509+
1510+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: replicaCount})
1511+
bq.Start(stopper)
1512+
1513+
var totalProcessedCalledWithErr atomic.Int32
1514+
for _, r := range repls {
1515+
queued, _ := bq.testingAddWithCallback(context.Background(), r, 1.0, processCallback{
1516+
onEnqueueResult: func(indexOnHeap int, err error) {
1517+
require.NoError(t, err)
1518+
},
1519+
onProcessResult: func(err error) {
1520+
if err != nil {
1521+
totalProcessedCalledWithErr.Add(1)
1522+
}
1523+
},
1524+
})
1525+
require.True(t, queued)
1526+
}
1527+
1528+
testutils.SucceedsSoon(t, func() error {
1529+
if pc := testQueue.getProcessed(); pc != replicaCount {
1530+
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
1531+
}
1532+
1533+
if totalProcessedCalledWithErr.Load() != int32(replicaCount) {
1534+
return errors.Errorf("expected %d processed replicas with err; got %d", replicaCount, totalProcessedCalledWithErr.Load())
1535+
}
1536+
return nil
1537+
})
1538+
1539+
// Now, signal that purgatoried replicas should retry.
1540+
testQueue.pChan <- timeutil.Now()
1541+
1542+
testutils.SucceedsSoon(t, func() error {
1543+
if pc := testQueue.getProcessed(); pc != replicaCount*2 {
1544+
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
1545+
}
1546+
1547+
if totalProcessedCalledWithErr.Load() != int32(replicaCount*2) {
1548+
return errors.Errorf("expected %d processed replicas with err; got %d", replicaCount, totalProcessedCalledWithErr.Load())
1549+
}
1550+
return nil
1551+
})
1552+
}
1553+
12951554
// TestQueueDisable verifies that setting the set of queue.enabled cluster
12961555
// settings actually disables the base queue. This test works alongside
12971556
// TestBaseQueueDisable to verify the entire disable workflow.

0 commit comments

Comments
 (0)