
Commit 69a0bf1

craig[bot], pav-kv, wenyihu6, and dt committed
150168: kvstorage: revise replica deletion plan r=tbg a=pav-kv

This PR modifies the plan for `CreateUninitializedReplica`, `DestroyReplica` and `applySnapshot`. The `LogID` concept is no longer needed, and the snapshot transitions can retain the old raft log up to `RaftAppliedIndex` since the snapshot is always created at a higher index.

Epic: CRDB-49111

150835: kvserver: add mmaStoreRebalancer r=tbg,sumeerbhola a=wenyihu6

Epic: CRDB-25222
Release note: none

---

**kvserver: add MMARangeLoad**

Previously, we lacked a utility method for mma to inspect range load and convert it to a mmaprototype.RangeLoad. This commit adds MMARangeLoad(), which builds a mmaprototype.RangeLoad from a replica's LoadStats. It is currently unused; future commits will use it to populate the range message field for MMA input.

---

**kvserver: add mmaRangeMessageNeeded**

Previously, there was no struct under Replica to track state changes and signal mma when an up-to-date range message with full information was needed. This commit introduces such a struct, mmaRangeMessageNeeded. It is marked as needed when any of the following occur: replica initialization, span config changes, leaseholder changes, or range descriptor updates.

---

**kvserver: add more methods to mmaRangeMessageNeeded**

Previously, mmaRangeMessageNeeded was introduced to track when a replica state change required sending updated information. This commit adds methods to mmaRangeMessageNeeded that help mma decide whether a new range message with full information is needed. Since processing full updates can be expensive, mma sends them only when necessary. In addition to tracking state changes, the struct now also tracks the content of the last full-info range message sent, to determine whether full information is necessary.

---

**kvserver: add store.MakeStoreLeaseholderMsg**

Previously, mma lacked a method to retrieve range messages for all leaseholder replicas on a store. This commit adds such a method to Store, which iterates over all replicas and constructs range messages for leaseholder replicas. Note that it is currently left unused; mma will use it as input for computing rebalancing decisions in future commits.

---

**kvserver: add mmaStoreRebalancer**

This commit introduces mma_store_rebalancer.go, which defines the main struct mmaStoreRebalancer and its basic methods for computing and applying rebalancing changes. It is currently unplumbed; future commits will wire it into each store and start a background goroutine to perform rebalancing periodically.

---

**kvserver: plumb mmaStoreRebalancer**

Previously, mmaStoreRebalancer was unplumbed. This commit initializes it per Store. It also constructs and plumbs a shared MMAAllocator per node, which is shared by all stores' mmaStoreRebalancers on the same node.

---

**kvserver: use RegisterCallback for mma**

Previously, we extended RegisterCallback to include origTimestampNanos in the callback arguments. This commit puts it to use for MMA by registering a callback in NewServer that notifies the MMA allocator of stores and updated store load messages. This callback is triggered when the store descriptor is gossiped via StoreGossip, either periodically or due to significant store capacity changes.

151058: backup: fix external storage error propagation r=dt a=dt

See #151050. Fixes #151050.

Release note (bug fix): fix a bug that could cause errors returned by attempts to upload backup data to external storage providers to go undetected, potentially causing incomplete backups.

Epic: none.
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: David Taylor <[email protected]>
4 parents: fe8f9ab + 5d082df + d2c54cd + 673c46a (commit 69a0bf1)

20 files changed: +641 -52 lines
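
The mmaRangeMessageNeeded commits described above amount to a dirty-flag pattern: mark the replica whenever relevant state changes (initialization, span config, lease, or descriptor updates), and build the expensive full-information message only when the flag is set or the last-sent content has gone stale. The sketch below illustrates that pattern only; every name in it is invented, since the actual struct lives in kvserver files changed by this commit that are not reproduced on this page.

package main

import (
	"fmt"
	"sync"
)

// rangeMessageNeeded is an illustrative stand-in for the dirty-flag tracking
// the commit message describes; none of these names are the real kvserver API.
type rangeMessageNeeded struct {
	mu           sync.Mutex
	needed       bool   // set on replica init, span config, lease, or descriptor changes
	lastConfHash uint64 // summary of the last full-info message sent (hypothetical)
}

// markNeeded records that the next range message must carry full information.
func (m *rangeMessageNeeded) markNeeded() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.needed = true
}

// fullInfoNeeded reports whether a full-info message should be built, and
// resets the tracking state once the caller commits to sending one.
func (m *rangeMessageNeeded) fullInfoNeeded(curConfHash uint64) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.needed && curConfHash == m.lastConfHash {
		return false
	}
	m.needed = false
	m.lastConfHash = curConfHash
	return true
}

func main() {
	var n rangeMessageNeeded
	n.markNeeded()
	fmt.Println(n.fullInfoNeeded(42)) // true: state changed since the last send
	fmt.Println(n.fullInfoNeeded(42)) // false: nothing new to report
}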

pkg/backup/backup_processor.go

Lines changed: 16 additions & 1 deletion
@@ -292,6 +292,14 @@ type spanAndTime struct {
 	finishesSpec bool
 }
 
+type errInjectingStorage struct {
+	cloud.ExternalStorage
+}
+
+func (e errInjectingStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) {
+	return nil, errors.New("injected error")
+}
+
 func runBackupProcessor(
 	ctx context.Context,
 	flowCtx *execinfra.FlowCtx,
@@ -393,12 +401,19 @@ func runBackupProcessor(
 		Settings:  &flowCtx.Cfg.Settings.SV,
 		ElideMode: spec.ElidePrefix,
 	}
+
 	storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
 	if err != nil {
 		return err
 	}
 	defer logClose(ctx, storage, "external storage")
 
+	if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+		if fn := backupKnobs.InjectErrorsInBackupRowDataStorage; fn != nil && fn() {
+			storage = errInjectingStorage{storage}
+		}
+	}
+
 	// Start start a group of goroutines which each pull spans off of `todo` and
 	// send export requests. Any spans that encounter lock conflict errors during
 	// Export are put back on the todo queue for later processing.
@@ -683,7 +698,7 @@ func runBackupProcessor(
 			var writeErr error
 			resumeSpan.span.Key, writeErr = sink.Write(ctx, ret)
 			if writeErr != nil {
-				return err
+				return writeErr
 			}
 		}
 		// Emit the stats for the processed ExportRequest.
pkg/backup/backup_test.go

Lines changed: 30 additions & 0 deletions
@@ -6322,6 +6322,36 @@ func TestPublicIndexTableSpans(t *testing.T) {
 	}
 }
 
+// TestBackupStorageErrorPropagates ensures that errors from writing to storage
+// propagate correctly during a backup operation.
+func TestBackupStorageErrorPropagates(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const numAccounts = 1000
+
+	var fail atomic.Bool
+
+	params := base.TestClusterArgs{}
+	knobs := base.TestingKnobs{
+		DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
+			InjectErrorsInBackupRowDataStorage: func() bool { return fail.Load() },
+		}},
+	}
+	params.ServerArgs.Knobs = knobs
+
+	tc, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
+	defer cleanupFn()
+	db := tc.ServerConn(0)
+	runner := sqlutils.MakeSQLRunner(db)
+
+	runner.Exec(t, "BACKUP DATABASE data INTO 'nodelocal://1/success'")
+	runner.Exec(t, "RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/success' WITH new_db_name = 'restored'")
+	runner.CheckQueryResults(t, "SELECT count(*) FROM restored.bank", [][]string{{"1000"}})
+	fail.Store(true)
+	runner.ExpectErr(t, "injected", "BACKUP DATABASE data INTO 'nodelocal://1/failure'")
+}
+
 // TestRestoreJobErrorPropagates ensures that errors from creating the job
 // record propagate correctly.
 func TestRestoreErrorPropagates(t *testing.T) {

pkg/kv/kvserver/BUILD.bazel

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,8 @@ go_library(
         "merge_queue.go",
         "metric_rules.go",
         "metrics.go",
+        "mma_replica_store.go",
+        "mma_store_rebalancer.go",
         "mvcc_gc_queue.go",
         "queue.go",
         "queue_helpers_testutil.go",
@@ -130,6 +132,7 @@ go_library(
         "//pkg/kv/kvserver/allocator",
         "//pkg/kv/kvserver/allocator/allocatorimpl",
         "//pkg/kv/kvserver/allocator/load",
+        "//pkg/kv/kvserver/allocator/mmaprototype",
         "//pkg/kv/kvserver/allocator/plan",
         "//pkg/kv/kvserver/allocator/storepool",
         "//pkg/kv/kvserver/apply",
@@ -170,6 +173,7 @@ go_library(
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/raftlog",
         "//pkg/kv/kvserver/rafttrace",
+        "//pkg/kv/kvserver/raftutil",
         "//pkg/kv/kvserver/rangefeed",
         "//pkg/kv/kvserver/rditer",
         "//pkg/kv/kvserver/readsummary",

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 5 additions & 5 deletions
@@ -1201,15 +1201,15 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 	rs, ok := cs.ranges[rangeMsg.RangeID]
 	if !ok {
 		// This is the first time we've seen this range.
-		if !rangeMsg.Populated {
+		if !rangeMsg.MaybeSpanConfIsPopulated {
 			panic(errors.AssertionFailedf("rangeMsg for new range r%v is not populated", rangeMsg.RangeID))
 		}
 		rs = newRangeState(msg.StoreID)
 		cs.ranges[rangeMsg.RangeID] = rs
 	} else if rs.localRangeOwner != msg.StoreID {
 		rs.localRangeOwner = msg.StoreID
 	}
-	if !rangeMsg.Populated {
+	if !rangeMsg.MaybeSpanConfIsPopulated {
 		// When there are no pending changes, confirm that the membership state
 		// is consistent. If not, fall through and make it consistent. We have
 		// seen an example where AdjustPendingChangesDisposition lied about
@@ -1243,7 +1243,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 	// Set the range state and store state to match the range message state
 	// initially. The pending changes which are not enacted in the range
 	// message are handled and added back below.
-	if rangeMsg.Populated {
+	if rangeMsg.MaybeSpanConfIsPopulated {
 		rs.load = rangeMsg.RangeLoad
 	}
 	for _, replica := range rs.replicas {
@@ -1342,8 +1342,8 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 			}
 		}
 	}
-	if rangeMsg.Populated {
-		normSpanConfig, err := makeNormalizedSpanConfig(&rangeMsg.Conf, cs.constraintMatcher.interner)
+	if rangeMsg.MaybeSpanConfIsPopulated {
+		normSpanConfig, err := makeNormalizedSpanConfig(&rangeMsg.MaybeSpanConf, cs.constraintMatcher.interner)
 		if err != nil {
 			// TODO(kvoli): Should we log as a warning here, or return further back out?
 			panic(err)

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 6 additions & 6 deletions
@@ -106,7 +106,7 @@ func parseStoreLeaseholderMsg(t *testing.T, in string) StoreLeaseholderMsg {
 	tryAppendRangeMsg := func() {
 		if rMsg.RangeID != 0 {
 			if notPopulatedOverride {
-				rMsg.Populated = false
+				rMsg.MaybeSpanConfIsPopulated = false
 			}
 			msg.Ranges = append(msg.Ranges, rMsg)
 			rMsg = RangeMsg{RangeID: 0}
@@ -124,17 +124,17 @@ func parseStoreLeaseholderMsg(t *testing.T, in string) StoreLeaseholderMsg {
 				rMsg.RangeID = roachpb.RangeID(parseInt(t, parts[1]))
 			case "load":
 				rMsg.RangeLoad.Load = parseLoadVector(t, parts[1])
-				rMsg.Populated = true
+				rMsg.MaybeSpanConfIsPopulated = true
 			case "raft-cpu":
 				rMsg.RangeLoad.RaftCPU = LoadValue(parseInt(t, parts[1]))
-				rMsg.Populated = true
+				rMsg.MaybeSpanConfIsPopulated = true
 			case "not-populated":
 				notPopulatedOverride = true
 			}
 		}
 	} else if strings.HasPrefix(line, "config=") {
-		rMsg.Conf = spanconfigtestutils.ParseZoneConfig(t, strings.TrimPrefix(line, "config=")).AsSpanConfig()
-		rMsg.Populated = true
+		rMsg.MaybeSpanConf = spanconfigtestutils.ParseZoneConfig(t, strings.TrimPrefix(line, "config=")).AsSpanConfig()
+		rMsg.MaybeSpanConfIsPopulated = true
 	} else {
 		var repl StoreIDAndReplicaState
 		fields := strings.Fields(line)
@@ -158,7 +158,7 @@ func parseStoreLeaseholderMsg(t *testing.T, in string) StoreLeaseholderMsg {
 			}
 		}
 		rMsg.Replicas = append(rMsg.Replicas, repl)
-		rMsg.Populated = true
+		rMsg.MaybeSpanConfIsPopulated = true
 	}
 }
 tryAppendRangeMsg()

pkg/kv/kvserver/allocator/mmaprototype/messages.go

Lines changed: 9 additions & 9 deletions
@@ -43,20 +43,20 @@ type StoreLeaseholderMsg struct {
 
 // RangeMsg is generated by the leaseholder store (and part of
 // StoreLeaseholderMsg). If there is any change for that range, the full
-// information for that range is provided (and Populated is set to true). This
-// is also the case for a new leaseholder since it does not know whether
-// something has changed since the last leaseholder informed the allocator. A
-// tiny change to the RangeLoad (decided by the caller) will not cause the
-// fields to be populated.
+// information for that range is provided (and MaybeSpanConfIsPopulated is set
+// to true). This is also the case for a new leaseholder since it does not know
+// whether something has changed since the last leaseholder informed the
+// allocator. A tiny change to the RangeLoad (decided by the caller) will not
+// cause the fields to be populated.
 //
 // To ensure that the allocator does not lose synchronization with the current
 // set of replicas, due to spurious changes (we had one undiagnosed example
 // where the allocator was spuriously told that a lease was transferred away),
 // the Replicas field is always populated).
 type RangeMsg struct {
 	roachpb.RangeID
-	Replicas  []StoreIDAndReplicaState
-	Populated bool
-	Conf      roachpb.SpanConfig
-	RangeLoad RangeLoad
+	Replicas                 []StoreIDAndReplicaState
+	MaybeSpanConfIsPopulated bool
+	MaybeSpanConf            roachpb.SpanConfig
+	RangeLoad                RangeLoad
 }
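
The renamed fields make the message's contract explicit: Replicas is always populated, while MaybeSpanConf and RangeLoad are meaningful only when MaybeSpanConfIsPopulated is true, as the cluster_state.go hunks above show on the consumer side. A self-contained sketch of that contract with stand-in types (the real message is the mmaprototype.RangeMsg in this diff):

package main

import "fmt"

// Stand-in types for illustration only; the real message is
// mmaprototype.RangeMsg as shown in the diff above.
type spanConfig struct{ numReplicas int32 }
type rangeLoad struct{ raftCPU int64 }
type replicaState struct{ storeID int32 }

type rangeMsg struct {
	Replicas                 []replicaState // always populated
	MaybeSpanConfIsPopulated bool
	MaybeSpanConf            spanConfig // valid only when the flag is set
	RangeLoad                rangeLoad  // applied only when the flag is set
}

func process(msg rangeMsg) {
	// Membership can always be reconciled from Replicas.
	fmt.Println("replicas:", len(msg.Replicas))
	// Load and span config are applied only for full-info messages.
	if msg.MaybeSpanConfIsPopulated {
		fmt.Println("apply load", msg.RangeLoad.raftCPU, "and config", msg.MaybeSpanConf.numReplicas)
	}
}

func main() {
	// Delta message: membership only.
	process(rangeMsg{Replicas: []replicaState{{storeID: 1}}})
	// Full-info message: membership, load, and span config.
	process(rangeMsg{
		Replicas:                 []replicaState{{storeID: 1}, {storeID: 2}},
		MaybeSpanConfIsPopulated: true,
		MaybeSpanConf:            spanConfig{numReplicas: 3},
		RangeLoad:                rangeLoad{raftCPU: 100},
	})
}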

pkg/kv/kvserver/asim/mmaintegration/mma_integration.go

Lines changed: 5 additions & 5 deletions
@@ -77,11 +77,11 @@ func MakeStoreLeaseholderMsgFromState(
 		rl.RaftCPU = mmaprototype.LoadValue(load.RaftCPUNanosPerSecond)
 
 		rangeMessages = append(rangeMessages, mmaprototype.RangeMsg{
-			RangeID:   roachpb.RangeID(replica.Range()),
-			Populated: true,
-			Replicas:  replicas,
-			Conf:      *rng.SpanConfig(),
-			RangeLoad: rl,
+			RangeID:                  roachpb.RangeID(replica.Range()),
+			MaybeSpanConfIsPopulated: true,
+			Replicas:                 replicas,
+			MaybeSpanConf:            *rng.SpanConfig(),
+			RangeLoad:                rl,
 		})
 	}
 
pkg/kv/kvserver/kvstorage/destroy.go

Lines changed: 7 additions & 6 deletions
@@ -95,16 +95,17 @@ func ClearRangeData(
 // writes.
 //
 // 1. Log storage write (durable):
-//   1.1. Write WAG node with the state machine mutation (2).
+//   1.1. WAG: apply to RaftAppliedIndex.
+//   1.2. WAG: apply mutation (2).
 // 2. State machine mutation:
 //   2.1. Clear RangeID-local un-/replicated state.
 //   2.2. (optional) Clear replicated MVCC span.
-//   2.3. Write RangeTombstone with next ReplicaID/LogID.
+//   2.3. Write RangeTombstone with next ReplicaID.
 // 3. Log engine GC (after state machine mutation 2 is durably applied):
-//   3.1. Remove previous LogID.
+//   3.1. Remove raft state.
 //
-// TODO(sep-raft-log): support the status quo in which 1+2+3 is written
-// atomically, and 1.1 is not written.
+// TODO(sep-raft-log): support the status quo in which 2+3 is written
+// atomically, and 1 is not written.
 const DestroyReplicaTODO = 0
 
 // DestroyReplica destroys all or a part of the Replica's state, installing a
@@ -130,7 +131,7 @@ func DestroyReplica(
 	if diskReplicaID.ReplicaID >= nextReplicaID {
 		return errors.AssertionFailedf("replica r%d/%d must not survive its own tombstone", rangeID, diskReplicaID)
 	}
-	_ = DestroyReplicaTODO // 2.1 + 3.1 + 2.2
+	_ = DestroyReplicaTODO // 2.1 + 2.2 + 3.1
 	if err := ClearRangeData(ctx, rangeID, reader, writer, opts); err != nil {
 		return err
 	}
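
The revised DestroyReplicaTODO comment above sequences three phases: a durable log-storage (WAG) write, the state machine mutation, and log-engine GC that may only run once the mutation is durably applied. The outline below is purely hypothetical scaffolding to show that ordering; every function in it is invented, and the real code keeps these steps behind the DestroyReplicaTODO marker.

package main

// All identifiers below are hypothetical; this only restates the ordering in
// the revised DestroyReplicaTODO comment, not the actual kvstorage code.

func writeWAGNode() error              { return nil } // 1. durable log-storage write (apply index + mutation)
func applyStateMachineMutation() error { return nil } // 2. clear state, write RangeTombstone with next ReplicaID
func waitUntilDurablyApplied() error   { return nil } // barrier: mutation (2) must be durable before GC
func removeRaftState() error           { return nil } // 3. log-engine GC

func destroyReplicaOutline() error {
	if err := writeWAGNode(); err != nil {
		return err
	}
	if err := applyStateMachineMutation(); err != nil {
		return err
	}
	if err := waitUntilDurablyApplied(); err != nil {
		return err
	}
	return removeRaftState()
}

func main() {
	if err := destroyReplicaOutline(); err != nil {
		panic(err)
	}
}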

pkg/kv/kvserver/kvstorage/replica_state.go

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ func (r LoadedReplicaState) check(storeID roachpb.StoreID) error {
 // 1. Log storage write (durable):
 //   1.1. Write WAG node with the state machine mutation (2).
 // 2. State machine mutation:
-//   2.1. Write RaftReplicaID with the new ReplicaID/LogID.
+//   2.1. Write the new RaftReplicaID.
 //
 // TODO(sep-raft-log): support the status quo in which only 2.1 is written.
 const CreateUninitReplicaTODO = 0