Skip to content

Commit f194f92

Browse files
craig[bot]itsbilalknznvbXiang-Gu
committed
107302: storage: add method to ingest external files, rename IngestExternalFiles r=RaduBerinde a=itsbilal Requires cockroachdb/pebble#2753 This change renames the existing IngestExternalFiles method on storage.Engine to IngestLocalFiles, and adds a new IngestExternalFiles that ingests pebble.ExternalFile, for use with online restore. Depends on cockroachdb/pebble#2753. Epic: none Release note: None 108402: serverutils: remove ad-hoc code from StartNewTestCluster r=yuzefovich a=knz This function is a convenience alias for NewTestCluster+Start. This should not contain custom logic specific to certain tests. Any custom logic should be conditional on testing knobs and put inside `(*testcluster.TestCluster).Start()` instead. (The code removed here was mistakenly added in the wrong place in 70f85cd). Release note: None Needed for cockroachdb#107986. Epic: CRDB-18499 108446: kv: skip TestConstraintConformanceReportIntegration under deadlock r=erikgrinaker a=nvanbenschoten Fixes cockroachdb#108430. This commit avoids flakiness in `TestConstraintConformanceReportIntegration` by skipping the test under deadlock builds. It has been observed to run slowly and flake under stress, and we see the same kinds of behavior under deadlock builds. Release notes: None 108451: schemachanger: Refactor tests for concurrent schema changer behaviors r=Xiang-Gu a=Xiang-Gu 1. It cleans up some redundant tests about concurrent schema changer behavior and refactor in a new simpler, cleaner test 2. It adds an integration style test for testing concurrent schema change behaviors where we run many schema changes for an extended period of time and assert that all of they eventually succeed and the descriptors end up in the expected state. Fix cockroachdb#108140 Fix cockroachdb#107223 Epic: None Release note: None 108492: kv: remove errSavepointInvalidAfterTxnRestart r=knz a=nvanbenschoten This commit simplifies logic in `checkSavepointLocked`. Epic: None Release note: None 108497: sql: don't start default test tenant in MT admin function tests r=yuzefovich a=yuzefovich These tests themselves start multiple tenants, so there is no need to create a default test tenant (doing that also makes it a bit more confusing because the default tenant as well as the first test tenant share the same TenantID effectively making it two SQL pod config, which is confusing). Starting the default test tenant was enabled recently in c899661 when we enabled the CCL license, and we have seen at least one confusing failure that is possibly related to this. Starting the default test tenant was originally added in cfa4375, but I don't see a good reason for it. This PR is opportunistic fix of cockroachdb#108081. Fixes: cockroachdb#108081. Release note: None 108502: kvstreamer: add more assertions to RequestsProvider.enqueue r=yuzefovich a=michae2 If we ever enqueue zero-length requests, it could cause a deadlock where the `workerCoordinator` is waiting for more requests and the enqueuer is waiting for results. Add assertions that we never do this. Informs: cockroachdb#101823 Release note: None 108517: roachtest: pin liveness lease to live node in lease prefs test r=erikgrinaker a=kvoli The lease preferences roachtest could occasionally fail, if the liveness leaseholder were on a stopped node. We should address this issue, for now, pin the liveness lease to a live node to prevent flakes. Informs: cockroachdb#108512 Resolves: cockroachdb#108425 Release note: None Co-authored-by: Bilal Akhtar <[email protected]> Co-authored-by: Raphael 'kena' Poss <[email protected]> Co-authored-by: Nathan VanBenschoten <[email protected]> Co-authored-by: Xiang Gu <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Michael Erickson <[email protected]> Co-authored-by: Austen McClernon <[email protected]>
9 parents c0fbeb3 + d9573d2 + 6f3652a + 5975d2d + c02d1f3 + 7f6f9c3 + b661394 + 1e3cefe + 31b24e5 commit f194f92

22 files changed

+488
-840
lines changed

pkg/cmd/roachtest/tests/lease_preferences.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,14 @@ func runLeasePreferences(
252252
// enforcement.
253253
require.NoError(t, WaitForReplication(ctx, t, conn, spec.replFactor))
254254

255+
// Set a lease preference for the liveness range, to be on n5. This test
256+
// would occasionally fail due to the liveness heartbeat failures, when the
257+
// liveness lease is on a stopped node. This is not ideal behavior, #108512.
258+
configureZone(t, ctx, conn, "RANGE liveness", zoneConfig{
259+
replicas: spec.replFactor,
260+
leaseNode: 5,
261+
})
262+
255263
t.L().Printf("setting lease preferences: %s", spec.preferences)
256264
setLeasePreferences(ctx, spec.preferences)
257265
t.L().Printf("waiting for initial lease preference conformance")

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
2121
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2222
"github.com/cockroachdb/errors"
23+
"github.com/cockroachdb/redact"
2324
)
2425

