@@ -2721,3 +2721,163 @@ func BenchmarkSeekPrefixTombstones(b *testing.B) {
27212721 iter .SeekPrefixGE (seekKey )
27222722 }
27232723}
2724+
2725+ func runBenchmarkQueueWorkload (b * testing.B , deleteRatio float32 , initOps int , valueSize int ) {
2726+ const queueCount = 8
2727+ // These should be large enough to assign a unique key to each item in the
2728+ // queue.
2729+ const maxQueueIDLen = 1
2730+ const maxItemLen = 7
2731+ const maxKeyLen = maxQueueIDLen + 1 + maxItemLen
2732+ queueIDKeyspace := testkeys .Alpha (maxQueueIDLen )
2733+ itemKeyspace := testkeys .Alpha (maxItemLen )
2734+ key := make ([]byte , maxKeyLen )
2735+ val := make ([]byte , valueSize )
2736+ rng := rand .New (rand .NewSource (uint64 (time .Now ().UnixNano ())))
2737+
2738+ getKey := func (q int , i int ) []byte {
2739+ n := testkeys .WriteKey (key , queueIDKeyspace , int64 (q ))
2740+ key [n ] = '/'
2741+ prefixLen := n + 1
2742+ n = testkeys .WriteKey (key [prefixLen :], itemKeyspace , int64 (i ))
2743+ return key [:prefixLen + n ]
2744+ }
2745+
2746+ type Queue struct {
2747+ start int
2748+ end int // exclusive
2749+ }
2750+ var queues = make ([]* Queue , queueCount )
2751+ for i := 0 ; i < queueCount ; i ++ {
2752+ queues [i ] = & Queue {}
2753+ }
2754+
2755+ o := (& Options {
2756+ DisableWAL : true ,
2757+ FS : vfs .NewMem (),
2758+ Comparer : testkeys .Comparer ,
2759+ FormatMajorVersion : FormatNewest ,
2760+ }).EnsureDefaults ()
2761+
2762+ d , err := Open ("" , o )
2763+ require .NoError (b , err )
2764+
2765+ processQueueOnce := func (batch * Batch ) {
2766+ for {
2767+ // Randomly pick a queue to process.
2768+ q := rng .Intn (queueCount )
2769+ queue := queues [q ]
2770+
2771+ isDelete := rng .Float32 () < deleteRatio
2772+
2773+ if isDelete {
2774+ // Only process the queue if it's not empty. Otherwise, retry
2775+ // with a different queue.
2776+ if queue .start != queue .end {
2777+ require .NoError (b , batch .Delete (getKey (q , queue .start ), nil ))
2778+ queue .start = (queue .start + 1 ) % int (itemKeyspace .Count ())
2779+ break
2780+ }
2781+ } else {
2782+ // Append to the queue.
2783+ require .NoError (b , batch .Set (getKey (q , queue .end ), val , nil ))
2784+ queue .end = (queue .end + 1 ) % int (itemKeyspace .Count ())
2785+ break
2786+ }
2787+ }
2788+ }
2789+
2790+ // First, process queues initialOps times.
2791+ batch := d .NewBatch ()
2792+ for i := 0 ; i < initOps ; i ++ {
2793+ processQueueOnce (batch )
2794+ // Use a large batch size to speed up initialization.
2795+ if batch .Len () >= 10 << 24 /* 167 MiB */ {
2796+ require .NoError (b , batch .Commit (NoSync ))
2797+ batch = d .NewBatch ()
2798+ }
2799+ }
2800+ require .NoError (b , batch .Commit (NoSync ))
2801+ // Manually flush in case the last batch was small.
2802+ _ , err = d .AsyncFlush ()
2803+ require .NoError (b , err )
2804+
2805+ waitForCompactions := func () {
2806+ d .mu .Lock ()
2807+ // NB: Wait for table stats because some compaction types rely
2808+ // on table stats to be collected.
2809+ d .waitTableStats ()
2810+ for d .mu .compact .compactingCount > 0 {
2811+ d .mu .compact .cond .Wait ()
2812+ d .waitTableStats ()
2813+ }
2814+ d .mu .Unlock ()
2815+ }
2816+
2817+ waitForCompactions ()
2818+
2819+ // Log the number of tombstones and live keys in each level after
2820+ // background compactions are complete.
2821+ b .Log ("LSM after compactions:" )
2822+ firstIter , _ := d .NewIter (nil )
2823+ firstIter .First ()
2824+ lastIter , _ := d .NewIter (nil )
2825+ lastIter .Last ()
2826+ stats , _ := d .ScanStatistics (context .Background (), firstIter .Key (), lastIter .Key (), ScanStatisticsOptions {})
2827+ require .NoError (b , firstIter .Close ())
2828+ require .NoError (b , lastIter .Close ())
2829+ metrics := d .Metrics ()
2830+ for i := 0 ; i < numLevels ; i ++ {
2831+ numTombstones := stats .Levels [i ].KindsCount [base .InternalKeyKindDelete ]
2832+ numSets := stats .Levels [i ].KindsCount [base .InternalKeyKindSet ]
2833+ numTables := metrics .Levels [i ].NumFiles
2834+ if numSets > 0 {
2835+ b .Logf ("L%d: %d tombstones, %d sets, %d sstables\n " , i , numTombstones , numSets , numTables )
2836+ }
2837+ }
2838+
2839+ // Seek to the start of each queue.
2840+ b .Run ("seek" , func (b * testing.B ) {
2841+ for i := 0 ; i < b .N ; i ++ {
2842+ for q := 0 ; q < queueCount ; q ++ {
2843+ iter , _ := d .NewIter (nil )
2844+ iter .SeekGE (getKey (q , 0 ))
2845+ require .NoError (b , iter .Close ())
2846+ }
2847+ }
2848+ })
2849+
2850+ require .NoError (b , d .Close ())
2851+ }
2852+
2853+ // BenchmarkQueueWorkload benchmarks a workload consisting of multiple queues
2854+ // that are all being processed at the same time. Processing a queue entails
2855+ // either appending to the end of the queue (a Set operation) or deleting from
2856+ // the start of the queue (a Delete operation). The goal is to detect cases
2857+ // where we see a large buildup of point tombstones at the beginning of each
2858+ // queue, which leads to the slowdown of SeekGE(<start of queue>). To that end,
2859+ // the test subbenchmarks a series of configurations that each 1) process the
2860+ // queues a certain number of times and then 2) benchmark both the queue
2861+ // processing throughput and SeekGE performance. See
2862+ // https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
2863+ // for more information.
2864+ func BenchmarkQueueWorkload (b * testing.B ) {
2865+ // The portion of processing ops that will be deletes for each subbenchmark.
2866+ var deleteRatios = []float32 {0.1 , 0.3 , 0.5 }
2867+ // The number of times queues will be processed before running each
2868+ // subbenchmark.
2869+ var initOps = []int {400_000 , 800_000 , 1_200_000 , 2_000_000 , 3_500_000 , 5_000_000 , 7_500_000 , 10_000_000 , 50_000_000 }
2870+ // We vary the value size to identify how compaction behaves when the
2871+ // relative sizes of tombstones and the keys they delete are different.
2872+ var valueSizes = []int {128 , 2048 }
2873+
2874+ for _ , deleteRatio := range deleteRatios {
2875+ for _ , valueSize := range valueSizes {
2876+ for _ , numInitOps := range initOps {
2877+ b .Run (fmt .Sprintf ("initial_ops=%d/deleteRatio=%.2f/valueSize=%d" , numInitOps , deleteRatio , valueSize ), func (b * testing.B ) {
2878+ runBenchmarkQueueWorkload (b , deleteRatio , numInitOps , valueSize )
2879+ })
2880+ }
2881+ }
2882+ }
2883+ }
0 commit comments