Skip to content

Commit b4e47cf

Browse files
committed
jobs/frontier: add randomized frontier test
Epic: none Release note: none
1 parent 96f9f14 commit b4e47cf

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed

pkg/jobs/jobfrontier/frontier_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1818
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1919
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
2021
"github.com/cockroachdb/cockroach/pkg/util/span"
2122
"github.com/stretchr/testify/require"
2223
)
@@ -464,3 +465,108 @@ func TestGetAllResolvedSpans(t *testing.T) {
464465
})
465466
}
466467
}
468+
469+
func TestRandomizedFrontier(t *testing.T) {
470+
defer leaktest.AfterTest(t)()
471+
defer log.Scope(t).Close(t)
472+
473+
ctx := context.Background()
474+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
475+
defer srv.Stopper().Stop(ctx)
476+
477+
s := srv.ApplicationLayer()
478+
479+
// Seed for reproducible randomness
480+
rng, _ := randutil.NewTestRand()
481+
482+
// Create 50 adjacent key spans
483+
const numSpans = 50
484+
spans := make([]roachpb.Span, numSpans)
485+
for i := 0; i < numSpans; i++ {
486+
startKey := []byte{byte(i)}
487+
endKey := []byte{byte(i + 1)}
488+
spans[i] = roachpb.Span{Key: startKey, EndKey: endKey}
489+
}
490+
491+
frontier, err := span.MakeFrontier(spans...)
492+
require.NoError(t, err)
493+
494+
// Initialize spans to random timestamps 0-3
495+
for _, sp := range spans {
496+
ts := hlc.Timestamp{WallTime: int64(rng.Intn(4))}
497+
_, err := frontier.Forward(sp, ts)
498+
require.NoError(t, err)
499+
}
500+
501+
jobID := jobspb.JobID(3000)
502+
spanSize := spans[0].Size()
503+
smallChunkSize := spanSize * rng.Intn(10)
504+
largeChunkSize := spanSize * numSpans * 2
505+
506+
persist := func() {
507+
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
508+
// Store sharded version (small chunk size forces sharding)
509+
err := storeChunked(ctx, txn, jobID, "sharded", frontier, smallChunkSize)
510+
require.NoError(t, err)
511+
512+
// Store unsharded version (large chunk size prevents sharding)
513+
err = storeChunked(ctx, txn, jobID, "unsharded", frontier, largeChunkSize)
514+
require.NoError(t, err)
515+
516+
return nil
517+
}))
518+
}
519+
520+
getAndVerify := func() {
521+
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
522+
523+
shardedFrontier, found, err := Get(ctx, txn, jobID, "sharded")
524+
require.NoError(t, err)
525+
require.True(t, found, "updated sharded frontier should be found")
526+
527+
checkSpanFrontierEquality(t, frontier, shardedFrontier)
528+
529+
unshardedFrontier, found, err := Get(ctx, txn, jobID, "unsharded")
530+
require.NoError(t, err)
531+
require.True(t, found, "updated unsharded frontier should be found")
532+
533+
checkSpanFrontierEquality(t, frontier, unshardedFrontier)
534+
535+
return nil
536+
}))
537+
}
538+
539+
persist()
540+
getAndVerify()
541+
542+
// Randomly update some spans in the in-memory frontier
543+
numUpdates := rng.Intn(20) + 10
544+
545+
for i := 0; i < numUpdates; i++ {
546+
spanIdx := rng.Intn(numSpans)
547+
newTs := hlc.Timestamp{WallTime: int64(rng.Intn(4))}
548+
549+
// If the new ts is less than the current time in the frontier, this will be
550+
// a noop.
551+
_, err := frontier.Forward(spans[spanIdx], newTs)
552+
require.NoError(t, err)
553+
}
554+
555+
persist()
556+
getAndVerify()
557+
}
558+
559+
func checkSpanFrontierEquality(t *testing.T, expected span.Frontier, actual span.Frontier) {
560+
require.Equal(t, expected.Len(), actual.Len(), "frontiers should have same number of spans")
561+
for eSp, eTs := range expected.Entries() {
562+
found := false
563+
for aSp, aTs := range actual.Entries() {
564+
if eSp.Equal(aSp) {
565+
require.Equal(t, eTs, aTs, "span with expected Ts %s and actual Ts %s", eTs, aTs)
566+
found = true
567+
break
568+
}
569+
}
570+
require.True(t, found, "span %d should be found in loaded frontier", eSp)
571+
}
572+
}

0 commit comments

Comments
 (0)