
Commit 310981e

craig[bot] and kev-cao committed
Merge #148484
148484: restore: add retry loop to AdminSplit in split and scatterer r=msbutler a=kev-cao

Previously, restore jobs would fail upon encountering a failed split in the split and scatter processor. However, brief periods of unhealthiness at the KV layer can be expected under high workloads, and failing immediately on a failed split is too aggressive. This patch teaches the split and scatter processor to retry the `AdminSplit` before returning an error.

Epic: CRDB-50823

Fixes: #148026

Informs: #148027

Release note: Restore will now re-attempt `AdminSplit` KV requests instead of immediately failing and pausing the job.

Co-authored-by: Kevin Cao <[email protected]>
2 parents: f9a96f4 + ab33240
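In code terms, the fix wraps the `AdminSplit` call in a backoff loop built on CockroachDB's `util/retry` package, which is what the first diff below does. A minimal sketch of that shape, assuming `util/retry`'s `Options`/`StartWithCtx` API as it appears in the diff (`splitWithRetry` and `adminSplit` are hypothetical stand-ins, not names from this patch):

package sketch

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
	"github.com/cockroachdb/errors"
)

// splitWithRetry keeps re-issuing a failing split with exponential backoff
// instead of returning the first error. adminSplit stands in for
// s.db.AdminSplit in the real processor.
func splitWithRetry(ctx context.Context, adminSplit func(context.Context) error) error {
	opts := retry.Options{
		InitialBackoff: 100 * time.Millisecond,
		MaxBackoff:     5 * time.Second,
		Multiplier:     2,
		MaxRetries:     5, // maxAdminSplitAttempts in the patch
	}
	var err error
	for r := retry.StartWithCtx(ctx, opts); r.Next(); {
		if err = adminSplit(ctx); err != nil {
			// Brief KV-layer unhealthiness is expected under heavy load;
			// log-and-continue rather than failing the restore outright.
			continue
		}
		return nil
	}
	// Reached when retries are exhausted or ctx is canceled; err holds the
	// most recent split failure, if any.
	return errors.Wrap(err, "retries exhausted")
}

As in the patch, the error surfaced to the caller is the last failure observed, not the first.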

File tree

2 files changed: +143 -3 lines changed

pkg/backup/generative_split_and_scatter_processor.go

Lines changed: 17 additions & 3 deletions

@@ -33,13 +33,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 )

 const generativeSplitAndScatterProcessorName = "generativeSplitAndScatter"
+const maxAdminSplitAttempts = 5

 var generativeSplitAndScatterOutputTypes = []*types.T{
 	types.Bytes, // Span key for the range router

@@ -158,11 +160,23 @@ func (s dbSplitAndScatterer) split(
 		newSplitKey = splitAt
 	}
 	log.VEventf(ctx, 1, "presplitting new key %+v", newSplitKey)
-	if err := s.db.AdminSplit(ctx, newSplitKey, expirationTime); err != nil {
-		return errors.Wrapf(err, "splitting key %s", newSplitKey)
+	retryOpts := retry.Options{
+		InitialBackoff: 100 * time.Millisecond,
+		MaxBackoff:     5 * time.Second,
+		Multiplier:     2,
+		MaxRetries:     maxAdminSplitAttempts,
+	}
+	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+		if err = s.db.AdminSplit(ctx, newSplitKey, expirationTime); err != nil {
+			log.VInfof(
+				ctx, 1, "attempt %d failed to split at key %s: %v", r.CurrentAttempt(), newSplitKey, err,
+			)
+			continue
+		}
+		return nil
 	}

-	return nil
+	return errors.Wrapf(err, "retries exhausted for splitting at key %s", newSplitKey)
 }

 // scatter implements splitAndScatterer.
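With these options the nominal backoff schedule doubles from 100ms toward the 5s cap: 100ms, 200ms, 400ms, 800ms, 1.6s, or roughly 3.1s of cumulative waiting across five retries. That quantifies the "brief period of unhealthiness" the processor now rides out (exact attempt accounting depends on `util/retry`'s `MaxRetries` semantics, and the real package jitters each wait). A small standalone sketch that prints this schedule:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors retryOpts in the diff above: 100ms initial backoff, doubling
	// each time, capped at 5s, for up to maxAdminSplitAttempts (5) retries.
	backoff := 100 * time.Millisecond
	const (
		maxBackoff = 5 * time.Second
		multiplier = 2
		maxRetries = 5
	)

	var total time.Duration
	for i := 1; i <= maxRetries; i++ {
		fmt.Printf("retry %d waits ~%v (un-jittered)\n", i, backoff)
		total += backoff
		backoff *= multiplier
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	fmt.Printf("cumulative nominal backoff: %v\n", total) // 3.1s
}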

pkg/backup/generative_split_and_scatter_processor_test.go

Lines changed: 126 additions & 0 deletions

@@ -12,6 +12,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"

@@ -26,6 +28,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )

@@ -243,6 +247,128 @@ func TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter(t *testing.T) {
 	))
 }

+func TestGenerativeSplitAndScatterWithAdminSplitFailures(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const numAccounts = 1000
+	rng, seed := randutil.NewPseudoRand()
+	t.Logf("random seed: %d", seed)
+
+	ctx := context.Background()
+
+	var mu syncutil.Mutex
+	allowAdminSplitFailures := false
+	keySplitFailures := make(map[string]int)
+
+	clusterArgs := base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					TestingRequestFilter: func(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
+						mu.Lock()
+						defer mu.Unlock()
+						if !allowAdminSplitFailures {
+							return nil
+						}
+						for _, req := range ba.Requests {
+							kvReq := req.GetInner()
+							if kvReq.Method() == kvpb.AdminSplit {
+								splitKey := kvReq.Header().Key.String()
+								nFails := keySplitFailures[splitKey]
+								if nFails < maxAdminSplitAttempts-1 && rng.Intn(2) == 0 {
+									keySplitFailures[splitKey]++
+									return kvpb.NewErrorf("injected admin split failure for testing")
+								}
+							}
+						}
+						return nil
+					},
+				},
+			},
+		},
+	}
+	tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(
+		t, singleNode, numAccounts, InitManualReplication, clusterArgs,
+	)
+	defer cleanupFn()
+
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := eval.MakeTestingEvalContext(st)
+	testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
+	defer testDiskMonitor.Stop(ctx)
+
+	s0 := tc.ApplicationLayer(0)
+	registry := s0.JobRegistry().(*jobs.Registry)
+	execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
+	flowCtx := execinfra.FlowCtx{
+		Cfg: &execinfra.ServerConfig{
+			Settings:       st,
+			DB:             s0.InternalDB().(descs.DB),
+			JobRegistry:    registry,
+			ExecutorConfig: &execCfg,
+		},
+		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
+		DiskMonitor: testDiskMonitor,
+		NodeID:      evalCtx.NodeID,
+	}
+
+	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
+	sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)
+
+	backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo)
+	require.Equal(t, 1, len(backups))
+	uri := localFoo + "/" + backups[0][0]
+
+	codec := keys.MakeSQLCodec(s0.RPCContext().TenantID)
+	backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(s0.DB(), codec, "data", "bank")
+	backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key
+
+	spec := makeTestingGenerativeSplitAndScatterSpec(
+		[]string{uri},
+		[]roachpb.Span{{
+			Key:    backupStartKey,
+			EndKey: backupStartKey.PrefixEnd(),
+		}},
+	)
+
+	oldID := backupTableDesc.GetID()
+	newID := backupTableDesc.GetID() + 1
+	newDesc := protoutil.Clone(backupTableDesc.TableDesc()).(*descpb.TableDescriptor)
+	newDesc.ID = newID
+	tableRekeys := []execinfrapb.TableRekey{
+		{
+			OldID:   uint32(oldID),
+			NewDesc: mustMarshalDesc(t, newDesc),
+		},
+	}
+
+	kr, err := MakeKeyRewriterFromRekeys(keys.SystemSQLCodec, tableRekeys, nil, false)
+	require.NoError(t, err)
+
+	baseSplitScatter := makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)
+	chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
+	chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
+
+	cache := routingDatumCache{
+		cache: make(map[roachpb.NodeID]rowenc.EncDatum),
+	}
+
+	// Large enough so doneScatterCh never blocks.
+	doneScatterCh := make(chan entryNode, 1000)
+	mu.Lock()
+	allowAdminSplitFailures = true
+	mu.Unlock()
+	err = runGenerativeSplitAndScatter(
+		ctx, &flowCtx, &spec, baseSplitScatter, chunkSplitScatterers,
+		chunkEntrySpliterScatterers, doneScatterCh, &cache,
+	)
+
+	require.NoError(t, err)
+}
+
 // scatterAlwaysFailsSplitScatterer always fails the scatter and returns 0 as
 // the chunk destination.
 type scatterAlwaysFailsSplitScatterer struct {
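The filter in the test above guarantees the test can't livelock: each split key is failed at most `maxAdminSplitAttempts-1` times, and only with 50% probability, so the retry loop in `split()` always has at least one attempt left to succeed. A self-contained sketch of just that bounded-injection logic, with hypothetical names (`faultInjector`, `maybeFail`) standing in for the `TestingRequestFilter` closure:

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

const maxAdminSplitAttempts = 5

// faultInjector counts injected failures per key so that no key is ever
// failed more than maxAdminSplitAttempts-1 times.
type faultInjector struct {
	mu       sync.Mutex
	failures map[string]int
	rng      *rand.Rand
}

// maybeFail reports whether an injected failure should be returned for key:
// a coin flip, but only while the per-key failure budget remains.
func (f *faultInjector) maybeFail(key string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.failures[key] < maxAdminSplitAttempts-1 && f.rng.Intn(2) == 0 {
		f.failures[key]++
		return true
	}
	return false
}

func main() {
	inj := &faultInjector{
		failures: make(map[string]int),
		rng:      rand.New(rand.NewSource(42)),
	}
	// "/Table/106" is an illustrative pretty-printed split key.
	for attempt := 1; attempt <= maxAdminSplitAttempts; attempt++ {
		fmt.Printf("attempt %d: injected failure = %v\n", attempt, inj.maybeFail("/Table/106"))
	}
}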
