@@ -110,6 +110,8 @@ func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfi
110
110
cfg .pending = metric .NewGauge (metric.Metadata {Name : "pending" })
111
111
cfg .processingNanos = metric .NewCounter (metric.Metadata {Name : "processingnanos" })
112
112
cfg .purgatory = metric .NewGauge (metric.Metadata {Name : "purgatory" })
113
+ cfg .enqueueAdd = metric .NewCounter (metric.Metadata {Name : "enqueueadd" })
114
+ cfg .enqueueUnexpectedError = metric .NewCounter (metric.Metadata {Name : "enqueueunexpectederror" })
113
115
cfg .disabledConfig = testQueueEnabled
114
116
return newBaseQueue (name , impl , store , cfg )
115
117
}
@@ -1326,6 +1328,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1326
1328
t .Fatal ("unexpected call to onProcessResult" )
1327
1329
},
1328
1330
})
1331
+ require .Equal (t , bq .enqueueAdd .Count (), int64 (1 ))
1329
1332
require .True (t , queued )
1330
1333
})
1331
1334
@@ -1352,6 +1355,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1352
1355
t .Fatal ("unexpected call to onProcessResult" )
1353
1356
},
1354
1357
})
1358
+ require .Equal (t , int64 (i + 1 ), bq .enqueueAdd .Count ())
1355
1359
require .True (t , queued )
1356
1360
}
1357
1361
// Set range id back to 1.
@@ -1372,6 +1376,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1372
1376
t .Fatal ("unexpected call to onProcessResult" )
1373
1377
},
1374
1378
})
1379
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1380
+ require .Equal (t , int64 (1 ), bq .enqueueUnexpectedError .Count ())
1375
1381
require .False (t , queued )
1376
1382
})
1377
1383
t .Run ("stopped" , func (t * testing.T ) {
@@ -1390,6 +1396,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1390
1396
},
1391
1397
})
1392
1398
require .False (t , queued )
1399
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1400
+ require .Equal (t , int64 (1 ), bq .enqueueUnexpectedError .Count ())
1393
1401
})
1394
1402
1395
1403
t .Run ("alreadyqueued" , func (t * testing.T ) {
@@ -1407,6 +1415,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1407
1415
},
1408
1416
})
1409
1417
require .True (t , queued )
1418
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1419
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1410
1420
1411
1421
// Inserting again on the same range id should fail.
1412
1422
queued , _ = bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
@@ -1419,6 +1429,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1419
1429
},
1420
1430
})
1421
1431
require .False (t , queued )
1432
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1433
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1422
1434
})
1423
1435
1424
1436
t .Run ("purgatory" , func (t * testing.T ) {
@@ -1442,6 +1454,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1442
1454
},
1443
1455
})
1444
1456
require .False (t , queued )
1457
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1458
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1445
1459
})
1446
1460
1447
1461
t .Run ("processing" , func (t * testing.T ) {
@@ -1453,7 +1467,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1453
1467
item .setProcessing ()
1454
1468
bq .addLocked (item )
1455
1469
// Inserting a range that is already being processed should not enqueue again.
1456
- requeued , _ := bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
1470
+ markedAsRequeued , _ := bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
1457
1471
onEnqueueResult : func (indexOnHeap int , err error ) {
1458
1472
require .Equal (t , - 1 , indexOnHeap )
1459
1473
require .ErrorIs (t , err , errReplicaAlreadyProcessing )
@@ -1462,7 +1476,9 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1462
1476
t .Fatal ("unexpected call to onProcessResult" )
1463
1477
},
1464
1478
})
1465
- require .True (t , requeued )
1479
+ require .True (t , markedAsRequeued )
1480
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1481
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1466
1482
})
1467
1483
t .Run ("fullqueue" , func (t * testing.T ) {
1468
1484
testQueue := & testQueueImpl {}
@@ -1482,6 +1498,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1482
1498
},
1483
1499
})
1484
1500
require .True (t , queued )
1501
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1502
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1485
1503
})
1486
1504
}
1487
1505
0 commit comments