Skip to content

Commit d354740

Browse files
craig[bot] and andyyang890 committed
Merge #153186
153186: jobfrontier: add GetResolvedSpans/GetAllResolvedSpans r=msbutler a=andyyang890 Fixes #153074 --- **jobfrontier: add GetResolvedSpans helper function** This patch adds the helper function `GetResolvedSpans`, which is like `Get` except it returns the resolved spans directly so that callers can decide what to do with them. Release note: None --- **jobfrontier: add GetAllResolvedSpans helper function** This patch adds the helper function `GetAllResolvedSpans`, which is like `GetResolvedSpans` except it collects and returns the resolved spans for all frontiers associated with a job ID. Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 5aeb08d + 64317af commit d354740

File tree

3 files changed

+261
-30
lines changed

3 files changed

+261
-30
lines changed

pkg/jobs/jobfrontier/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ go_library(
88
deps = [
99
"//pkg/jobs",
1010
"//pkg/jobs/jobspb",
11-
"//pkg/roachpb",
1211
"//pkg/sql/isql",
13-
"//pkg/util/hlc",
1412
"//pkg/util/protoutil",
1513
"//pkg/util/span",
1614
"@com_github_cockroachdb_errors//:errors",

pkg/jobs/jobfrontier/frontier.go

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ import (
1111

1212
"github.com/cockroachdb/cockroach/pkg/jobs"
1313
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
14-
"github.com/cockroachdb/cockroach/pkg/roachpb"
1514
"github.com/cockroachdb/cockroach/pkg/sql/isql"
16-
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1715
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
1816
"github.com/cockroachdb/cockroach/pkg/util/span"
1917
"github.com/cockroachdb/errors"
@@ -32,15 +30,15 @@ func Get(
3230
) (span.Frontier, bool, error) {
3331
infoStorage := jobs.InfoStorageForJob(txn, jobID)
3432

35-
// Read all persisted entries, both as entries and as plain spans; we need the
36-
// latter form to construct the frontier and the former to advance it.
37-
// TODO(dt): we could avoid duplicate allocation here if we added an API to
38-
// construct frontier directly from entries.
39-
var entries []frontierEntry
40-
var spans []roachpb.Span
41-
4233
keyPrefix := frontierPrefix + name + shardSep
4334

35+
// Construct frontier to track the set of spans found and advance it to their
36+
// persisted timestamps. This implies we persist zero-timestamp spans to keep
37+
// the set of tracked spans even if they do not have progress.
38+
frontier, err := span.MakeFrontier()
39+
if err != nil {
40+
return nil, false, err
41+
}
4442
var found bool
4543
if err := infoStorage.Iterate(ctx, keyPrefix, func(_ string, value []byte) error {
4644
found = true
@@ -49,28 +47,67 @@ func Get(
4947
return err
5048
}
5149
for _, sp := range r.ResolvedSpans {
52-
entries = append(entries, frontierEntry{Span: sp.Span, Timestamp: sp.Timestamp})
53-
spans = append(spans, sp.Span)
50+
if err := frontier.AddSpansAt(sp.Timestamp, sp.Span); err != nil {
51+
return err
52+
}
5453
}
5554
return nil
5655
}); err != nil || !found {
5756
return nil, false, err
5857
}
5958

60-
// Construct frontier to track the set of spans found and advance it to their
61-
// persisted timestamps. This implies we perist zero-timestamp spans to keep
62-
// the set of tracked spans even if they do not have progress.
63-
frontier, err := span.MakeFrontier(spans...)
64-
if err != nil {
59+
return frontier, found, nil
60+
}
61+
62+
// GetResolvedSpans is like Get except it returns the resolved spans directly
63+
// so that callers can decide what to do with them.
64+
func GetResolvedSpans(
65+
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, name string,
66+
) ([]jobspb.ResolvedSpan, bool, error) {
67+
infoStorage := jobs.InfoStorageForJob(txn, jobID)
68+
69+
keyPrefix := frontierPrefix + name + shardSep
70+
71+
var found bool
72+
var spans []jobspb.ResolvedSpan
73+
if err := infoStorage.Iterate(ctx, keyPrefix, func(_ string, value []byte) error {
74+
found = true
75+
var r jobspb.ResolvedSpans
76+
if err := protoutil.Unmarshal(value, &r); err != nil {
77+
return err
78+
}
79+
spans = append(spans, r.ResolvedSpans...)
80+
return nil
81+
}); err != nil || !found {
6582
return nil, false, err
6683
}
67-
for _, entry := range entries {
68-
if _, err := frontier.Forward(entry.Span, entry.Timestamp); err != nil {
69-
return nil, false, err
84+
85+
return spans, found, nil
86+
}
87+
88+
// GetAllResolvedSpans is like GetResolvedSpans except it collects and returns
89+
// the resolved spans for all frontiers associated with a job ID. The returned
90+
// spans are not sorted or combined in any way.
91+
func GetAllResolvedSpans(
92+
ctx context.Context, txn isql.Txn, jobID jobspb.JobID,
93+
) ([]jobspb.ResolvedSpan, bool, error) {
94+
infoStorage := jobs.InfoStorageForJob(txn, jobID)
95+
96+
var found bool
97+
var spans []jobspb.ResolvedSpan
98+
if err := infoStorage.Iterate(ctx, frontierPrefix, func(_ string, value []byte) error {
99+
found = true
100+
var r jobspb.ResolvedSpans
101+
if err := protoutil.Unmarshal(value, &r); err != nil {
102+
return err
70103
}
104+
spans = append(spans, r.ResolvedSpans...)
105+
return nil
106+
}); err != nil || !found {
107+
return nil, false, err
71108
}
72109

73-
return frontier, true, nil
110+
return spans, found, nil
74111
}
75112

76113
// Store persists a frontier's current state to storage.
@@ -172,10 +209,3 @@ func deleteEntries(ctx context.Context, infoStorage jobs.InfoStorage, name strin
172209
endKey := frontierPrefix + name + string(rune(shardSep[0])+1)
173210
return infoStorage.DeleteRange(ctx, startKey, endKey, 0 /* no limit */)
174211
}
175-
176-
// frontierEntry represents a single persisted frontier entry.
177-
// This is used internally for serialization but may be useful for testing.
178-
type frontierEntry struct {
179-
Span roachpb.Span
180-
Timestamp hlc.Timestamp
181-
}

pkg/jobs/jobfrontier/frontier_test.go

Lines changed: 204 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ func TestFrontier(t *testing.T) {
5454

5555
s := srv.ApplicationLayer()
5656
sp := func(i int) roachpb.Span {
57-
return roachpb.Span{Key: s.Codec().TablePrefix(100 + uint32(i)), EndKey: s.Codec().TablePrefix(101 + uint32(i))}
57+
return roachpb.Span{
58+
Key: s.Codec().TablePrefix(100 + uint32(i)),
59+
EndKey: s.Codec().TablePrefix(101 + uint32(i)),
60+
}
5861
}
5962
sp1, sp2, sp3 := sp(0), sp(1), sp(2)
6063

@@ -77,6 +80,17 @@ func TestFrontier(t *testing.T) {
7780
require.Equal(t, 3, countEntries(loadedFrontier))
7881
checkFrontierContainsSpan(t, loadedFrontier, sp1, ts1)
7982
checkFrontierContainsSpan(t, loadedFrontier, sp3, ts1)
83+
84+
spans, found, err := GetResolvedSpans(ctx, txn, jobID, "basic")
85+
require.NoError(t, err)
86+
require.True(t, found)
87+
require.ElementsMatch(t,
88+
[]jobspb.ResolvedSpan{
89+
{Span: sp1, Timestamp: ts1},
90+
{Span: sp2},
91+
{Span: sp3, Timestamp: ts1},
92+
},
93+
spans)
8094
},
8195
},
8296
{
@@ -86,6 +100,10 @@ func TestFrontier(t *testing.T) {
86100
_, found, err := Get(ctx, txn, jobID, "nonexistent")
87101
require.NoError(t, err)
88102
require.False(t, found)
103+
104+
_, found, err = GetResolvedSpans(ctx, txn, jobID, "nonexistent")
105+
require.NoError(t, err)
106+
require.False(t, found)
89107
},
90108
},
91109
{
@@ -106,6 +124,17 @@ func TestFrontier(t *testing.T) {
106124
checkFrontierContainsSpan(t, loadedFrontier, sp1, ts1)
107125
checkFrontierContainsSpan(t, loadedFrontier, sp2, ts2)
108126
checkFrontierContainsSpan(t, loadedFrontier, sp3, ts1)
127+
128+
spans, found, err := GetResolvedSpans(ctx, txn, jobID, "multi")
129+
require.NoError(t, err)
130+
require.True(t, found)
131+
require.ElementsMatch(t,
132+
[]jobspb.ResolvedSpan{
133+
{Span: sp1, Timestamp: ts1},
134+
{Span: sp2, Timestamp: ts2},
135+
{Span: sp3, Timestamp: ts1},
136+
},
137+
spans)
109138
},
110139
},
111140
{
@@ -118,6 +147,10 @@ func TestFrontier(t *testing.T) {
118147
_, found, err := Get(ctx, txn, jobID, "deleteme")
119148
require.NoError(t, err)
120149
require.False(t, found)
150+
151+
_, found, err = GetResolvedSpans(ctx, txn, jobID, "deleteme")
152+
require.NoError(t, err)
153+
require.False(t, found)
121154
},
122155
},
123156
{
@@ -135,6 +168,15 @@ func TestFrontier(t *testing.T) {
135168
require.True(t, found)
136169
require.Equal(t, 1, countEntries(loadedFrontier))
137170
checkFrontierContainsSpan(t, loadedFrontier, sp2, ts2)
171+
172+
spans, found, err := GetResolvedSpans(ctx, txn, jobID, "overwrite")
173+
require.NoError(t, err)
174+
require.True(t, found)
175+
require.ElementsMatch(t,
176+
[]jobspb.ResolvedSpan{
177+
{Span: sp2, Timestamp: ts2},
178+
},
179+
spans)
138180
},
139181
},
140182
}
@@ -261,3 +303,164 @@ func TestStoreChunked(t *testing.T) {
261303
})
262304
}
263305
}
306+
307+
func TestGetAllResolvedSpans(t *testing.T) {
308+
defer leaktest.AfterTest(t)()
309+
defer log.Scope(t).Close(t)
310+
311+
ctx := context.Background()
312+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
313+
defer srv.Stopper().Stop(ctx)
314+
315+
s := srv.ApplicationLayer()
316+
sp := func(i int) roachpb.Span {
317+
return roachpb.Span{
318+
Key: s.Codec().TablePrefix(100 + uint32(i)),
319+
EndKey: s.Codec().TablePrefix(101 + uint32(i)),
320+
}
321+
}
322+
sp1, sp2, sp3 := sp(0), sp(1), sp(2)
323+
324+
ts1, ts2 := hlc.Timestamp{WallTime: 100, Logical: 1}, hlc.Timestamp{WallTime: 200, Logical: 2}
325+
326+
tests := []struct {
327+
name string
328+
setup func(t *testing.T, txn isql.Txn, jobID jobspb.JobID)
329+
expected []jobspb.ResolvedSpan
330+
}{
331+
{
332+
name: "no frontiers",
333+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {},
334+
expected: nil,
335+
},
336+
{
337+
name: "single frontier",
338+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
339+
f, err := span.MakeFrontier(sp1, sp2, sp3)
340+
require.NoError(t, err)
341+
_, err = f.Forward(sp1, ts1)
342+
require.NoError(t, err)
343+
require.NoError(t, Store(ctx, txn, jobID, "single", f))
344+
},
345+
expected: []jobspb.ResolvedSpan{
346+
{Span: sp1, Timestamp: ts1},
347+
{Span: sp2},
348+
{Span: sp3},
349+
},
350+
},
351+
{
352+
name: "multiple frontiers",
353+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
354+
// First frontier with sp1@ts1 and sp2@zero.
355+
f1, err := span.MakeFrontier(sp1, sp2)
356+
require.NoError(t, err)
357+
_, err = f1.Forward(sp1, ts1)
358+
require.NoError(t, err)
359+
require.NoError(t, Store(ctx, txn, jobID, "frontier1", f1))
360+
361+
// Second frontier with sp3@ts2.
362+
f2, err := span.MakeFrontier(sp3)
363+
require.NoError(t, err)
364+
_, err = f2.Forward(sp3, ts2)
365+
require.NoError(t, err)
366+
require.NoError(t, Store(ctx, txn, jobID, "frontier2", f2))
367+
},
368+
expected: []jobspb.ResolvedSpan{
369+
{Span: sp1, Timestamp: ts1},
370+
{Span: sp2},
371+
{Span: sp3, Timestamp: ts2},
372+
},
373+
},
374+
{
375+
name: "multiple frontiers with overlapping spans",
376+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
377+
// First frontier with sp1@ts1.
378+
f1, err := span.MakeFrontier(sp1)
379+
require.NoError(t, err)
380+
_, err = f1.Forward(sp1, ts1)
381+
require.NoError(t, err)
382+
require.NoError(t, Store(ctx, txn, jobID, "overlap1", f1))
383+
384+
// Second frontier with sp1@ts2 (different timestamp) and sp2@zero.
385+
f2, err := span.MakeFrontier(sp1, sp2)
386+
require.NoError(t, err)
387+
_, err = f2.Forward(sp1, ts2)
388+
require.NoError(t, err)
389+
require.NoError(t, Store(ctx, txn, jobID, "overlap2", f2))
390+
},
391+
expected: []jobspb.ResolvedSpan{
392+
{Span: sp1, Timestamp: ts1},
393+
{Span: sp1, Timestamp: ts2},
394+
{Span: sp2},
395+
},
396+
},
397+
{
398+
name: "multiple frontiers with some deleted",
399+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
400+
// Store first frontier with sp1@ts1.
401+
f1, err := span.MakeFrontier(sp1)
402+
require.NoError(t, err)
403+
_, err = f1.Forward(sp1, ts1)
404+
require.NoError(t, err)
405+
require.NoError(t, Store(ctx, txn, jobID, "keep", f1))
406+
407+
// Store second frontier with sp2@ts2 then delete it.
408+
f2, err := span.MakeFrontier(sp2)
409+
require.NoError(t, err)
410+
_, err = f2.Forward(sp2, ts2)
411+
require.NoError(t, err)
412+
require.NoError(t, Store(ctx, txn, jobID, "delete", f2))
413+
require.NoError(t, Delete(ctx, txn, jobID, "delete"))
414+
415+
// Store third frontier with sp3@ts1.
416+
f3, err := span.MakeFrontier(sp3)
417+
require.NoError(t, err)
418+
_, err = f3.Forward(sp3, ts1)
419+
require.NoError(t, err)
420+
require.NoError(t, Store(ctx, txn, jobID, "also_keep", f3))
421+
},
422+
expected: []jobspb.ResolvedSpan{
423+
{Span: sp1, Timestamp: ts1},
424+
{Span: sp3, Timestamp: ts1},
425+
},
426+
},
427+
{
428+
name: "empty frontiers",
429+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
430+
// Store a frontier with no spans forwarded.
431+
f, err := span.MakeFrontier(sp1, sp2, sp3)
432+
require.NoError(t, err)
433+
require.NoError(t, Store(ctx, txn, jobID, "empty", f))
434+
},
435+
expected: []jobspb.ResolvedSpan{
436+
{Span: sp1},
437+
{Span: sp2},
438+
{Span: sp3},
439+
},
440+
},
441+
}
442+
443+
for i, tc := range tests {
444+
t.Run(tc.name, func(t *testing.T) {
445+
jobID := jobspb.JobID(3000 + i)
446+
447+
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
448+
tc.setup(t, txn, jobID)
449+
return nil
450+
}))
451+
452+
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
453+
spans, found, err := GetAllResolvedSpans(ctx, txn, jobID)
454+
require.NoError(t, err)
455+
if len(tc.expected) > 0 {
456+
require.True(t, found)
457+
require.ElementsMatch(t, tc.expected, spans)
458+
} else {
459+
require.False(t, found)
460+
require.Empty(t, spans)
461+
}
462+
return nil
463+
}))
464+
})
465+
}
466+
}

0 commit comments

Comments
 (0)