Skip to content

Commit 1e8ab10

Browse files
craig[bot]kev-caodt
committed
149075: backup: create tests for simulating backup compactions r=jeffswenson a=kev-cao This commit adds a test for simulating the compaction heuristic policy under various types of workloads and outputs the size of the resulting compaction compared against the size of the chain it compacts. Epic: CRDB-48786 Fixes: #148378 Release note: None 149472: jobs: make GC of old jobs txn'l r=dt a=dt A single logical job is now made up of many rows stored in many different tables, meaning that deletion of a job from the system should imply complete deletion of all of its rows in all of these tables. Previously we were not using a single txn to delete from all of the tables, meaning a job could have its row deleted from e.g. system.jobs but not from job_status, orphaning the status row. These orphan rows were not particularly problematic but are cruft which could build up over time as nothing would remove them once the main record in system.jobs was removed. Instead, we now identify all expired jobs in system jobs which we will delete using one txn, then delete each job in a txn that deletes all rows for that job in all tables. Release note: none. Epic: CRDB-51121. Fixes: #147552. 149473: sql: skip vtable for control ALL X JOBS r=dt a=dt The PAUSE/RESUME/CANCEL ALL X JOBS syntax previously decoded every job payload to see if the payload had a field corresponding to X to find jobs of that type. Instead it can just find those jobs using the indexed job_type column in system.jobs. Release note: none. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-48791. Co-authored-by: Kevin Cao <[email protected]> Co-authored-by: David Taylor <[email protected]>
4 parents 124aee2 + aea2f75 + 1868efe + 545e0fb commit 1e8ab10

File tree

6 files changed

+396
-115
lines changed

6 files changed

+396
-115
lines changed

pkg/backup/compaction_policy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ var (
2727
)
2828
)
2929

