Skip to content

Commit 64317af

Browse files
committed
jobfrontier: add GetAllResolvedSpans helper function
This patch adds the helper function `GetResolvedSpans`, which is like `GetResolvedSpans` except it collects and returns the resolved spans for all frontiers associated with a job ID. Release note: None
1 parent 19e18da commit 64317af

File tree

2 files changed

+186
-0
lines changed

2 files changed

+186
-0
lines changed

pkg/jobs/jobfrontier/frontier.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,31 @@ func GetResolvedSpans(
8585
return spans, found, nil
8686
}
8787

88+
// GetAllResolvedSpans is like GetResolvedSpans except it collects and returns
89+
// the resolved spans for all frontiers associated with a job ID. The returned
90+
// spans are not sorted or combined in any way.
91+
func GetAllResolvedSpans(
92+
ctx context.Context, txn isql.Txn, jobID jobspb.JobID,
93+
) ([]jobspb.ResolvedSpan, bool, error) {
94+
infoStorage := jobs.InfoStorageForJob(txn, jobID)
95+
96+
var found bool
97+
var spans []jobspb.ResolvedSpan
98+
if err := infoStorage.Iterate(ctx, frontierPrefix, func(_ string, value []byte) error {
99+
found = true
100+
var r jobspb.ResolvedSpans
101+
if err := protoutil.Unmarshal(value, &r); err != nil {
102+
return err
103+
}
104+
spans = append(spans, r.ResolvedSpans...)
105+
return nil
106+
}); err != nil || !found {
107+
return nil, false, err
108+
}
109+
110+
return spans, found, nil
111+
}
112+
88113
// Store persists a frontier's current state to storage.
89114
//
90115
// All span entries in the frontier and their current timestamps will be

