@@ -7,6 +7,7 @@ package tpcc
7
7
8
8
import (
9
9
"bytes"
10
+ "context"
10
11
gosql "database/sql"
11
12
"fmt"
12
13
"math/rand/v2"
@@ -388,7 +389,12 @@ func (p *partitioner) randActive(rng *rand.Rand) int {
388
389
// flag is passed into tpcc, it will set the constraints/preferences based on
389
390
// the geographic zones provided.
390
391
func configureZone (
391
- db * gosql.DB , cfg zoneConfig , table , partition string , partIdx int , totalParts int ,
392
+ ctx context.Context ,
393
+ conn * gosql.Conn ,
394
+ cfg zoneConfig ,
395
+ table , partition string ,
396
+ partIdx int ,
397
+ totalParts int ,
392
398
) error {
393
399
var constraint string
394
400
var lease string
@@ -419,7 +425,7 @@ func configureZone(
419
425
420
426
sql := fmt .Sprintf (`ALTER PARTITION %s OF TABLE %s CONFIGURE ZONE USING %s` ,
421
427
partition , table , opts )
422
- if _ , err := db . Exec ( sql ); err != nil {
428
+ if _ , err := conn . ExecContext ( ctx , sql ); err != nil {
423
429
return errors .Wrapf (err , "Couldn't exec %q" , sql )
424
430
}
425
431
return nil
@@ -429,7 +435,12 @@ func configureZone(
429
435
// provided name, given the partitioning. Callers of the function must specify
430
436
// the associated table and the partition's number.
431
437
func partitionObject (
432
- db * gosql.DB , cfg zoneConfig , p * partitioner , obj , name , col , table string , idx int ,
438
+ ctx context.Context ,
439
+ conn * gosql.Conn ,
440
+ cfg zoneConfig ,
441
+ p * partitioner ,
442
+ obj , name , col , table string ,
443
+ idx int ,
433
444
) error {
434
445
var buf bytes.Buffer
435
446
fmt .Fprintf (& buf , "ALTER %s %s PARTITION BY RANGE (%s) (\n " , obj , name , col )
@@ -442,72 +453,93 @@ func partitionObject(
442
453
buf .WriteString ("\n " )
443
454
}
444
455
buf .WriteString (")\n " )
445
- if _ , err := db . Exec ( buf .String ()); err != nil {
456
+ if _ , err := conn . ExecContext ( ctx , buf .String ()); err != nil {
446
457
return errors .Wrapf (err , "Couldn't exec %q" , buf .String ())
447
458
}
448
459
449
460
for i := 0 ; i < p .parts ; i ++ {
450
- if err := configureZone (db , cfg , table , fmt .Sprintf ("p%d_%d" , idx , i ), i , p .parts ); err != nil {
461
+ if err := configureZone (ctx , conn , cfg , table , fmt .Sprintf ("p%d_%d" , idx , i ), i , p .parts ); err != nil {
451
462
return err
452
463
}
453
464
}
454
465
return nil
455
466
}
456
467
457
468
func partitionTable (
458
- db * gosql.DB , cfg zoneConfig , p * partitioner , table , col string , idx int ,
469
+ ctx context. Context , conn * gosql.Conn , cfg zoneConfig , p * partitioner , table , col string , idx int ,
459
470
) error {
460
- return partitionObject (db , cfg , p , "TABLE" , table , col , table , idx )
471
+ return partitionObject (ctx , conn , cfg , p , "TABLE" , table , col , table , idx )
461
472
}
462
473
463
474
func partitionIndex (
464
- db * gosql.DB , cfg zoneConfig , p * partitioner , table , index , col string , idx int ,
475
+ ctx context.Context ,
476
+ conn * gosql.Conn ,
477
+ cfg zoneConfig ,
478
+ p * partitioner ,
479
+ table , index , col string ,
480
+ idx int ,
465
481
) error {
466
482
indexStr := fmt .Sprintf ("%s@%s" , table , index )
467
- if exists , err := indexExists (db , table , index ); err != nil {
483
+ if exists , err := indexExists (ctx , conn , table , index ); err != nil {
468
484
return err
469
485
} else if ! exists {
470
486
return errors .Errorf ("could not find index %q" , indexStr )
471
487
}
472
- return partitionObject (db , cfg , p , "INDEX" , indexStr , col , table , idx )
488
+ return partitionObject (ctx , conn , cfg , p , "INDEX" , indexStr , col , table , idx )
473
489
}
474
490
475
- func partitionWarehouse (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
476
- return partitionTable (db , cfg , wPart , "warehouse" , "w_id" , 0 )
491
+ func partitionWarehouse (
492
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
493
+ ) error {
494
+ return partitionTable (ctx , conn , cfg , wPart , "warehouse" , "w_id" , 0 )
477
495
}
478
496
479
- func partitionDistrict (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
480
- return partitionTable (db , cfg , wPart , "district" , "d_w_id" , 0 )
497
+ func partitionDistrict (
498
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
499
+ ) error {
500
+ return partitionTable (ctx , conn , cfg , wPart , "district" , "d_w_id" , 0 )
481
501
}
482
502
483
- func partitionNewOrder (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
484
- return partitionTable (db , cfg , wPart , "new_order" , "no_w_id" , 0 )
503
+ func partitionNewOrder (
504
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
505
+ ) error {
506
+ return partitionTable (ctx , conn , cfg , wPart , "new_order" , "no_w_id" , 0 )
485
507
}
486
508
487
- func partitionOrder (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
488
- if err := partitionTable (db , cfg , wPart , `"order"` , "o_w_id" , 0 ); err != nil {
509
+ func partitionOrder (
510
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
511
+ ) error {
512
+ if err := partitionTable (ctx , conn , cfg , wPart , `"order"` , "o_w_id" , 0 ); err != nil {
489
513
return err
490
514
}
491
- return partitionIndex (db , cfg , wPart , `"order"` , "order_idx" , "o_w_id" , 1 )
515
+ return partitionIndex (ctx , conn , cfg , wPart , `"order"` , "order_idx" , "o_w_id" , 1 )
492
516
}
493
517
494
- func partitionOrderLine (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
495
- return partitionTable (db , cfg , wPart , "order_line" , "ol_w_id" , 0 )
518
+ func partitionOrderLine (
519
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
520
+ ) error {
521
+ return partitionTable (ctx , conn , cfg , wPart , "order_line" , "ol_w_id" , 0 )
496
522
}
497
523
498
- func partitionStock (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
499
- return partitionTable (db , cfg , wPart , "stock" , "s_w_id" , 0 )
524
+ func partitionStock (
525
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
526
+ ) error {
527
+ return partitionTable (ctx , conn , cfg , wPart , "stock" , "s_w_id" , 0 )
500
528
}
501
529
502
- func partitionCustomer (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
503
- if err := partitionTable (db , cfg , wPart , "customer" , "c_w_id" , 0 ); err != nil {
530
+ func partitionCustomer (
531
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
532
+ ) error {
533
+ if err := partitionTable (ctx , conn , cfg , wPart , "customer" , "c_w_id" , 0 ); err != nil {
504
534
return err
505
535
}
506
- return partitionIndex (db , cfg , wPart , "customer" , "customer_idx" , "c_w_id" , 1 )
536
+ return partitionIndex (ctx , conn , cfg , wPart , "customer" , "customer_idx" , "c_w_id" , 1 )
507
537
}
508
538
509
- func partitionHistory (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
510
- return partitionTable (db , cfg , wPart , "history" , "h_w_id" , 0 )
539
+ func partitionHistory (
540
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
541
+ ) error {
542
+ return partitionTable (ctx , conn , cfg , wPart , "history" , "h_w_id" , 0 )
511
543
}
512
544
513
545
// replicateColumns creates covering replicated indexes for a given table
@@ -517,7 +549,8 @@ func partitionHistory(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
517
549
// lookups on those columns to be local within the provided zone. If there are
518
550
// no zones, it assumes that each partition corresponds to a rack.
519
551
func replicateColumns (
520
- db * gosql.DB ,
552
+ ctx context.Context ,
553
+ conn * gosql.Conn ,
521
554
cfg zoneConfig ,
522
555
wPart * partitioner ,
523
556
name string ,
@@ -526,13 +559,13 @@ func replicateColumns(
526
559
) error {
527
560
constraints := synthesizeConstraints (cfg , wPart )
528
561
for i , constraint := range constraints {
529
- if _ , err := db . Exec (
562
+ if _ , err := conn . ExecContext ( ctx ,
530
563
fmt .Sprintf (`CREATE UNIQUE INDEX %[1]s_idx_%[2]d ON %[1]s (%[3]s) STORING (%[4]s)` ,
531
564
name , i , strings .Join (pkColumns , "," ), strings .Join (storedColumns , "," )),
532
565
); err != nil {
533
566
return err
534
567
}
535
- if _ , err := db . Exec ( fmt .Sprintf (
568
+ if _ , err := conn . ExecContext ( ctx , fmt .Sprintf (
536
569
`ALTER INDEX %[1]s@%[1]s_idx_%[2]d
537
570
CONFIGURE ZONE USING num_replicas = COPY FROM PARENT, constraints='{"%[3]s": 1}', lease_preferences='[[%[3]s]]'` ,
538
571
name , i , constraint )); err != nil {
@@ -542,17 +575,23 @@ CONFIGURE ZONE USING num_replicas = COPY FROM PARENT, constraints='{"%[3]s": 1}'
542
575
return nil
543
576
}
544
577
545
- func replicateWarehouse (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
546
- return replicateColumns (db , cfg , wPart , "warehouse" , []string {"w_id" }, []string {"w_tax" })
578
+ func replicateWarehouse (
579
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
580
+ ) error {
581
+ return replicateColumns (ctx , conn , cfg , wPart , "warehouse" , []string {"w_id" }, []string {"w_tax" })
547
582
}
548
583
549
- func replicateDistrict (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
550
- return replicateColumns (db , cfg , wPart , "district" , []string {"d_w_id" , "d_id" },
584
+ func replicateDistrict (
585
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
586
+ ) error {
587
+ return replicateColumns (ctx , conn , cfg , wPart , "district" , []string {"d_w_id" , "d_id" },
551
588
[]string {"d_name" , "d_street_1" , "d_street_2" , "d_city" , "d_state" , "d_zip" })
552
589
}
553
590
554
- func replicateItem (db * gosql.DB , cfg zoneConfig , wPart * partitioner ) error {
555
- return replicateColumns (db , cfg , wPart , "item" , []string {"i_id" },
591
+ func replicateItem (
592
+ ctx context.Context , conn * gosql.Conn , cfg zoneConfig , wPart * partitioner ,
593
+ ) error {
594
+ return replicateColumns (ctx , conn , cfg , wPart , "item" , []string {"i_id" },
556
595
[]string {"i_im_id" , "i_name" , "i_price" , "i_data" })
557
596
}
558
597
@@ -572,46 +611,50 @@ func synthesizeConstraints(cfg zoneConfig, wPart *partitioner) []string {
572
611
}
573
612
574
613
func partitionTables (
575
- db * gosql.DB , cfg zoneConfig , wPart * partitioner , replicateStaticColumns bool ,
614
+ ctx context.Context ,
615
+ conn * gosql.Conn ,
616
+ cfg zoneConfig ,
617
+ wPart * partitioner ,
618
+ replicateStaticColumns bool ,
576
619
) error {
577
- if err := partitionWarehouse (db , cfg , wPart ); err != nil {
620
+ if err := partitionWarehouse (ctx , conn , cfg , wPart ); err != nil {
578
621
return err
579
622
}
580
- if err := partitionDistrict (db , cfg , wPart ); err != nil {
623
+ if err := partitionDistrict (ctx , conn , cfg , wPart ); err != nil {
581
624
return err
582
625
}
583
- if err := partitionNewOrder (db , cfg , wPart ); err != nil {
626
+ if err := partitionNewOrder (ctx , conn , cfg , wPart ); err != nil {
584
627
return err
585
628
}
586
- if err := partitionOrder (db , cfg , wPart ); err != nil {
629
+ if err := partitionOrder (ctx , conn , cfg , wPart ); err != nil {
587
630
return err
588
631
}
589
- if err := partitionOrderLine (db , cfg , wPart ); err != nil {
632
+ if err := partitionOrderLine (ctx , conn , cfg , wPart ); err != nil {
590
633
return err
591
634
}
592
- if err := partitionStock (db , cfg , wPart ); err != nil {
635
+ if err := partitionStock (ctx , conn , cfg , wPart ); err != nil {
593
636
return err
594
637
}
595
- if err := partitionCustomer (db , cfg , wPart ); err != nil {
638
+ if err := partitionCustomer (ctx , conn , cfg , wPart ); err != nil {
596
639
return err
597
640
}
598
- if err := partitionHistory (db , cfg , wPart ); err != nil {
641
+ if err := partitionHistory (ctx , conn , cfg , wPart ); err != nil {
599
642
return err
600
643
}
601
644
if replicateStaticColumns {
602
- if err := replicateDistrict (db , cfg , wPart ); err != nil {
645
+ if err := replicateDistrict (ctx , conn , cfg , wPart ); err != nil {
603
646
return err
604
647
}
605
- if err := replicateWarehouse (db , cfg , wPart ); err != nil {
648
+ if err := replicateWarehouse (ctx , conn , cfg , wPart ); err != nil {
606
649
return err
607
650
}
608
651
}
609
- return replicateItem (db , cfg , wPart )
652
+ return replicateItem (ctx , conn , cfg , wPart )
610
653
}
611
654
612
- func partitionCount (db * gosql.DB ) (int , error ) {
655
+ func partitionCount (ctx context. Context , conn * gosql.Conn ) (int , error ) {
613
656
var count int
614
- if err := db . QueryRow ( `
657
+ if err := conn . QueryRowContext ( ctx , `
615
658
SELECT count(*)
616
659
FROM crdb_internal.tables t
617
660
JOIN crdb_internal.partitions p
@@ -624,12 +667,12 @@ func partitionCount(db *gosql.DB) (int, error) {
624
667
return count , nil
625
668
}
626
669
627
- func indexExists (db * gosql.DB , table , index string ) (bool , error ) {
670
+ func indexExists (ctx context. Context , conn * gosql.Conn , table , index string ) (bool , error ) {
628
671
// Strip any quotes around the table name.
629
672
table = strings .ReplaceAll (table , `"` , `` )
630
673
631
674
var exists bool
632
- if err := db . QueryRow ( `
675
+ if err := conn . QueryRowContext ( ctx , `
633
676
SELECT count(*) > 0
634
677
FROM information_schema.statistics
635
678
WHERE table_name = $1
0 commit comments