|
6 | 6 | package resolvedspan_test
|
7 | 7 |
|
8 | 8 | import (
|
| 9 | + "context" |
| 10 | + "fmt" |
9 | 11 | "iter"
|
10 | 12 | "testing"
|
| 13 | + "time" |
11 | 14 |
|
| 15 | + "github.com/cockroachdb/cockroach/pkg/base" |
12 | 16 | "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
|
13 | 17 | "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
|
14 | 18 | "github.com/cockroachdb/cockroach/pkg/keys"
|
15 | 19 | "github.com/cockroachdb/cockroach/pkg/roachpb"
|
16 | 20 | "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
|
17 | 21 | "github.com/cockroachdb/cockroach/pkg/testutils"
|
| 22 | + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" |
| 23 | + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" |
18 | 24 | "github.com/cockroachdb/cockroach/pkg/util/hlc"
|
19 | 25 | "github.com/cockroachdb/cockroach/pkg/util/leaktest"
|
20 | 26 | "github.com/cockroachdb/cockroach/pkg/util/log"
|
21 | 27 | "github.com/cockroachdb/cockroach/pkg/util/randutil"
|
22 | 28 | "github.com/cockroachdb/cockroach/pkg/util/span"
|
| 29 | + "github.com/cockroachdb/cockroach/pkg/util/timeutil" |
23 | 30 | "github.com/cockroachdb/errors"
|
24 | 31 | "github.com/stretchr/testify/require"
|
25 | 32 | )
|
@@ -172,6 +179,7 @@ func TestCoordinatorFrontier(t *testing.T) {
|
172 | 179 | }
|
173 | 180 |
|
174 | 181 | type frontier interface {
|
| 182 | + AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error |
175 | 183 | Frontier() hlc.Timestamp
|
176 | 184 | ForwardResolvedSpan(jobspb.ResolvedSpan) (bool, error)
|
177 | 185 | InBackfill(jobspb.ResolvedSpan) bool
|
@@ -530,3 +538,120 @@ func TestFrontierForwardFullTableSpan(t *testing.T) {
|
530 | 538 | require.Equal(t, targetTimestamp, f.Frontier())
|
531 | 539 | })
|
532 | 540 | }
|
| 541 | + |
| 542 | +func BenchmarkFrontierPerTableTracking(b *testing.B) { |
| 543 | + defer leaktest.AfterTest(b)() |
| 544 | + defer log.Scope(b).Close(b) |
| 545 | + |
| 546 | + ctx := context.Background() |
| 547 | + rng, _ := randutil.NewTestRand() |
| 548 | + |
| 549 | + // Generate a random span iteration order that's the same for all |
| 550 | + // sub-benchmarks. |
| 551 | + const numSpans = 100 |
| 552 | + order := make([]int, numSpans) |
| 553 | + for i := range numSpans { |
| 554 | + order[i] = i |
| 555 | + } |
| 556 | + rng.Shuffle(numSpans, func(i, j int) { |
| 557 | + order[i], order[j] = order[j], order[i] |
| 558 | + }) |
| 559 | + |
| 560 | + for _, tenantType := range []string{"system", "tenant"} { |
| 561 | + for _, frontierType := range []string{"aggregator", "coordinator"} { |
| 562 | + for _, perTableTracking := range []bool{false, true} { |
| 563 | + b.Run( |
| 564 | + fmt.Sprintf("%s/%s/per-table-tracking=%t", tenantType, frontierType, perTableTracking), |
| 565 | + func(b *testing.B) { |
| 566 | + // Start the server. |
| 567 | + srv, db, _ := serverutils.StartServer(b, base.TestServerArgs{ |
| 568 | + DefaultTestTenant: base.TestControlsTenantsExplicitly, |
| 569 | + }) |
| 570 | + defer srv.Stopper().Stop(ctx) |
| 571 | + |
| 572 | + // Get a SQL connection/codec for the tenant type. |
| 573 | + sqlDB, codec := func() (*sqlutils.SQLRunner, keys.SQLCodec) { |
| 574 | + switch tenantType { |
| 575 | + case "system": |
| 576 | + return sqlutils.MakeSQLRunner(db), keys.SystemSQLCodec |
| 577 | + case "tenant": |
| 578 | + tenantID := roachpb.MinTenantID |
| 579 | + _, tenantDB := serverutils.StartTenant(b, srv, base.TestTenantArgs{ |
| 580 | + TenantID: tenantID, |
| 581 | + }) |
| 582 | + return sqlutils.MakeSQLRunner(tenantDB), keys.MakeSQLCodec(tenantID) |
| 583 | + default: |
| 584 | + panic("unreachable") |
| 585 | + } |
| 586 | + }() |
| 587 | + |
| 588 | + // Create a table and split it into multiple spans. |
| 589 | + sqlDB.Exec(b, `CREATE TABLE foo (id INT PRIMARY KEY)`) |
| 590 | + sqlDB.Exec(b, fmt.Sprintf( |
| 591 | + `ALTER TABLE foo SPLIT AT SELECT generate_series(10, %d, 10)`, (numSpans-1)*10)) |
| 592 | + |
| 593 | + var fooTableID uint32 |
| 594 | + sqlDB.QueryRow(b, `SELECT 'foo'::regclass::oid::int`).Scan(&fooTableID) |
| 595 | + fooSpan := codec.TableSpan(fooTableID) |
| 596 | + |
| 597 | + // Collect all the spans. |
| 598 | + var spans roachpb.Spans |
| 599 | + rows := sqlDB.Query(b, `SELECT raw_start_key, raw_end_key |
| 600 | +FROM [SHOW RANGES FROM TABLE foo WITH KEYS]`) |
| 601 | + for rows.Next() { |
| 602 | + var startKey, endKey roachpb.Key |
| 603 | + err := rows.Scan(&startKey, &endKey) |
| 604 | + require.NoError(b, err) |
| 605 | + sp := roachpb.Span{Key: startKey, EndKey: endKey} |
| 606 | + spans = append(spans, fooSpan.Intersect(sp)) |
| 607 | + } |
| 608 | + require.Len(b, spans, numSpans) |
| 609 | + |
| 610 | + now := makeTS(timeutil.Now().Unix()) |
| 611 | + |
| 612 | + // Create the frontier and add all the spans. |
| 613 | + f, err := func() (frontier, error) { |
| 614 | + switch frontierType { |
| 615 | + case "aggregator": |
| 616 | + return resolvedspan.NewAggregatorFrontier( |
| 617 | + now, |
| 618 | + now, |
| 619 | + codec, |
| 620 | + perTableTracking, |
| 621 | + ) |
| 622 | + case "coordinator": |
| 623 | + return resolvedspan.NewCoordinatorFrontier( |
| 624 | + now, |
| 625 | + now, |
| 626 | + codec, |
| 627 | + perTableTracking, |
| 628 | + ) |
| 629 | + default: |
| 630 | + panic("unreachable") |
| 631 | + } |
| 632 | + }() |
| 633 | + require.NoError(b, err) |
| 634 | + require.NoError(b, f.AddSpansAt(now, spans...)) |
| 635 | + |
| 636 | + // Main benchmark loop: forward (shuffled) spans in a loop. |
| 637 | + b.ResetTimer() |
| 638 | + for n := range b.N { |
| 639 | + ts := now.AddDuration(time.Second) |
| 640 | + i := n % len(spans) |
| 641 | + _, err := f.ForwardResolvedSpan(jobspb.ResolvedSpan{ |
| 642 | + Span: spans[order[i]], |
| 643 | + Timestamp: ts, |
| 644 | + }) |
| 645 | + if err != nil { |
| 646 | + b.Fatalf("error forwarding: %v", err) |
| 647 | + } |
| 648 | + } |
| 649 | + b.StopTimer() |
| 650 | + |
| 651 | + // Make sure the compiler doesn't optimize away the forwards. |
| 652 | + require.True(b, now.LessEq(f.Frontier())) |
| 653 | + }) |
| 654 | + } |
| 655 | + } |
| 656 | + } |
| 657 | +} |
0 commit comments