Skip to content

Commit 19e18da

Browse files
committed
jobfrontier: add GetResolvedSpans helper function
This patch adds the helper function `GetResolvedSpans`, which is like `Get` except it returns the resolved spans directly so that callers can decide what to do with them. Release note: None
1 parent a78d085 commit 19e18da

File tree

3 files changed

+77
-32
lines changed

3 files changed

+77
-32
lines changed

pkg/jobs/jobfrontier/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ go_library(
88
deps = [
99
"//pkg/jobs",
1010
"//pkg/jobs/jobspb",
11-
"//pkg/roachpb",
1211
"//pkg/sql/isql",
13-
"//pkg/util/hlc",
1412
"//pkg/util/protoutil",
1513
"//pkg/util/span",
1614
"@com_github_cockroachdb_errors//:errors",

pkg/jobs/jobfrontier/frontier.go

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ import (
1111

1212
"github.com/cockroachdb/cockroach/pkg/jobs"
1313
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
14-
"github.com/cockroachdb/cockroach/pkg/roachpb"
1514
"github.com/cockroachdb/cockroach/pkg/sql/isql"
16-
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1715
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
1816
"github.com/cockroachdb/cockroach/pkg/util/span"
1917
"github.com/cockroachdb/errors"
@@ -32,15 +30,15 @@ func Get(
3230
) (span.Frontier, bool, error) {
3331
infoStorage := jobs.InfoStorageForJob(txn, jobID)
3432

35-
// Read all persisted entries, both as entries and as plain spans; we need the
36-
// latter form to construct the frontier and the former to advance it.
37-
// TODO(dt): we could avoid duplicate allocation here if we added an API to
38-
// construct frontier directly from entries.
39-
var entries []frontierEntry
40-
var spans []roachpb.Span
41-
4233
keyPrefix := frontierPrefix + name + shardSep
4334

35+
// Construct frontier to track the set of spans found and advance it to their
36+
// persisted timestamps. This implies we persist zero-timestamp spans to keep
37+
// the set of tracked spans even if they do not have progress.
38+
frontier, err := span.MakeFrontier()
39+
if err != nil {
40+
return nil, false, err
41+
}
4442
var found bool
4543
if err := infoStorage.Iterate(ctx, keyPrefix, func(_ string, value []byte) error {
4644
found = true
@@ -49,28 +47,42 @@ func Get(
4947
return err
5048
}
5149
for _, sp := range r.ResolvedSpans {
52-
entries = append(entries, frontierEntry{Span: sp.Span, Timestamp: sp.Timestamp})
53-
spans = append(spans, sp.Span)
50+
if err := frontier.AddSpansAt(sp.Timestamp, sp.Span); err != nil {
51+
return err
52+
}
5453
}
5554
return nil
5655
}); err != nil || !found {
5756
return nil, false, err
5857
}
5958

60-
// Construct frontier to track the set of spans found and advance it to their
61-
// persisted timestamps. This implies we perist zero-timestamp spans to keep
62-
// the set of tracked spans even if they do not have progress.
63-
frontier, err := span.MakeFrontier(spans...)
64-
if err != nil {
65-
return nil, false, err
66-
}
67-
for _, entry := range entries {
68-
if _, err := frontier.Forward(entry.Span, entry.Timestamp); err != nil {
69-
return nil, false, err
59+
return frontier, found, nil
60+
}
61+
62+
// GetResolvedSpans is like Get except it returns the resolved spans directly
63+
// so that callers can decide what to do with them.
64+
func GetResolvedSpans(
65+
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, name string,
66+
) ([]jobspb.ResolvedSpan, bool, error) {
67+
infoStorage := jobs.InfoStorageForJob(txn, jobID)
68+
69+
keyPrefix := frontierPrefix + name + shardSep
70+
71+
var found bool
72+
var spans []jobspb.ResolvedSpan
73+
if err := infoStorage.Iterate(ctx, keyPrefix, func(_ string, value []byte) error {
74+
found = true
75+
var r jobspb.ResolvedSpans
76+
if err := protoutil.Unmarshal(value, &r); err != nil {
77+
return err
7078
}
79+
spans = append(spans, r.ResolvedSpans...)
80+
return nil
81+
}); err != nil || !found {
82+
return nil, false, err
7183
}
7284

73-
return frontier, true, nil
85+
return spans, found, nil
7486
}
7587

7688
// Store persists a frontier's current state to storage.
@@ -168,10 +180,3 @@ func deleteEntries(ctx context.Context, infoStorage jobs.InfoStorage, name strin
168180
endKey := frontierPrefix + name + string(rune(shardSep[0])+1)
169181
return infoStorage.DeleteRange(ctx, startKey, endKey, 0 /* no limit */)
170182
}
171-
172-
// frontierEntry represents a single persisted frontier entry.
173-
// This is used internally for serialization but may be useful for testing.
174-
type frontierEntry struct {
175-
Span roachpb.Span
176-
Timestamp hlc.Timestamp
177-
}

pkg/jobs/jobfrontier/frontier_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ func TestFrontier(t *testing.T) {
5454

5555
s := srv.ApplicationLayer()
5656
sp := func(i int) roachpb.Span {
57-
return roachpb.Span{Key: s.Codec().TablePrefix(100 + uint32(i)), EndKey: s.Codec().TablePrefix(101 + uint32(i))}
57+
return roachpb.Span{
58+
Key: s.Codec().TablePrefix(100 + uint32(i)),
59+
EndKey: s.Codec().TablePrefix(101 + uint32(i)),
60+
}
5861
}
5962
sp1, sp2, sp3 := sp(0), sp(1), sp(2)
6063

@@ -77,6 +80,17 @@ func TestFrontier(t *testing.T) {
7780
require.Equal(t, 3, countEntries(loadedFrontier))
7881
checkFrontierContainsSpan(t, loadedFrontier, sp1, ts1)
7982
checkFrontierContainsSpan(t, loadedFrontier, sp3, ts1)
83+
84+
spans, found, err := GetResolvedSpans(ctx, txn, jobID, "basic")
85+
require.NoError(t, err)
86+
require.True(t, found)
87+
require.ElementsMatch(t,
88+
[]jobspb.ResolvedSpan{
89+
{Span: sp1, Timestamp: ts1},
90+
{Span: sp2},
91+
{Span: sp3, Timestamp: ts1},
92+
},
93+
spans)
8094
},
8195
},
8296
{
@@ -86,6 +100,10 @@ func TestFrontier(t *testing.T) {
86100
_, found, err := Get(ctx, txn, jobID, "nonexistent")
87101
require.NoError(t, err)
88102
require.False(t, found)
103+
104+
_, found, err = GetResolvedSpans(ctx, txn, jobID, "nonexistent")
105+
require.NoError(t, err)
106+
require.False(t, found)
89107
},
90108
},
91109
{
@@ -106,6 +124,17 @@ func TestFrontier(t *testing.T) {
106124
checkFrontierContainsSpan(t, loadedFrontier, sp1, ts1)
107125
checkFrontierContainsSpan(t, loadedFrontier, sp2, ts2)
108126
checkFrontierContainsSpan(t, loadedFrontier, sp3, ts1)
127+
128+
spans, found, err := GetResolvedSpans(ctx, txn, jobID, "multi")
129+
require.NoError(t, err)
130+
require.True(t, found)
131+
require.ElementsMatch(t,
132+
[]jobspb.ResolvedSpan{
133+
{Span: sp1, Timestamp: ts1},
134+
{Span: sp2, Timestamp: ts2},
135+
{Span: sp3, Timestamp: ts1},
136+
},
137+
spans)
109138
},
110139
},
111140
{
@@ -118,6 +147,10 @@ func TestFrontier(t *testing.T) {
118147
_, found, err := Get(ctx, txn, jobID, "deleteme")
119148
require.NoError(t, err)
120149
require.False(t, found)
150+
151+
_, found, err = GetResolvedSpans(ctx, txn, jobID, "deleteme")
152+
require.NoError(t, err)
153+
require.False(t, found)
121154
},
122155
},
123156
{
@@ -135,6 +168,15 @@ func TestFrontier(t *testing.T) {
135168
require.True(t, found)
136169
require.Equal(t, 1, countEntries(loadedFrontier))
137170
checkFrontierContainsSpan(t, loadedFrontier, sp2, ts2)
171+
172+
spans, found, err := GetResolvedSpans(ctx, txn, jobID, "overwrite")
173+
require.NoError(t, err)
174+
require.True(t, found)
175+
require.ElementsMatch(t,
176+
[]jobspb.ResolvedSpan{
177+
{Span: sp2, Timestamp: ts2},
178+
},
179+
spans)
138180
},
139181
},
140182
}

0 commit comments

Comments
 (0)