2526
// savepoint captures the state in the TxnCoordSender necessary to restore that
@@ -121,15 +122,8 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
121122
}
122123

123124
sp := s.(*savepoint)
124-
err := tc.checkSavepointLocked(sp)
125+
err := tc.checkSavepointLocked(sp, "rollback to")
125126
if err != nil {
126-
if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
127-
err = kvpb.NewTransactionRetryWithProtoRefreshError(
128-
"cannot rollback to savepoint after a transaction restart",
129-
tc.mu.txn.ID,
130-
tc.mu.txn,
131-
)
132-
}
133127
return err
134128
}
135129

@@ -165,15 +159,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
165159
}
166160

167161
sp := s.(*savepoint)
168-
err := tc.checkSavepointLocked(sp)
169-
if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
170-
err = kvpb.NewTransactionRetryWithProtoRefreshError(
171-
"cannot release savepoint after a transaction restart",
172-
tc.mu.txn.ID,
173-
tc.mu.txn,
174-
)
175-
}
176-
return err
162+
return tc.checkSavepointLocked(sp, "release")
177163
}
178164

179165
type errSavepointOperationInErrorTxn struct{}
@@ -193,23 +179,22 @@ func (tc *TxnCoordSender) assertNotFinalized() error {
193179
return nil
194180
}
195181

196-
var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart")
197-
198182
// checkSavepointLocked checks whether the provided savepoint is still valid.
199-
// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an
183+
// Returns a TransactionRetryWithProtoRefreshError if the savepoint is not an
200184
// "initial" one and the transaction has restarted since the savepoint was
201185
// created.
202-
func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error {
186+
func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint, op redact.SafeString) error {
203187
// Only savepoints taken before any activity are allowed to be used after a
204188
// transaction restart.
205189
if s.Initial() {
206190
return nil
207191
}
208-
if s.txnID != tc.mu.txn.ID {
209-
return errSavepointInvalidAfterTxnRestart
210-
}
211-
if s.epoch != tc.mu.txn.Epoch {
212-
return errSavepointInvalidAfterTxnRestart
192+
if s.txnID != tc.mu.txn.ID || s.epoch != tc.mu.txn.Epoch {
193+
return kvpb.NewTransactionRetryWithProtoRefreshError(
194+
redact.Sprintf("cannot %s savepoint after a transaction restart", op),
195+
s.txnID,
196+
tc.mu.txn,
197+
)
213198
}
214199

215200
if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {

pkg/kv/kvclient/kvstreamer/requests_provider.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ func (p *outOfOrderRequestsProvider) enqueue(requests []singleRangeBatch) {
264264
if len(p.requests) > 0 {
265265
panic(errors.AssertionFailedf("outOfOrderRequestsProvider has old requests in enqueue"))
266266
}
267+
if len(requests) == 0 {
268+
panic(errors.AssertionFailedf("outOfOrderRequestsProvider enqueuing zero requests"))
269+
}
267270
p.requests = requests
268271
p.hasWork.Signal()
269272
}
@@ -388,6 +391,9 @@ func (p *inOrderRequestsProvider) enqueue(requests []singleRangeBatch) {
388391
if len(p.requests) > 0 {
389392
panic(errors.AssertionFailedf("inOrderRequestsProvider has old requests in enqueue"))
390393
}
394+
if len(requests) == 0 {
395+
panic(errors.AssertionFailedf("inOrderRequestsProvider enqueuing zero requests"))
396+
}
391397
p.requests = requests
392398
p.heapInit()
393399
p.hasWork.Signal()

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ go_library(
224224
"@com_github_cockroachdb_logtags//:logtags",
225225
"@com_github_cockroachdb_pebble//:pebble",
226226
"@com_github_cockroachdb_pebble//objstorage",
227+
"@com_github_cockroachdb_pebble//objstorage/remote",
227228
"@com_github_cockroachdb_pebble//vfs",
228229
"@com_github_cockroachdb_redact//:redact",
229230
"@com_github_gogo_protobuf//proto",

pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) {
12491249
} else {
12501250
require.NotNil(t, result.Replicated.AddSSTable)
12511251
require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data))
1252-
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
1252+
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))
12531253
}
12541254

12551255
var expect kvs
@@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
16521652
require.NoError(t, err)
16531653

