Commit ac6a44a

craig[bot], yuzefovich, dt, and rafiss committed
152741: sql: don't use t.Parallel() in TestQueryCache r=michae2 a=yuzefovich

We started running sub-tests of `TestQueryCache` in parallel a long time ago in b1ebe33, and later disabled the parallelism under stress in 1cf1edc. Each sub-test starts an in-memory server, and we have about 15 sub-tests, so this could be problematic in "heavy" configs. Recently we saw some flakes after a change to the underlying test infra (a "noisy neighbor" was introduced), and even though that change has been reverted, there is no reason for this particular test to use `t.Parallel()` at all: it is nearly the only test in the `sql` folder that does. This commit therefore removes the parallelization altogether. On my laptop, the time to run in a regular config increased from 1.5s to 4.5s, and we definitely have slower tests, so it shouldn't matter in practice.

Fixes: #152424.

Release note: None

152789: jobs/jobfrontier: add helpers for storing and loading frontiers r=dt a=dt

We want jobs to move to storing and loading progress from individual keys as needed, rather than storing fields in big, all-in-one legacy_progress protos. One of the most commonly useful ways to store progress is a frontier, so helpers to get and put frontiers are a promising starting point for the move towards granular info storage usage.

Release note: none.

Epic: none.

152878: scbuild: split TestBuildDataDriven into 2 tests r=rafiss a=rafiss

This will allow for better test sharding to avoid timeouts.

fixes #152622

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
4 parents 35625cf + 6834fcb + 9e78cb0 + b05b94d commit ac6a44a
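For orientation, the new jobfrontier helpers (see pkg/jobs/jobfrontier/frontier.go below) can be used roughly as follows. This is a minimal sketch based only on the signatures in this diff; the function name, the frontier name "progress", and the surrounding job wiring are illustrative, not part of the commit:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/util/span"
	"github.com/cockroachdb/errors"
)

// checkpointAndReload is a hypothetical helper: it persists a job's frontier
// under the name "progress" and then loads it back, as a resuming job would.
func checkpointAndReload(
	ctx context.Context, txn isql.Txn, jobID jobspb.JobID, frontier span.Frontier,
) (span.Frontier, error) {
	// Store replaces any frontier previously persisted under the same name.
	if err := jobfrontier.Store(ctx, txn, jobID, "progress", frontier); err != nil {
		return nil, err
	}
	// Get returns ok=false if no frontier was ever stored under this name.
	restored, ok, err := jobfrontier.Get(ctx, txn, jobID, "progress")
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, errors.New("frontier not found")
	}
	return restored, nil
}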

File tree

7 files changed: +953 -449 lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions

@@ -207,6 +207,7 @@ ALL_TESTS = [
     "//pkg/internal/sqlsmith:sqlsmith_test",
     "//pkg/internal/team:team_test",
     "//pkg/jobs/joberror:joberror_test",
+    "//pkg/jobs/jobfrontier:jobfrontier_test",
     "//pkg/jobs/jobsauth:jobsauth_test",
     "//pkg/jobs/jobspb:jobspb_test",
     "//pkg/jobs/jobsprofiler:jobsprofiler_test",
@@ -1416,6 +1417,8 @@ GO_TARGETS = [
     "//pkg/jobs/ingeststopped:ingeststopped",
     "//pkg/jobs/joberror:joberror",
     "//pkg/jobs/joberror:joberror_test",
+    "//pkg/jobs/jobfrontier:jobfrontier",
+    "//pkg/jobs/jobfrontier:jobfrontier_test",
     "//pkg/jobs/jobsauth:jobsauth",
     "//pkg/jobs/jobsauth:jobsauth_test",
     "//pkg/jobs/jobspb:jobspb",

pkg/jobs/jobfrontier/BUILD.bazel

Lines changed: 44 additions & 0 deletions (new file)

@@ -0,0 +1,44 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
    name = "jobfrontier",
    srcs = ["frontier.go"],
    importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/jobs",
        "//pkg/jobs/jobspb",
        "//pkg/roachpb",
        "//pkg/sql/isql",
        "//pkg/util/hlc",
        "//pkg/util/protoutil",
        "//pkg/util/span",
        "@com_github_cockroachdb_errors//:errors",
    ],
)

go_test(
    name = "jobfrontier_test",
    srcs = [
        "frontier_test.go",
        "main_test.go",
    ],
    embed = [":jobfrontier"],
    deps = [
        "//pkg/base",
        "//pkg/jobs/jobspb",
        "//pkg/roachpb",
        "//pkg/security/securityassets",
        "//pkg/security/securitytest",
        "//pkg/server",
        "//pkg/sql/isql",
        "//pkg/testutils/serverutils",
        "//pkg/testutils/testcluster",
        "//pkg/util/hlc",
        "//pkg/util/leaktest",
        "//pkg/util/log",
        "//pkg/util/randutil",
        "//pkg/util/span",
        "@com_github_stretchr_testify//require",
    ],
)
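With these targets in place (and the test registered in ALL_TESTS above), the new package's tests can be run with a standard Bazel invocation such as `bazel test //pkg/jobs/jobfrontier:jobfrontier_test`; CockroachDB's `dev` tooling wraps this, so the exact incantation may differ.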

pkg/jobs/jobfrontier/frontier.go

Lines changed: 177 additions & 0 deletions (new file)

@@ -0,0 +1,177 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package jobfrontier

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/cockroach/pkg/util/span"
	"github.com/cockroachdb/errors"
)

const frontierPrefix = "frontier/"
const shardSep = "_"

// Get loads a complete frontier from persistent storage and returns it and a
// true value, or nil and false if one is not found.
//
// The returned frontier will contain all spans and their timestamps that were
// previously stored via Store(). The spans are derived from the persisted data.
func Get(
	ctx context.Context, txn isql.Txn, jobID jobspb.JobID, name string,
) (span.Frontier, bool, error) {
	infoStorage := jobs.InfoStorageForJob(txn, jobID)

	// Read all persisted entries, both as entries and as plain spans; we need the
	// latter form to construct the frontier and the former to advance it.
	// TODO(dt): we could avoid duplicate allocation here if we added an API to
	// construct frontier directly from entries.
	var entries []frontierEntry
	var spans []roachpb.Span

	keyPrefix := frontierPrefix + name + shardSep

	var found bool
	if err := infoStorage.Iterate(ctx, keyPrefix, func(_ string, value []byte) error {
		found = true
		var r jobspb.ResolvedSpans
		if err := protoutil.Unmarshal(value, &r); err != nil {
			return err
		}
		for _, sp := range r.ResolvedSpans {
			entries = append(entries, frontierEntry{Span: sp.Span, Timestamp: sp.Timestamp})
			spans = append(spans, sp.Span)
		}
		return nil
	}); err != nil || !found {
		return nil, false, err
	}

	// Construct frontier to track the set of spans found and advance it to their
	// persisted timestamps. This implies we persist zero-timestamp spans to keep
	// the set of tracked spans even if they do not have progress.
	frontier, err := span.MakeFrontier(spans...)
	if err != nil {
		return nil, false, err
	}
	for _, entry := range entries {
		if _, err := frontier.Forward(entry.Span, entry.Timestamp); err != nil {
			return nil, false, err
		}
	}

	return frontier, true, nil
}

// Store persists a frontier's current state to storage.
//
// All span entries in the frontier and their current timestamps will be
// persisted. Any previously stored frontier data under the same name will be
// replaced.
//
// InfoStorage keys are prefixed with "frontier/", the passed name, and then a
// chunk identifier.
func Store(
	ctx context.Context, txn isql.Txn, jobID jobspb.JobID, name string, frontier span.Frontier,
) error {
	return storeChunked(ctx, txn, jobID, name, frontier, 2<<20 /* 2mb */)
}

func storeChunked(
	ctx context.Context,
	txn isql.Txn,
	jobID jobspb.JobID,
	name string,
	frontier span.Frontier,
	chunkSize int,
) error {
	infoStorage := jobs.InfoStorageForJob(txn, jobID)

	// Wipe any existing frontier shards, since we cannot rely on the shard func
	// to return the same set of shards to guarantee a full overwrite. Slightly
	// annoying that each shard's info key write will *also* issue a delete during
	// its write that is duplicative as we already deleted everything here, but
	// we don't really have a choice. We could specialize non-sharded/fixed-shard
	// frontiers (non-sharded is just fixed=1), where the write call would handle
	// deleting any prior entry, but doesn't seem worth it: you need a promise it
	// does not become sharded later, so would probably want to be a separate API.
	if err := deleteEntries(ctx, infoStorage, name); err != nil {
		return err
	}

	// Collect all frontier entries.
	all := make([]jobspb.ResolvedSpan, 0, frontier.Len())
	for spanEntry, timestamp := range frontier.Entries() {
		all = append(all, jobspb.ResolvedSpan{
			Span:      spanEntry,
			Timestamp: timestamp,
		})
	}

	// Flush the frontier chunks.
	var chunk, size int
	var chunkStart int
	// Group entries by shard.
	for i, sp := range all {
		if size > chunkSize {
			if err := storeEntries(ctx, infoStorage, name, fmt.Sprintf("%d", chunk), all[chunkStart:i]); err != nil {
				return err
			}
			chunk++
			size = 0
			chunkStart = i
		}

		size += len(sp.Span.Key) + len(sp.Span.EndKey) + 16 // timestamp/overhead.
	}
	if chunkStart < len(all) {
		if err := storeEntries(ctx, infoStorage, name, fmt.Sprintf("%d", chunk), all[chunkStart:]); err != nil {
			return err
		}
	}
	return nil
}

func storeEntries(
	ctx context.Context,
	infoStorage jobs.InfoStorage,
	name, shard string,
	entries []jobspb.ResolvedSpan,
) error {
	data, err := protoutil.Marshal(&jobspb.ResolvedSpans{ResolvedSpans: entries})
	if err != nil {
		return errors.Wrap(err, "failed to serialize frontier entries")
	}
	key := fmt.Sprintf("%s%s%s%s", frontierPrefix, name, shardSep, shard)
	return infoStorage.Write(ctx, key, data)
}

// Delete removes a persisted frontier by the given name for the given job.
func Delete(ctx context.Context, txn isql.Txn, jobID jobspb.JobID, name string) error {
	infoStorage := jobs.InfoStorageForJob(txn, jobID)
	return deleteEntries(ctx, infoStorage, name)
}

func deleteEntries(ctx context.Context, infoStorage jobs.InfoStorage, name string) error {
	startKey := frontierPrefix + name + shardSep
	// The end key bumps the separator byte by one ('_' -> '`'), so the range
	// covers exactly the keys carrying the "frontier/<name>_" prefix.
	endKey := frontierPrefix + name + string(rune(shardSep[0])+1)
	return infoStorage.DeleteRange(ctx, startKey, endKey, 0 /* no limit */)
}

// frontierEntry represents a single persisted frontier entry.
// This is used internally for serialization but may be useful for testing.
type frontierEntry struct {
	Span      roachpb.Span
	Timestamp hlc.Timestamp
}
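One detail worth a closer look: deleteEntries derives the exclusive end of the DeleteRange by incrementing the separator byte. A small self-contained sketch of that arithmetic follows; the constants are copied from the file above, while the frontier name "restore" is purely illustrative:

package main

import "fmt"

const frontierPrefix = "frontier/"
const shardSep = "_"

func main() {
	name := "restore" // illustrative frontier name
	startKey := frontierPrefix + name + shardSep
	// '_' is 0x5F; adding one yields '`' (0x60), so endKey is the first key
	// that sorts after everything carrying the startKey prefix.
	endKey := frontierPrefix + name + string(rune(shardSep[0])+1)
	fmt.Println(startKey) // frontier/restore_
	fmt.Println(endKey)   // frontier/restore`
	// Shard keys such as "frontier/restore_0" and "frontier/restore_17" all
	// sort within [startKey, endKey), so DeleteRange removes every shard.
}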
