@@ -112,6 +112,8 @@ func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfi
112
112
cfg .pending = metric .NewGauge (metric.Metadata {Name : "pending" })
113
113
cfg .processingNanos = metric .NewCounter (metric.Metadata {Name : "processingnanos" })
114
114
cfg .purgatory = metric .NewGauge (metric.Metadata {Name : "purgatory" })
115
+ cfg .enqueueAdd = metric .NewCounter (metric.Metadata {Name : "enqueueadd" })
116
+ cfg .enqueueUnexpectedError = metric .NewCounter (metric.Metadata {Name : "enqueueunexpectederror" })
115
117
cfg .disabledConfig = testQueueEnabled
116
118
return newBaseQueue (name , impl , store , cfg )
117
119
}
@@ -1328,6 +1330,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1328
1330
t .Fatal ("unexpected call to onProcessResult" )
1329
1331
},
1330
1332
})
1333
+ require .Equal (t , bq .enqueueAdd .Count (), int64 (1 ))
1331
1334
require .True (t , queued )
1332
1335
})
1333
1336
@@ -1354,6 +1357,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1354
1357
t .Fatal ("unexpected call to onProcessResult" )
1355
1358
},
1356
1359
})
1360
+ require .Equal (t , int64 (i + 1 ), bq .enqueueAdd .Count ())
1357
1361
require .True (t , queued )
1358
1362
}
1359
1363
// Set range id back to 1.
@@ -1374,6 +1378,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1374
1378
t .Fatal ("unexpected call to onProcessResult" )
1375
1379
},
1376
1380
})
1381
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1382
+ require .Equal (t , int64 (1 ), bq .enqueueUnexpectedError .Count ())
1377
1383
require .False (t , queued )
1378
1384
})
1379
1385
t .Run ("stopped" , func (t * testing.T ) {
@@ -1392,6 +1398,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1392
1398
},
1393
1399
})
1394
1400
require .False (t , queued )
1401
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1402
+ require .Equal (t , int64 (1 ), bq .enqueueUnexpectedError .Count ())
1395
1403
})
1396
1404
1397
1405
t .Run ("alreadyqueued" , func (t * testing.T ) {
@@ -1409,6 +1417,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1409
1417
},
1410
1418
})
1411
1419
require .True (t , queued )
1420
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1421
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1412
1422
1413
1423
// Inserting again on the same range id should fail.
1414
1424
queued , _ = bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
@@ -1421,6 +1431,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1421
1431
},
1422
1432
})
1423
1433
require .False (t , queued )
1434
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1435
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1424
1436
})
1425
1437
1426
1438
t .Run ("purgatory" , func (t * testing.T ) {
@@ -1444,6 +1456,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1444
1456
},
1445
1457
})
1446
1458
require .False (t , queued )
1459
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1460
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1447
1461
})
1448
1462
1449
1463
t .Run ("processing" , func (t * testing.T ) {
@@ -1455,7 +1469,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1455
1469
item .setProcessing ()
1456
1470
bq .addLocked (item )
1457
1471
// Inserting a range that is already being processed should not enqueue again.
1458
- requeued , _ := bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
1472
+ markedAsRequeued , _ := bq .testingAddWithCallback (ctx , r , 1.0 , processCallback {
1459
1473
onEnqueueResult : func (indexOnHeap int , err error ) {
1460
1474
require .Equal (t , - 1 , indexOnHeap )
1461
1475
require .ErrorIs (t , err , errReplicaAlreadyProcessing )
@@ -1464,7 +1478,9 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1464
1478
t .Fatal ("unexpected call to onProcessResult" )
1465
1479
},
1466
1480
})
1467
- require .True (t , requeued )
1481
+ require .True (t , markedAsRequeued )
1482
+ require .Equal (t , int64 (0 ), bq .enqueueAdd .Count ())
1483
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1468
1484
})
1469
1485
t .Run ("fullqueue" , func (t * testing.T ) {
1470
1486
testQueue := & testQueueImpl {}
@@ -1484,6 +1500,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1484
1500
},
1485
1501
})
1486
1502
require .True (t , queued )
1503
+ require .Equal (t , int64 (1 ), bq .enqueueAdd .Count ())
1504
+ require .Equal (t , int64 (0 ), bq .enqueueUnexpectedError .Count ())
1487
1505
})
1488
1506
}
1489
1507
0 commit comments