@@ -111,6 +111,8 @@ func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfi
111
111
cfg .pending = metric .NewGauge (metric.Metadata {Name : "pending" })
112
112
cfg .processingNanos = metric .NewCounter (metric.Metadata {Name : "processingnanos" })
113
113
cfg .purgatory = metric .NewGauge (metric.Metadata {Name : "purgatory" })
114
+ cfg .enqueueAdd = metric .NewCounter (metric.Metadata {Name : "enqueueadd" })
115
+ cfg .enqueueUnexpectedError = metric .NewCounter (metric.Metadata {Name : "enqueueunexpectederror" })
114
116
cfg .disabledConfig = testQueueEnabled
115
117
return newBaseQueue (name , impl , store , cfg )
116
118
}
@@ -1321,6 +1323,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1321
1323
t .Fatal ("unexpected call to onProcessResult" )
1322
1324
},
1323
1325
})
1326
+ require .Equal (t , bq .enqueueAdd .Count (), int64 (1 ))
1324
1327
require .True (t , queued )
1325
1328
})
1326
1329
@@ -1347,6 +1350,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1347
1350
t .Fatal ("unexpected call to onProcessResult" )
1348
1351
},
1349
1352
})
1353
+ require .Equal (t , int64 (i + 1 ), bq .enqueueAdd .Count ())
1350
1354
require .True (t , queued )
1351
1355
}
1352
1356
// Set range id back to 1.
@@ -1367,6 +1371,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1367
1371
t .Fatal ("unexpected call to onProcessResult" )
1368
1372
},
1369
1373
})
1374
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1375
+ require .Equal (t , int64 (1 ), bq .enqueueUnexpectedError .Count ())
1370
1376
require .False (t , queued )
1371
1377
})
1372
1378
t .Run ("stopped" , func (t * testing.T ) {
@@ -1385,6 +1391,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1385
1391
},
1386
1392
})
1387
1393
require .False (t , queued )
1394
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1395
+ require .Equal (t , int64 (1 ), bq .enqueueUnexpectedError .Count ())
1388
1396
})
1389
1397
1390
1398
t .Run ("alreadyqueued" , func (t * testing.T ) {
@@ -1402,6 +1410,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1402
1410
},
1403
1411
})
1404
1412
require .True (t , queued )
1413
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1414
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1405
1415
1406
1416
// Inserting again on the same range id should fail.
1407
1417
queued , _ = bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
@@ -1414,6 +1424,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1414
1424
},
1415
1425
})
1416
1426
require .False (t , queued )
1427
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1428
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1417
1429
})
1418
1430
1419
1431
t .Run ("purgatory" , func (t * testing.T ) {
@@ -1437,6 +1449,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1437
1449
},
1438
1450
})
1439
1451
require .False (t , queued )
1452
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1453
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1440
1454
})
1441
1455
1442
1456
t .Run ("processing" , func (t * testing.T ) {
@@ -1448,7 +1462,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1448
1462
item .setProcessing ()
1449
1463
bq .addLocked (item )
1450
1464
// Inserting a range that is already being processed should not enqueue again.
1451
- requeued , _ := bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
1465
+ markedAsRequeued , _ := bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
1452
1466
onEnqueueResult : func (indexOnHeap int , err error ) {
1453
1467
require .Equal (t , - 1 , indexOnHeap )
1454
1468
require .ErrorIs (t , err , errReplicaAlreadyProcessing )
@@ -1457,7 +1471,9 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1457
1471
t .Fatal ("unexpected call to onProcessResult" )
1458
1472
},
1459
1473
})
1460
- require .True (t , requeued )
1474
+ require .True (t , markedAsRequeued )
1475
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1476
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1461
1477
})
1462
1478
t .Run ("fullqueue" , func (t * testing.T ) {
1463
1479
testQueue := & testQueueImpl {}
@@ -1477,6 +1493,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1477
1493
},
1478
1494
})
1479
1495
require .True (t , queued )
1496
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1497
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1480
1498
})
1481
1499
}
1482
1500
0 commit comments