Skip to content

Commit bb9a110

Browse files
committed
kv/bulk: enable test tenants
There is one concerningly looking failure in external-process mode, and it's tracked separately. Release note: None
1 parent f09626f commit bb9a110

File tree

5 files changed

+47
-41
lines changed

5 files changed

+47
-41
lines changed

pkg/ccl/storageccl/engineccl/encrypted_fs_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/cockroachdb/cockroach/pkg/base"
22+
"github.com/cockroachdb/cockroach/pkg/keys"
2223
"github.com/cockroachdb/cockroach/pkg/roachpb"
2324
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2425
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -295,7 +296,7 @@ func TestPebbleEncryption(t *testing.T) {
295296
require.NoError(t, batch.PutUnversioned(roachpb.Key("a"), []byte("a")))
296297
require.NoError(t, batch.Commit(true))
297298
require.NoError(t, db.Flush())
298-
require.Equal(t, []byte("a"), storageutils.MVCCGetRaw(t, db, storageutils.PointKey("a", 0)))
299+
require.Equal(t, []byte("a"), storageutils.MVCCGetRaw(t, db, storageutils.PointKey(keys.SystemSQLCodec, "a", 0)))
299300
}()
300301

301302
func() {
@@ -320,7 +321,7 @@ func TestPebbleEncryption(t *testing.T) {
320321
db, err := storage.Open(ctx, env, settings)
321322
require.NoError(t, err)
322323
defer db.Close()
323-
require.Equal(t, []byte("a"), storageutils.MVCCGetRaw(t, db, storageutils.PointKey("a", 0)))
324+
require.Equal(t, []byte("a"), storageutils.MVCCGetRaw(t, db, storageutils.PointKey(keys.SystemSQLCodec, "a", 0)))
324325

325326
// Flushing should've created a new sstable under the active key.
326327
stats, err := db.GetEnvStats()

pkg/kv/bulk/buffering_adder_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ func TestBufferingAdderMemoryExhaustion(t *testing.T) {
2828
// sstbatcher if allocation failed due to memory exhaustion.
2929

3030
// Start a test server.
31-
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
32-
defer s.Stopper().Stop(ctx)
31+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
32+
defer srv.Stopper().Stop(ctx)
33+
s := srv.ApplicationLayer()
3334

3435
distSQLSrv := s.DistSQLServer().(*distsql.ServerImpl)
3536
bulkAdderFactory := distSQLSrv.ServerConfig.BulkAdder

pkg/kv/bulk/main_test.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,9 @@ func TestMain(m *testing.M) {
2828
serverutils.InitTestServerFactory(server.TestServerFactory)
2929
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
3030

31-
defer serverutils.TestingSetDefaultTenantSelectionOverride(
32-
base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(76378),
33-
)()
34-
3531
defer serverutils.TestingGlobalDRPCOption(
3632
base.TestDRPCEnabledRandomly,
3733
)()
3834

39-
code := m.Run()
40-
41-
os.Exit(code)
35+
os.Exit(m.Run())
4236
}

pkg/kv/bulk/sst_batcher_test.go

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,15 @@ func (e *rowEncoder) encodeRow(row tree.Datums) roachpb.KeyValue {
8989

9090
// newBatcher creates a new SSTBatcher with a range cache for testing.
9191
func newBatcher(
92-
t *testing.T, ctx context.Context, s serverutils.TestServerInterface, mvccCompliant bool,
92+
t *testing.T, ctx context.Context, s serverutils.ApplicationLayerInterface, mvccCompliant bool,
9393
) *bulk.SSTBatcher {
9494
mem := mon.NewUnlimitedMonitor(ctx, mon.Options{Name: mon.MakeName("mvcc-compliance")})
9595
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
9696

9797
// Create a range cache to test pipelined flush behavior
9898
ds := s.DistSenderI().(*kvcoord.DistSender)
9999
rc := rangecache.NewRangeCache(s.ClusterSettings(), ds,
100-
func() int64 { return 2 << 10 }, s.Stopper())
100+
func() int64 { return 2 << 10 }, s.AppStopper())
101101

102102
batcher, err := bulk.MakeSSTBatcher(ctx, "test", s.DB(), s.ClusterSettings(), hlc.Timestamp{}, mvccCompliant, true, mem.MakeConcurrentBoundAccount(), reqs, rc)
103103
require.NoError(t, err)
@@ -121,8 +121,9 @@ func TestDuplicateHandling(t *testing.T) {
121121

122122
mem := mon.NewUnlimitedMonitor(ctx, mon.Options{Name: mon.MakeName("lots")})
123123
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
124-
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
125-
defer s.Stopper().Stop(ctx)
124+
srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
125+
defer srv.Stopper().Stop(ctx)
126+
s := srv.ApplicationLayer()
126127

127128
expectRevisionCount := func(startKey roachpb.Key, endKey roachpb.Key, count int, exportStartTime hlc.Timestamp) {
128129
req := &kvpb.ExportRequest{
@@ -278,7 +279,7 @@ func TestDuplicateHandling(t *testing.T) {
278279
require.NoError(t, err)
279280
defer b.Close(ctx)
280281
k := func(i int, ts int64) storage.MVCCKey {
281-
return storageutils.PointKey(fmt.Sprintf("bulk-test-%s-%04d", tc.name, i+1), int(ts))
282+
return storageutils.PointKey(s.Codec(), fmt.Sprintf("bulk-test-%s-%04d", tc.name, i+1), int(ts))
282283
}
283284
endKey := tc.addKeys(t, b, k)
284285
if tc.expectedCount > 0 {
@@ -293,8 +294,9 @@ func TestDuplicateHandling(t *testing.T) {
293294
func runTestImport(t *testing.T, batchSizeValue int64) {
294295

295296
ctx := context.Background()
296-
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
297-
defer s.Stopper().Stop(ctx)
297+
srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
298+
defer srv.Stopper().Stop(ctx)
299+
s := srv.ApplicationLayer()
298300

299301
batchSize := func() int64 { return batchSizeValue }
300302

@@ -347,7 +349,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
347349
{{0, 3}, {4}},
348350
} {
349351
t.Run(fmt.Sprintf("%d-%v", i, testCase), func(t *testing.T) {
350-
prefix := keys.SystemSQLCodec.IndexPrefix(uint32(100+i), 1)
352+
prefix := s.Codec().IndexPrefix(uint32(100+i), 1)
351353
key := func(i int) roachpb.Key {
352354
return encoding.EncodeStringAscending(append([]byte{}, prefix...), fmt.Sprintf("k%d", i))
353355
}
@@ -365,7 +367,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
365367
// populate it after the first split but before the second split.
366368
ds := s.DistSenderI().(*kvcoord.DistSender)
367369
mockCache := rangecache.NewRangeCache(s.ClusterSettings(), ds,
368-
func() int64 { return 2 << 10 }, s.Stopper())
370+
func() int64 { return 2 << 10 }, s.AppStopper())
369371
for _, k := range []int{0, split1} {
370372
ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k)))
371373
require.NoError(t, err)
@@ -448,16 +450,17 @@ func TestImportEpochIngestion(t *testing.T) {
448450

449451
mem := mon.NewUnlimitedMonitor(ctx, mon.Options{Name: mon.MakeName("lots")})
450452
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
451-
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
452-
defer s.Stopper().Stop(ctx)
453+
srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
454+
defer srv.Stopper().Stop(ctx)
455+
s := srv.ApplicationLayer()
453456

454457
b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(),
455458
false, true, mem.MakeConcurrentBoundAccount(), reqs)
456459
require.NoError(t, err)
457460
defer b.Close(ctx)
458461

459-
startKey := storageutils.PointKey("a", 1)
460-
endKey := storageutils.PointKey("b", 1)
462+
startKey := storageutils.PointKey(s.Codec(), "a", 1)
463+
endKey := storageutils.PointKey(s.Codec(), "b", 1)
461464
value := storageutils.StringValueRaw("myHumbleValue")
462465
mvccValue, err := storage.DecodeMVCCValue(value)
463466
require.NoError(t, err)
@@ -528,12 +531,13 @@ func TestSSTBatcherError(t *testing.T) {
528531
},
529532
}
530533

531-
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{
534+
srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{
532535
Knobs: base.TestingKnobs{
533536
Store: knobs,
534537
},
535538
})
536-
defer s.Stopper().Stop(ctx)
539+
defer srv.Stopper().Stop(ctx)
540+
s := srv.ApplicationLayer()
537541

538542
mem := mon.NewUnlimitedMonitor(ctx, mon.Options{Name: mon.MakeName("mvcc-compliance")})
539543
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
@@ -542,7 +546,7 @@ func TestSSTBatcherError(t *testing.T) {
542546
defer batcher.Close(ctx)
543547

544548
require.NoError(t, batcher.AddMVCCKey(ctx,
545-
storage.MVCCKey{Key: []byte("a"), Timestamp: hlc.Timestamp{WallTime: 1}},
549+
storageutils.PointKey(s.Codec(), "a", 1),
546550
storageutils.StringValueRaw("value"),
547551
))
548552

@@ -573,12 +577,14 @@ func TestSSTBatcherPipelinedFlush(t *testing.T) {
573577
},
574578
}
575579

576-
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
580+
srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
577581
Knobs: base.TestingKnobs{
578582
Store: knobs,
579583
},
584+
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(156329),
580585
})
581-
defer s.Stopper().Stop(ctx)
586+
defer srv.Stopper().Stop(ctx)
587+
s := srv.ApplicationLayer()
582588

583589
tdb := sqlutils.MakeSQLRunner(db)
584590
tdb.Exec(t, `CREATE TABLE kv (pk INT PRIMARY KEY, v STRING)`)
@@ -625,13 +631,14 @@ func TestSSTBatcherMvccCompliance(t *testing.T) {
625631
defer log.Scope(t).Close(t)
626632
ctx := context.Background()
627633

628-
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
629-
defer s.Stopper().Stop(ctx)
634+
srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
635+
defer srv.Stopper().Stop(ctx)
636+
s := srv.ApplicationLayer()
630637

631638
tdb := sqlutils.MakeSQLRunner(db)
632639
tdb.Exec(t, `CREATE TABLE kv (pk INT PRIMARY KEY, v STRING)`)
633640

634-
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "defaultdb", "kv")
641+
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "defaultdb", "kv")
635642
re := newRowEncoder(t, tableDesc.TableDesc(), s.Codec())
636643

637644
batcher := newBatcher(t, ctx, s, true)
@@ -676,13 +683,14 @@ func TestSSTBatcherRewriteHistory(t *testing.T) {
676683
defer log.Scope(t).Close(t)
677684
ctx := context.Background()
678685

679-
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
680-
defer s.Stopper().Stop(ctx)
686+
srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
687+
defer srv.Stopper().Stop(ctx)
688+
s := srv.ApplicationLayer()
681689

682690
tdb := sqlutils.MakeSQLRunner(db)
683691
tdb.Exec(t, `CREATE TABLE kv (pk INT PRIMARY KEY, v STRING)`)
684692

685-
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "defaultdb", "kv")
693+
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "defaultdb", "kv")
686694
re := newRowEncoder(t, tableDesc.TableDesc(), s.Codec())
687695

688696
batcher := newBatcher(t, ctx, s, false)
@@ -743,9 +751,9 @@ func TestSSTBatcherCloseWithoutFlush(t *testing.T) {
743751
})
744752
defer tc.Stopper().Stop(ctx)
745753

746-
s := tc.Server(0)
754+
s := tc.ApplicationLayer(0)
747755
db := tc.ServerConn(0)
748-
kvDB := tc.Server(0).DB()
756+
kvDB := s.DB()
749757

750758
tdb := sqlutils.MakeSQLRunner(db)
751759
tdb.Exec(t, `CREATE TABLE kv (pk INT PRIMARY KEY, v STRING)`)

pkg/testutils/storageutils/kv.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package storageutils
77

88
import (
9+
"github.com/cockroachdb/cockroach/pkg/keys"
910
"github.com/cockroachdb/cockroach/pkg/roachpb"
1011
"github.com/cockroachdb/cockroach/pkg/storage"
1112
"github.com/cockroachdb/cockroach/pkg/storage/mvccencoding"
@@ -26,8 +27,9 @@ func (kvs KVs) MVCCKeyValues() []storage.MVCCKeyValue {
2627

2728
// PointKey creates an MVCCKey for the given string key and timestamp (walltime
2829
// seconds).
29-
func PointKey(key string, ts int) storage.MVCCKey {
30-
return storage.MVCCKey{Key: roachpb.Key(key), Timestamp: WallTS(ts)}
30+
func PointKey(codec keys.SQLCodec, key string, ts int) storage.MVCCKey {
31+
k := append(append(roachpb.Key(nil), codec.TenantPrefix()...), roachpb.Key(key)...)
32+
return storage.MVCCKey{Key: k, Timestamp: WallTS(ts)}
3133
}
3234

3335
// PointKV creates an MVCCKeyValue for the given string key/value and timestamp
@@ -50,7 +52,7 @@ func PointKVWithLocalTS(key string, ts int, localTS int, value string) storage.M
5052
panic(err)
5153
}
5254
return storage.MVCCKeyValue{
53-
Key: PointKey(key, ts),
55+
Key: PointKey(keys.SystemSQLCodec, key, ts),
5456
Value: v,
5557
}
5658
}
@@ -70,7 +72,7 @@ func PointKVWithImportEpoch(
7072
panic(err)
7173
}
7274
return storage.MVCCKeyValue{
73-
Key: PointKey(key, ts),
75+
Key: PointKey(keys.SystemSQLCodec, key, ts),
7476
Value: v,
7577
}
7678
}

0 commit comments

Comments
 (0)