pkg/jobs/jobfrontier/frontier_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,3 +303,164 @@ func TestStoreChunked(t *testing.T) {
303303
})
304304
}
305305
}
306+
307+
func TestGetAllResolvedSpans(t *testing.T) {
308+
defer leaktest.AfterTest(t)()
309+
defer log.Scope(t).Close(t)
310+
311+
ctx := context.Background()
312+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
313+
defer srv.Stopper().Stop(ctx)
314+
315+
s := srv.ApplicationLayer()
316+
sp := func(i int) roachpb.Span {
317+
return roachpb.Span{
318+
Key: s.Codec().TablePrefix(100 + uint32(i)),
319+
EndKey: s.Codec().TablePrefix(101 + uint32(i)),
320+
}
321+
}
322+
sp1, sp2, sp3 := sp(0), sp(1), sp(2)
323+
324+
ts1, ts2 := hlc.Timestamp{WallTime: 100, Logical: 1}, hlc.Timestamp{WallTime: 200, Logical: 2}
325+
326+
tests := []struct {
327+
name string
328+
setup func(t *testing.T, txn isql.Txn, jobID jobspb.JobID)
329+
expected []jobspb.ResolvedSpan
330+
}{
331+
{
332+
name: "no frontiers",
333+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {},
334+
expected: nil,
335+
},
336+
{
337+
name: "single frontier",
338+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
339+
f, err := span.MakeFrontier(sp1, sp2, sp3)
340+
require.NoError(t, err)
341+
_, err = f.Forward(sp1, ts1)
342+
require.NoError(t, err)
343+
require.NoError(t, Store(ctx, txn, jobID, "single", f))
344+
},
345+
expected: []jobspb.ResolvedSpan{
346+
{Span: sp1, Timestamp: ts1},
347+
{Span: sp2},
348+
{Span: sp3},
349+
},
350+
},
351+
{
352+
name: "multiple frontiers",
353+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
354+
// First frontier with sp1@ts1 and sp2@zero.
355+
f1, err := span.MakeFrontier(sp1, sp2)
356+
require.NoError(t, err)
357+
_, err = f1.Forward(sp1, ts1)
358+
require.NoError(t, err)
359+
require.NoError(t, Store(ctx, txn, jobID, "frontier1", f1))
360+
361+
// Second frontier with sp3@ts2.
362+
f2, err := span.MakeFrontier(sp3)
363+
require.NoError(t, err)
364+
_, err = f2.Forward(sp3, ts2)
365+
require.NoError(t, err)
366+
require.NoError(t, Store(ctx, txn, jobID, "frontier2", f2))
367+
},
368+
expected: []jobspb.ResolvedSpan{
369+
{Span: sp1, Timestamp: ts1},
370+
{Span: sp2},
371+
{Span: sp3, Timestamp: ts2},
372+
},
373+
},
374+
{
375+
name: "multiple frontiers with overlapping spans",
376+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
377+
// First frontier with sp1@ts1.
378+
f1, err := span.MakeFrontier(sp1)
379+
require.NoError(t, err)
380+
_, err = f1.Forward(sp1, ts1)
381+
require.NoError(t, err)
382+
require.NoError(t, Store(ctx, txn, jobID, "overlap1", f1))
383+
384+
// Second frontier with sp1@ts2 (different timestamp) and sp2@zero.
385+
f2, err := span.MakeFrontier(sp1, sp2)
386+
require.NoError(t, err)
387+
_, err = f2.Forward(sp1, ts2)
388+
require.NoError(t, err)
389+
require.NoError(t, Store(ctx, txn, jobID, "overlap2", f2))
390+
},
391+
expected: []jobspb.ResolvedSpan{
392+
{Span: sp1, Timestamp: ts1},
393+
{Span: sp1, Timestamp: ts2},
394+
{Span: sp2},
395+
},
396+
},
397+
{
398+
name: "multiple frontiers with some deleted",
399+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
400+
// Store first frontier with sp1@ts1.
401+
f1, err := span.MakeFrontier(sp1)
402+
require.NoError(t, err)
403+
_, err = f1.Forward(sp1, ts1)
404+
require.NoError(t, err)
405+
require.NoError(t, Store(ctx, txn, jobID, "keep", f1))
406+
407+
// Store second frontier with sp2@ts2 then delete it.
408+
f2, err := span.MakeFrontier(sp2)
409+
require.NoError(t, err)
410+
_, err = f2.Forward(sp2, ts2)
411+
require.NoError(t, err)
412+
require.NoError(t, Store(ctx, txn, jobID, "delete", f2))
413+
require.NoError(t, Delete(ctx, txn, jobID, "delete"))
414+
415+
// Store third frontier with sp3@ts1.
416+
f3, err := span.MakeFrontier(sp3)
417+
require.NoError(t, err)
418+
_, err = f3.Forward(sp3, ts1)
419+
require.NoError(t, err)
420+
require.NoError(t, Store(ctx, txn, jobID, "also_keep", f3))
421+
},
422+
expected: []jobspb.ResolvedSpan{
423+
{Span: sp1, Timestamp: ts1},
424+
{Span: sp3, Timestamp: ts1},
425+
},
426+
},
427+
{
428+
name: "empty frontiers",
429+
setup: func(t *testing.T, txn isql.Txn, jobID jobspb.JobID) {
430+
// Store a frontier with no spans forwarded.
431+
f, err := span.MakeFrontier(sp1, sp2, sp3)
432+
require.NoError(t, err)
433+
require.NoError(t, Store(ctx, txn, jobID, "empty", f))
434+
},
435+
expected: []jobspb.ResolvedSpan{
436+
{Span: sp1},
437+
{Span: sp2},
438+
{Span: sp3},
439+
},
440+
},
441+
}
442+
443+
for i, tc := range tests {
444+
t.Run(tc.name, func(t *testing.T) {
445+
jobID := jobspb.JobID(3000 + i)
446+
447+
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
448+
tc.setup(t, txn, jobID)
449+
return nil
450+
}))
451+
452+
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
453+
spans, found, err := GetAllResolvedSpans(ctx, txn, jobID)
454+
require.NoError(t, err)
455+
if len(tc.expected) > 0 {
456+
require.True(t, found)
457+
require.ElementsMatch(t, tc.expected, spans)
458+
} else {
459+
require.False(t, found)
460+
require.Empty(t, spans)
461+
}
462+
return nil
463+
}))
464+
})
465+
}
466+
}

0 commit comments

Comments
 (0)