16541654
require.NoError(t, fs.WriteFile(engine, "sst", sst))
1655-
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
1655+
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))
16561656

16571657
statsEvaled := statsBefore
16581658
statsEvaled.Add(*cArgs.Stats)

pkg/kv/kvserver/replica_proposal.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3838
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3939
"github.com/cockroachdb/errors"
40+
"github.com/cockroachdb/pebble"
41+
"github.com/cockroachdb/pebble/objstorage/remote"
4042
"github.com/cockroachdb/redact"
4143
"github.com/kr/pretty"
4244
"golang.org/x/time/rate"
@@ -628,19 +630,32 @@ func addSSTablePreApply(
628630
sst.Span,
629631
sst.RemoteFileLoc,
630632
)
631-
// TODO(bilal): replace this with the real ingest.
632-
/*
633-
start := storage.EngineKey{Key: sst.Span.Key}
634-
end := storage.EngineKey{Key: sst.Span.EndKey}
635-
636-
externalFile := pebble.ExternalFile{
637-
Locator: shared.Locator(sst.RemoteFileLoc),
638-
ObjName: sst.RemoteFilePath,
639-
Size: sst.BackingFileSize,
640-
SmallestUserKey: start.Encode(),
641-
LargestUserKey: end.Encode(),
642-
}*/
643-
log.Fatalf(ctx, "Unsupported IngestRemoteFile")
633+
start := storage.EngineKey{Key: sst.Span.Key}
634+
end := storage.EngineKey{Key: sst.Span.EndKey}
635+
externalFile := pebble.ExternalFile{
636+
Locator: remote.Locator(sst.RemoteFileLoc),
637+
ObjName: sst.RemoteFilePath,
638+
Size: sst.BackingFileSize,
639+
SmallestUserKey: start.Encode(),
640+
LargestUserKey: end.Encode(),
641+
}
642+
tBegin := timeutil.Now()
643+
defer func() {
644+
if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() {
645+
log.Infof(ctx,
646+
"ingesting SST of size %s at index %d took %.2fs",
647+
humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(),
648+
)
649+
}
650+
}()
651+
652+
_, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile})
653+
if ingestErr != nil {
654+
log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr)
655+
}
656+
// Adding without modification succeeded, no copy necessary.
657+
log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath)
658+
return false
644659
}
645660
checksum := util.CRC32(sst.Data)
646661

@@ -685,7 +700,7 @@ func addSSTablePreApply(
685700
}
686701

687702
// Regular path - we made a hard link, so we can ingest the hard link now.
688-
ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath})
703+
ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath})
689704
if ingestErr != nil {
690705
log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr)
691706
}
@@ -726,7 +741,7 @@ func ingestViaCopy(
726741
if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil {
727742
return errors.Wrapf(err, "while ingesting %s", ingestPath)
728743
}
729-
if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil {
744+
if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil {
730745
return errors.Wrapf(err, "while ingesting %s", ingestPath)
731746
}
732747
log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath)

pkg/kv/kvserver/replica_raftstorage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ func (r *Replica) applySnapshot(
578578
// TODO: separate ingestions for log and statemachine engine. See:
579579
//
580580
// https://github.com/cockroachdb/cockroach/issues/93251
581-
r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
581+
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
582582
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
583583
}
584584
if r.store.cfg.KVAdmissionController != nil {

pkg/kv/kvserver/reports/reporter_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ func TestConstraintConformanceReportIntegration(t *testing.T) {
4848
// don't make progress.
4949
skip.UnderStressRace(t)
5050
skip.UnderRace(t, "takes >1min under race")
51+
// Similarly, skip the test under deadlock builds.
52+
skip.UnderDeadlock(t, "takes >1min under deadlock")
5153

5254
ctx := context.Background()
5355
tc := serverutils.StartNewTestCluster(t, 5, base.TestClusterArgs{

pkg/sql/create_as_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ AND status != 'succeeded'`
325325

326326
func TestFormat(t *testing.T) {
327327
defer leaktest.AfterTest(t)()
328+
defer log.Scope(t).Close(t)
328329

329330
testCases := []struct {
330331
sql string

pkg/sql/create_function_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ SELECT nextval(105:::REGCLASS);`,
151151

152152
func TestVersionGatingUDFInCheckConstraints(t *testing.T) {
153153
defer leaktest.AfterTest(t)()
154+
defer log.Scope(t).Close(t)
154155

155156
t.Run("new_schema_changer_version_enabled", func(t *testing.T) {
156157
params, _ := createTestServerParams()

0 commit comments

Comments
 (0)