30+
// compactionPolicy is a function that determines what backups to compact when
31+
// given a chain of backups. It returns the inclusive start and exclusive end of
32+
// the window of backups to compact, as well as an error if one occurs.
33+
type compactionPolicy func(context.Context, *sql.ExecutorConfig, []backuppb.BackupManifest) (int, int, error)
34+
3035
// minSizeDeltaHeuristic is a heuristic that selects a window of backups with the
3136
// smallest delta in data size between each backup.
3237
func minSizeDeltaHeuristic(

pkg/backup/compaction_policy_test.go

Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@ package backup
77

88
import (
99
"context"
10+
"math/rand"
11+
"slices"
1012
"testing"
1113

1214
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
15+
"github.com/cockroachdb/cockroach/pkg/roachpb"
1316
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1417
"github.com/cockroachdb/cockroach/pkg/sql"
18+
"github.com/cockroachdb/cockroach/pkg/util"
1519
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1620
"github.com/cockroachdb/cockroach/pkg/util/log"
21+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
22+
"github.com/cockroachdb/errors"
1723
"github.com/stretchr/testify/require"
1824
)
1925

@@ -71,3 +77,356 @@ func TestBackupCompactionHeuristic(t *testing.T) {
7177
require.Error(t, err)
7278
})
7379
}
80+
81+
func TestSimulateCompactionPolicy(t *testing.T) {
82+
defer leaktest.AfterTest(t)()
83+
defer log.Scope(t).Close(t)
84+
85+
ctx := context.Background()
86+
rng, seed := randutil.NewPseudoRand()
87+
t.Logf("random seed: %d", seed)
88+
policies := map[string]compactionPolicy{
89+
"min size delta": minSizeDeltaHeuristic,
90+
}
91+
92+
appendOnly := randomWorkload{updateProbability: 0.0}
93+
updateOnly := randomWorkload{updateProbability: 1.0}
94+
evenWorkload := randomWorkload{updateProbability: 0.5}
95+
96+
testcases := []struct {
97+
name string
98+
factory *fakeBackupChainFactory
99+
windowSize int64
100+
}{
101+
{
102+
name: "append-only workload",
103+
factory: newFakeBackupChainFactory(t, rng, 100).
104+
AddWorkload(newWorkloadCfg(appendOnly).Backups(10).Keys(20)),
105+
},
106+
{
107+
name: "update-only workload",
108+
factory: newFakeBackupChainFactory(t, rng, 100).
109+
AddWorkload(newWorkloadCfg(updateOnly).Backups(10).Keys(20)),
110+
},
111+
{
112+
name: "mixed workload",
113+
factory: newFakeBackupChainFactory(t, rng, 100).
114+
AddWorkload(newWorkloadCfg(evenWorkload).Backups(10).Keys(20)),
115+
},
116+
{
117+
name: "update same small keyspace workload",
118+
factory: newFakeBackupChainFactory(t, rng, 100).AddWorkload(
119+
newWorkloadCfg(appendOnly).Backups(1).Keys(20),
120+
newWorkloadCfg(updateSameKeysWorkload{}).Backups(10).Keys(15),
121+
),
122+
},
123+
{
124+
name: "append-only workload with varying size update-only workload over small keyspace",
125+
factory: newFakeBackupChainFactory(t, rng, 100).AddWorkload(
126+
newWorkloadCfg(appendOnly).Backups(3).Keys(20),
127+
newWorkloadCfg(updateOnly).Backups(1).Keys(10),
128+
newWorkloadCfg(updateOnly).Backups(1).Keys(15),
129+
newWorkloadCfg(updateOnly).Backups(1).Keys(20),
130+
),
131+
},
132+
}
133+
for policyName, policy := range policies {
134+
for _, tc := range testcases {
135+
t.Run(tc.name+" with policy "+policyName, func(t *testing.T) {
136+
if tc.windowSize <= 0 {
137+
tc.windowSize = backupCompactionWindow.Default()
138+
}
139+
st := cluster.MakeTestingClusterSettings()
140+
backupCompactionWindow.Override(ctx, &st.SV, tc.windowSize)
141+
execCfg := &sql.ExecutorConfig{Settings: st}
142+
143+
chain := tc.factory.CreateBackupChain()
144+
compacted, chain, err := chain.Compact(t, ctx, execCfg, policy)
145+
require.NoError(t, err)
146+
147+
t.Logf(
148+
"%s:\n\tchain size: %d\n\tlast backup size: %d\n\tcompacted size: %d",
149+
tc.name, chain.Size(), chain[len(chain)-1].Size(), compacted.Size(),
150+
)
151+
})
152+
}
153+
}
154+
}
155+
156+
// We choose an integer as a fake key type since it is quickest to generate new
157+
// unique keys for testing purposes.
158+
type fakeKey int64
159+
160+
// fakeBackupChainFactory is a factory that iterates over a set of workloads to
161+
// create a backup chain.
162+
type fakeBackupChainFactory struct {
163+
t *testing.T
164+
rng *rand.Rand
165+
initialKeySpace int // The number of keys in the initial full backup
166+
workloads []*fakeWorkloadCfg // Maps a workload to the number of backups it has
167+
}
168+
169+
func newFakeBackupChainFactory(
170+
t *testing.T, rng *rand.Rand, initialKeySpace int,
171+
) *fakeBackupChainFactory {
172+
if initialKeySpace <= 0 {
173+
t.Fatalf("initial key space must be greater than zero, got: %d", initialKeySpace)
174+
}
175+
return &fakeBackupChainFactory{
176+
t: t,
177+
rng: rng,
178+
initialKeySpace: initialKeySpace,
179+
}
180+
}
181+
182+
// AddWorkload adds a workload configuration to the factory.
183+
func (f *fakeBackupChainFactory) AddWorkload(
184+
workloadCfg ...*fakeWorkloadCfg,
185+
) *fakeBackupChainFactory {
186+
f.workloads = append(f.workloads, workloadCfg...)
187+
return f
188+
}
189+
190+
// CreateBackupChain generates a fake backup chain based on the configured
191+
// workloads.
192+
func (f *fakeBackupChainFactory) CreateBackupChain() fakeBackupChain {
193+
if f.rng == nil {
194+
var seed int64
195+
f.rng, seed = randutil.NewPseudoRand()
196+
f.t.Logf("no rng specified, using random seed: %d", seed)
197+
}
198+
var totalBackups int
199+
for _, workload := range f.workloads {
200+
totalBackups += workload.numBackups
201+
}
202+
203+
chain := make(fakeBackupChain, 0, totalBackups+1)
204+
chain = append(chain, f.initializeFullBackup())
205+
206+
for _, workload := range f.workloads {
207+
for range workload.numBackups {
208+
chain = append(
209+
chain,
210+
workload.workload.CreateBackup(f.rng, chain, workload.numKeys),
211+
)
212+
}
213+
}
214+
return chain
215+
}
216+
217+
// initializeFullBackup creates a full backup with the initial key space defined
218+
// in the factory.
219+
func (f *fakeBackupChainFactory) initializeFullBackup() fakeBackup {
220+
backup := newFakeBackup()
221+
key := fakeKey(0)
222+
for range f.initialKeySpace {
223+
backup.AddKey(key)
224+
key++
225+
}
226+
return backup
227+
}
228+
229+
// fakeWorkloadCfg specifies the number of backups and keys per backup for a
230+
// specific workload to create.
231+
type fakeWorkloadCfg struct {
232+
workload fakeWorkload // The workload to be added
233+
numBackups int // The number of backups to create for this workload
234+
numKeys int // The number of keys to write in each backup
235+
}
236+
237+
func newWorkloadCfg(workload fakeWorkload) *fakeWorkloadCfg {
238+
return &fakeWorkloadCfg{
239+
workload: workload,
240+
}
241+
}
242+
243+
// Backups sets the number of backups to create for the workload.
244+
func (c *fakeWorkloadCfg) Backups(count int) *fakeWorkloadCfg {
245+
c.numBackups = count
246+
return c
247+
}
248+
249+
// Keys sets the number of keys to write in each backup created by the workload.
250+
func (c *fakeWorkloadCfg) Keys(count int) *fakeWorkloadCfg {
251+
c.numKeys = count
252+
return c
253+
}
254+
255+
// fakeWorkload is an interface that defines a method to create a backup based
256+
// on a given workload. It generates a backup based on the provided backup
257+
// chain and the number of keys to write.
258+
type fakeWorkload interface {
259+
CreateBackup(*rand.Rand, fakeBackupChain, int) fakeBackup
260+
}
261+
262+
// A workload that will randomly append or update keys in a backup.
263+
type randomWorkload struct {
264+
updateProbability float64 // 0 for append-only, 1 for update-only
265+
}
266+
267+
func (w randomWorkload) CreateBackup(
268+
rng *rand.Rand, chain fakeBackupChain, numKeys int,
269+
) fakeBackup {
270+
backup := newFakeBackup()
271+
allKeys := chain.AllKeys()
272+
if len(allKeys) == 0 {
273+
return backup
274+
}
275+
276+
// Store keys that can be updated to avoid duplicate updates.
277+
updateableKeys := allKeys[:]
278+
279+
for range numKeys {
280+
if rng.Float64() < w.updateProbability && len(updateableKeys) > 0 {
281+
randIdx := rng.Intn(len(updateableKeys))
282+
randomKey := updateableKeys[randIdx]
283+
updateableKeys = slices.Delete(updateableKeys, randIdx, randIdx+1)
284+
backup.AddKey(randomKey)
285+
} else {
286+
newKey := allKeys[len(allKeys)-1] + 1
287+
backup.AddKey(newKey)
288+
allKeys = append(allKeys, newKey)
289+
}
290+
}
291+
return backup
292+
}
293+
294+
// A workload that only updates keys that were written in the previous backup.
295+
type updateSameKeysWorkload struct{}
296+
297+
func (w updateSameKeysWorkload) CreateBackup(
298+
rng *rand.Rand, chain fakeBackupChain, numKeys int,
299+
) fakeBackup {
300+
backup := newFakeBackup()
301+
lastBackup := chain[len(chain)-1]
302+
keys := lastBackup.Keys()
303+
304+
for range numKeys {
305+
if len(keys) == 0 {
306+
break
307+
}
308+
randIdx := rng.Intn(len(keys))
309+
randomKey := keys[randIdx]
310+
keys = slices.Delete(keys, randIdx, randIdx+1)
311+
backup.AddKey(randomKey)
312+
}
313+
314+
return backup
315+
}
316+
317+
type fakeBackupChain []fakeBackup
318+
319+
// AllKeys returns a sorted list of all unique keys across all backups in the
320+
// chain.
321+
func (c fakeBackupChain) AllKeys() []fakeKey {
322+
keys := make(map[fakeKey]struct{})
323+
for _, backup := range c {
324+
for key := range backup.keys {
325+
keys[key] = struct{}{}
326+
}
327+
}
328+
allKeys := make([]fakeKey, 0, len(keys))
329+
for key := range keys {
330+
allKeys = append(allKeys, key)
331+
}
332+
slices.Sort(allKeys)
333+
return allKeys
334+
}
335+
336+
// Size returns the total number of keys across all backups in the chain,
337+
// counting duplicates.
338+
func (c fakeBackupChain) Size() int {
339+
totalSize := 0
340+
for _, backup := range c {
341+
totalSize += backup.Size()
342+
}
343+
return totalSize
344+
}
345+
346+
// Compact applies the provided compaction policy to the backup chain, returning
347+
// a compacted backup and the backups that were compacted.
348+
func (c fakeBackupChain) Compact(
349+
t *testing.T, ctx context.Context, execCfg *sql.ExecutorConfig, policy compactionPolicy,
350+
) (fakeBackup, []fakeBackup, error) {
351+
manifests := c.toBackupManifests()
352+
start, end, err := policy(ctx, execCfg, manifests)
353+
if err != nil {
354+
return fakeBackup{}, nil, err
355+
}
356+
t.Logf("Compacting backups from index %d to %d", start, end)
357+
compacted, err := c.compactWindow(start, end)
358+
if err != nil {
359+
return fakeBackup{}, nil, err
360+
}
361+
return compacted, c[start:end], nil
362+
}
363+
364+
// compactWindow compacts the backups in the chain from the specified start to
365+
// end indices, returning a new fakeBackup that contains all unique keys from
366+
// the specified range.
367+
func (c fakeBackupChain) compactWindow(start, end int) (fakeBackup, error) {
368+
if start < 1 || end > len(c) || start >= end {
369+
return fakeBackup{}, errors.New("invalid window indices")
370+
}
371+
backup := newFakeBackup()
372+
for i := start; i < end; i++ {
373+
for key := range c[i].keys {
374+
backup.AddKey(key)
375+
}
376+
}
377+
return backup, nil
378+
}
379+
380+
// toBackupManifests converts the fakeBackupChain into a slice of backup
381+
// manifests to be used in the compaction policy.
382+
func (c fakeBackupChain) toBackupManifests() []backuppb.BackupManifest {
383+
return util.Map(c, func(backup fakeBackup) backuppb.BackupManifest {
384+
return backup.toBackupManifest()
385+
})
386+
}
387+
388+
// fakeBackup represents a backup that contains some set of keys.
389+
// Note: As we write more heuristics, it may be necessary to increase the
390+
// complexity of this struct to include more metadata about the backup that can
391+
// then be translated into backup manifests for the policy to use.
392+
type fakeBackup struct {
393+
keys map[fakeKey]struct{}
394+
}
395+
396+
func newFakeBackup() fakeBackup {
397+
return fakeBackup{
398+
keys: make(map[fakeKey]struct{}),
399+
}
400+
}
401+
402+
// Size returns the number of unique keys in the backup.
403+
func (m *fakeBackup) Size() int {
404+
return len(m.keys)
405+
}
406+
407+
// Keys returns a sorted slice of all unique keys in the backup.
408+
func (m *fakeBackup) Keys() []fakeKey {
409+
keys := make([]fakeKey, 0, len(m.keys))
410+
for key := range m.keys {
411+
keys = append(keys, key)
412+
}
413+
slices.Sort(keys)
414+
return keys
415+
}
416+
417+
// AddKey adds a key to the backup. If the key already exists, it will not be
418+
// added again, ensuring uniqueness.
419+
func (m *fakeBackup) AddKey(key fakeKey) *fakeBackup {
420+
m.keys[key] = struct{}{}
421+
return m
422+
}
423+
424+
// toBackupManifest converts the fakeBackup into a backup manifest.
425+
func (m *fakeBackup) toBackupManifest() backuppb.BackupManifest {
426+
manifest := backuppb.BackupManifest{
427+
EntryCounts: roachpb.RowCount{
428+
DataSize: int64(len(m.keys)),
429+
},
430+
}
431+
return manifest
432+
}

pkg/jobs/adopt.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ const (
3737
`'` + string(StatePauseRequested) + `', ` +
3838
`'` + string(StateReverting) + `'`
3939

40+
terminalStateList = `'` + string(StateFailed) + `', ` +
41+
`'` + string(StateCanceled) + `', ` +
42+
`'` + string(StateSucceeded) + `'`
43+
4044
claimableStateTupleString = `(` + claimableStateList + `)`
4145

4246
nonTerminalStateList = claimableStateList + `, ` +

0 commit comments

Comments
 (0)