
Commit 4689b7d

craig[bot], michae2, RaduBerinde, kyle-a-wong, and jeffswenson committed
144316: sql: move job check back out of txn creating autostats job r=msbutler,mgartner,rytaft a=michae2

In #119383 we moved the find-running-jobs-of-type query into the same transaction that creates the autostats job, in order to avoid a race between different nodes trying to start an autostats job. In practice, however, we've seen that this combined read-write transaction suffers from many retries due to RETRY_SERIALIZABLE errors, when racing transactions write to system.jobs and then fail to refresh the find-running-jobs read.

On the one hand, the fact that these transactions fail to commit does mean fewer rows are visible in system.jobs. On the other hand, each aborted transaction leaves behind an intent to clean up, so I don't think the load on system.jobs is any lighter than it was before.

These types of races, where writing a new row depends on the absence of other rows, are kind of the worst case for optimistic Serializable isolation. If we could materialize the conflict to a single existing row that the transaction locks, we'd avoid both the serialization failures and the race to start a job. But we don't have such a row. (Maybe one day we could create a "NEXT AUTOSTATS JOB" sentinel row in system.jobs that the winner locks before claiming it and starting the job? But that seems like a big change; a hypothetical sketch of the idea follows the commit message below.)

For now, this PR partially reverts #119383 so that find-running-jobs is back in a separate read-only transaction before creating the job. Using a read-only transaction for the check avoids the serializable errors, at the cost of sometimes creating too many jobs, which then immediately fail the repeated check within the job. We think this is going to reduce the load on system.jobs during times of high contention. Cluster setting `sql.stats.automatic_job_check_before_creating_job.enabled` can be used to get back the old behavior.

In the future, when we allow more than one autostats job, we hope to improve the situation by changing the mechanism for preventing concurrent stats collections on the same table. Maybe then we can have some kind of sentinel row per table to materialize the conflict.

Informs: #108435

Release note: None

144568: aggmetric: switch to new btree r=RaduBerinde a=RaduBerinde

Note: the first commit is #144555.

Use `BtreeG[MetricItem]` in the `aggmetric` code.

Informs: #144504

Release note: None

144597: sql/tests: use newer btree in enum test r=RaduBerinde a=RaduBerinde

Informs: #144504

Release note: None

144638: ui: fix various bugs on db console schedules page r=kyle-a-wong a=kyle-a-wong

- Updated the max result size of the SQL query that fetches schedules, to ensure all schedules are rendered in db console.
- Fixed a bug where changing the "show" and "status" values in the URL query params resulted in a "something went wrong" page. Now, if an invalid value is set in the "show" or "status" query parameters, the corresponding dropdown resets to the default value ("Show: Latest 50" and "Status: All", respectively).

Fixes: #143925, #143924

Epic: None

Release note (bug fix): Fixes bugs in the db console schedules page:
- Previously, the schedules page only showed a subset of the total schedules for a cluster due to a missing parameter in the server API call. Now the schedules table correctly shows all schedules in a cluster.
- Fixed a bug where manually updating the "show" or "status" query

144640: logical: wrap recoverable errors in savepoints r=jeffswenson a=jeffswenson

Internal executor statement failures are not atomic: a failed statement may leave behind live KV operations. Usually this is okay because the error is returned up the stack and the caller aborts the transaction. There is a bug in LDR because it recovers from condition-failed errors to detect LWW losses. The insert fast path writes indexes before it writes to the primary key, so these fast path failures could corrupt index entries.

Now, all LDR code paths that commit a transaction containing a recoverable internal executor error wrap the statement in a savepoint.

Informs: #144645

Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647)

Release note: None

144720: vecindex: remove search_beam_size cluster setting r=rafiss a=andy-kimball

The `sql.vecindex.search_beam_size` cluster setting only exists to specify the default value for the `vector_search_beam_size` session setting. However, this is not necessary; session settings can specify a default value on their own. This commit removes the cluster setting and sets the global default for the session setting to 32.

Epic: CRDB-42943

Release note: None

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Kyle Wong <[email protected]>
Co-authored-by: Jeff Swenson <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
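The sentinel-row idea from 144316 can be sketched concretely. This is a hypothetical illustration, not code from this commit: the table `autostats_sentinel`, its row id, and `tryClaimAutostatsSlot` are invented names, and plain database/sql stands in for CockroachDB's internal APIs.

    package autostatssketch

    import (
        "context"
        "database/sql"
    )

    // tryClaimAutostatsSlot materializes the "who starts the next autostats
    // job" conflict on a single existing row. Racing claimants queue on the
    // row lock instead of writing to system.jobs and aborting each other
    // with RETRY_SERIALIZABLE refresh failures.
    func tryClaimAutostatsSlot(ctx context.Context, db *sql.DB) error {
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return err
        }
        defer tx.Rollback() // no-op once Commit succeeds

        // Lock the sentinel row; the winner proceeds, losers wait here.
        if _, err := tx.ExecContext(ctx,
            `SELECT 1 FROM autostats_sentinel WHERE id = 'NEXT AUTOSTATS JOB' FOR UPDATE`,
        ); err != nil {
            return err
        }

        // ...check for a running job and insert the new job row here...

        return tx.Commit()
    }

Because every would-be creator contends on the same locked row, the conflict becomes a pessimistic lock wait instead of an optimistic refresh failure, which is exactly the trade the commit message says is unavailable today without such a row.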
7 parents: c78cb00 + ff1b46d + 9041bd2 + 8ba7b9a + 3e293ec + 48b940d + 3c9e677

File tree

17 files changed: +274 -74 lines changed


pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,7 @@ go_library(
         "purgatory.go",
         "range_stats.go",
         "replication_statements.go",
+        "savepoint.go",
         "sql_crud_writer.go",
         "sql_row_reader.go",
         "sql_row_writer.go",
@@ -127,6 +128,7 @@ go_test(
         "purgatory_test.go",
         "range_stats_test.go",
         "replication_statements_test.go",
+        "savepoint_test.go",
         "sql_row_reader_test.go",
         "sql_row_writer_test.go",
         "table_batch_handler_test.go",

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 55 additions & 0 deletions
@@ -181,6 +181,61 @@ func TestLogicalStreamIngestionJobNameResolution(t *testing.T) {
     }
 }
 
+func TestOptimisticInsertCorruption(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    skip.UnderDeadlock(t)
+    defer log.Scope(t).Close(t)
+
+    // This is a regression test for #144645. Running the optimistic insert code
+    // path could corrupt indexes if the insert is not wrapped in a savepoint.
+
+    ctx := context.Background()
+
+    server, s, dbSource, dbDest := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
+    defer server.Stopper().Stop(ctx)
+
+    dbSource.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled = true")
+
+    createTableStmt := `CREATE TABLE computed_cols (
+        a INT,
+        b INT,
+        c INT,
+        PRIMARY KEY (a, b)
+    )`
+    dbSource.Exec(t, createTableStmt)
+    dbDest.Exec(t, createTableStmt)
+
+    createIdxStmt := `CREATE INDEX c ON computed_cols (c)`
+    dbSource.Exec(t, createIdxStmt)
+    dbDest.Exec(t, createIdxStmt)
+
+    // Insert initial data into the destination that should be overwritten.
+    // This gives more opportunities for corruption.
+    dbDest.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 3), (3, 4, 5)")
+    dbSource.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 6), (3, 4, 7)")
+
+    // Create a logical replication stream from source to destination.
+    sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("a"))
+    var jobID jobspb.JobID
+    dbDest.QueryRow(t,
+        "CREATE LOGICAL REPLICATION STREAM FROM TABLE computed_cols ON $1 INTO TABLE computed_cols",
+        sourceURL.String(),
+    ).Scan(&jobID)
+
+    WaitUntilReplicatedTime(t, s.Clock().Now(), dbDest, jobID)
+
+    dbSource.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
+        {"1", "2", "6"},
+        {"3", "4", "7"},
+    })
+    dbDest.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
+        {"1", "2", "6"},
+        {"3", "4", "7"},
+    })
+
+    compareReplicatedTables(t, s, "a", "b", "computed_cols", dbSource, dbDest)
+}
+
 type fatalDLQ struct{ *testing.T }
 
 func (fatalDLQ) Create(ctx context.Context) error { return nil }

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 9 additions & 2 deletions
@@ -640,7 +640,11 @@ func (lww *lwwQuerier) InsertRow(
     if !useLowPriority.Get(&lww.settings.SV) {
         sess.QualityOfService = nil
     }
-    if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
+    err = withSavepoint(ctx, kvTxn, func() error {
+        _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...)
+        return err
+    })
+    if err != nil {
         if isLwwLoser(err) {
             return batchStats{}, nil
         }
@@ -667,7 +671,10 @@ func (lww *lwwQuerier) InsertRow(
         sess.QualityOfService = nil
     }
     sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp
-    _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
+    err = withSavepoint(ctx, kvTxn, func() error {
+        _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
+        return err
+    })
     if isLwwLoser(err) {
         return batchStats{}, nil
     }
pkg/crosscluster/logical/savepoint.go

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package logical
+
+import (
+    "context"
+
+    "github.com/cockroachdb/cockroach/pkg/kv"
+    "github.com/cockroachdb/errors"
+)
+
+// withSavepoint is a utility function that runs the provided function within
+// the scope of a savepoint.
+//
+// If the caller wishes to recover from an error returned by an internal
+// executor and keep the transaction, then the call to the internal executor
+// must be scoped within a savepoint. Failed internal executor calls are not
+// atomic and may have left behind some KV operations. Usually that is okay
+// because the error is passed up the call stack and the transaction is rolled
+// back.
+func withSavepoint(ctx context.Context, txn *kv.Txn, fn func() error) error {
+    // In the classic sql writer the txn is optional.
+    if txn == nil {
+        return fn()
+    }
+    // TODO(jeffswenson): consider changing the internal executor so all calls
+    // implicitly create and apply/rollback savepoints.
+    savepoint, err := txn.CreateSavepoint(ctx)
+    if err != nil {
+        return err
+    }
+    err = fn()
+    if err != nil {
+        // NOTE: we return the savepoint error if the rollback fails because
+        // we do not want something checking error types to attempt to handle
+        // the inner error.
+        if savePointErr := txn.RollbackToSavepoint(ctx, savepoint); savePointErr != nil {
+            return errors.WithSecondaryError(savePointErr, err)
+        }
+        return err
+    }
+    if err := txn.ReleaseSavepoint(ctx, savepoint); err != nil {
+        return err
+    }
+    return nil
+}
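The same recover-and-continue pattern can be written against any Postgres-compatible database using SQL savepoint statements rather than the `kv.Txn` API. A standalone sketch under that assumption; `withSQLSavepoint` is an illustrative name, not part of this commit:

    package savepointexample

    import (
        "context"
        "database/sql"
    )

    // withSQLSavepoint mirrors withSavepoint using SQL statements: work done
    // inside fn is bracketed by a savepoint, so an error from fn rolls back
    // only that work while the enclosing transaction remains usable.
    func withSQLSavepoint(ctx context.Context, tx *sql.Tx, fn func() error) error {
        if _, err := tx.ExecContext(ctx, "SAVEPOINT sp"); err != nil {
            return err
        }
        if err := fn(); err != nil {
            // Discard any partial writes since the savepoint, then surface
            // the original error to the caller.
            if _, rbErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT sp"); rbErr != nil {
                return rbErr
            }
            return err
        }
        _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT sp")
        return err
    }

The test below exercises the real helper: the first insert is released and persists, while the second is rolled back to the savepoint without aborting the enclosing transaction.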
pkg/crosscluster/logical/savepoint_test.go

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package logical
+
+import (
+    "context"
+    "testing"
+
+    "github.com/cockroachdb/cockroach/pkg/base"
+    "github.com/cockroachdb/cockroach/pkg/kv"
+    "github.com/cockroachdb/cockroach/pkg/sql"
+    "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+    "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+    "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+    "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/errors"
+    "github.com/stretchr/testify/require"
+)
+
+func TestWithSavepoint(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    srv, rawDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
+    defer srv.Stopper().Stop(context.Background())
+
+    ctx := context.Background()
+    sqlDB := sqlutils.MakeSQLRunner(rawDB)
+    sqlDB.Exec(t, "CREATE TABLE test (id STRING PRIMARY KEY, value STRING)")
+
+    require.NoError(t, srv.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+        err := withSavepoint(ctx, txn, func() error {
+            _, err := srv.InternalExecutor().(*sql.InternalExecutor).ExecEx(
+                ctx,
+                "test-insert",
+                txn,
+                sessiondata.NodeUserSessionDataOverride,
+                "INSERT INTO defaultdb.test VALUES ('ok', 'is-persisted')",
+            )
+            return err
+        })
+        require.NoError(t, err)
+
+        err = withSavepoint(ctx, txn, func() error {
+            _, err := srv.InternalExecutor().(*sql.InternalExecutor).ExecEx(
+                ctx,
+                "test-insert",
+                txn,
+                sessiondata.NodeUserSessionDataOverride,
+                "INSERT INTO defaultdb.test VALUES ('fails', 'is-rolled-back')",
+            )
+            require.NoError(t, err)
+            // NOTE: the query above is okay, which means it wrote things to KV,
+            // but we're going to return an error which rolls back the
+            // savepoint.
+            return errors.New("something to rollback")
+        })
+        require.ErrorContains(t, err, "something to rollback")
+
+        return nil
+    }))
+
+    sqlDB.CheckQueryResults(t, "SELECT id, value FROM test", [][]string{
+        {"ok", "is-persisted"},
+    })
+}

pkg/crosscluster/logical/table_batch_handler.go

Lines changed: 3 additions & 1 deletion
@@ -170,7 +170,9 @@ func (t *tableHandler) attemptBatch(
         stats.kvLwwLosers += tombstoneUpdateStats.kvWriteTooOld
     case event.prevRow == nil:
         stats.inserts++
-        err := t.sqlWriter.InsertRow(ctx, txn, event.originTimestamp, event.row)
+        err := withSavepoint(ctx, txn.KV(), func() error {
+            return t.sqlWriter.InsertRow(ctx, txn, event.originTimestamp, event.row)
+        })
         if isLwwLoser(err) {
             // Insert may observe a LWW failure if it attempts to write over a tombstone.
             stats.kvLwwLosers++

pkg/settings/registry.go

Lines changed: 7 additions & 2 deletions
@@ -271,8 +271,13 @@ var retiredSettings = map[InternalKey]struct{}{
 
 // grandfatheredDefaultSettings is the list of "grandfathered" existing sql.defaults
 // cluster settings. In 22.2 and later, new session settings do not need an
-// associated sql.defaults cluster setting. Instead they can have their default
-// changed with ALTER ROLE ... SET.
+// associated sql.defaults cluster setting (see the `vector_search_beam_size`
+// setting in vars.go for an example). A session setting can have its default
+// changed with ALTER ROLE ... SET, similar to this (the example assumes that
+// all roles should use the new default):
+//
+//    ALTER ROLE ALL SET vector_search_beam_size=128;
+//
 // Caveat: in some cases, we may still add new sql.defaults cluster settings,
 // but the new ones *must* be marked as non-public. Undocumented settings are
 // excluded from the check that prevents new sql.defaults settings. The

pkg/sql/create_stats.go

Lines changed: 21 additions & 5 deletions
@@ -76,6 +76,13 @@ var nonIndexJSONHistograms = settings.RegisterBoolSetting(
     false,
     settings.WithPublic)
 
+var automaticJobCheckBeforeCreatingJob = settings.RegisterBoolSetting(
+    settings.ApplicationLevel,
+    "sql.stats.automatic_job_check_before_creating_job.enabled",
+    "set to true to perform the autostats job check before creating the job, instead of in the same "+
+        "transaction as creating the job",
+    true)
+
 const nonIndexColHistogramBuckets = 2
 
 // StubTableStats generates "stub" statistics for a table which are missing
@@ -148,18 +155,27 @@ func (n *createStatsNode) runJob(ctx context.Context) error {
     }
     details := record.Details.(jobspb.CreateStatsDetails)
 
-    if n.Name != jobspb.AutoStatsName && n.Name != jobspb.AutoPartialStatsName {
+    jobCheckBefore := automaticJobCheckBeforeCreatingJob.Get(n.p.ExecCfg().SV())
+    if (n.Name == jobspb.AutoStatsName || n.Name == jobspb.AutoPartialStatsName) && jobCheckBefore {
+        // Don't start the job if there is already a CREATE STATISTICS job running.
+        // (To handle race conditions we check this again after the job starts,
+        // but this check is used to prevent creating a large number of jobs that
+        // immediately fail).
+        if err := checkRunningJobs(
+            ctx, nil /* job */, n.p, n.Name == jobspb.AutoPartialStatsName, n.p.ExecCfg().JobRegistry,
+            details.Table.ID,
+        ); err != nil {
+            return err
+        }
+    } else {
         telemetry.Inc(sqltelemetry.CreateStatisticsUseCounter)
     }
 
     var job *jobs.StartableJob
     jobID := n.p.ExecCfg().JobRegistry.MakeJobID()
     if err := n.p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
-        if n.Name == jobspb.AutoStatsName || n.Name == jobspb.AutoPartialStatsName {
+        if (n.Name == jobspb.AutoStatsName || n.Name == jobspb.AutoPartialStatsName) && !jobCheckBefore {
             // Don't start the job if there is already a CREATE STATISTICS job running.
-            // (To handle race conditions we check this again after the job starts,
-            // but this check is used to prevent creating a large number of jobs that
-            // immediately fail).
             if err := checkRunningJobsInTxn(
                 ctx, n.p.EvalContext().Settings, jobspb.InvalidJobID, txn,
             ); err != nil {
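The restored control flow is easier to see in isolation. A schematic sketch only: `createAutoStatsJobIfIdle` and the table name `t` are invented, the job-type filter is approximate, and the real code uses checkRunningJobs before the transaction and checkRunningJobsInTxn inside it, as in the diff above.

    package autostatssketch

    import (
        "context"
        "database/sql"
    )

    // createAutoStatsJobIfIdle sketches the check-before-create flow: the
    // existence check runs outside the transaction that creates the job, so
    // a racing creation elsewhere cannot force a RETRY_SERIALIZABLE refresh
    // failure here. The race is tolerated; a losing job re-checks after it
    // starts and fails fast.
    func createAutoStatsJobIfIdle(ctx context.Context, db *sql.DB) error {
        // Phase 1: read-only pre-check. A read leaves no intent behind, so
        // it adds little load to system.jobs even under contention.
        var running int
        if err := db.QueryRowContext(ctx,
            `SELECT count(*) FROM [SHOW JOBS] WHERE job_type LIKE '%CREATE STATS%' AND status = 'running'`,
        ).Scan(&running); err != nil {
            return err
        }
        if running > 0 {
            return nil // another node already has a stats job going
        }
        // Phase 2: create the job in its own transaction.
        _, err := db.ExecContext(ctx, `CREATE STATISTICS __auto__ FROM t`)
        return err
    }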

pkg/sql/tests/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -146,13 +146,13 @@ go_test(
         "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgxv5",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
+        "@com_github_google_btree//:btree",
         "@com_github_google_pprof//profile",
         "@com_github_jackc_pgx_v5//:pgx",
         "@com_github_jackc_pgx_v5//pgconn",
         "@com_github_kr_pretty//:pretty",
         "@com_github_lib_pq//:pq",
         "@com_github_petermattis_goid//:goid",
-        "@com_github_raduberinde_btree//:btree",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@org_golang_x_sync//errgroup",

pkg/sql/tests/enum_test.go

Lines changed: 8 additions & 13 deletions
@@ -6,19 +6,20 @@
 package tests
 
 import (
+    "cmp"
     "context"
     "fmt"
     "math/rand"
     "sort"
     "strings"
     "testing"
 
-    "github.com/RaduBerinde/btree" // TODO(#144504): switch to the newer btree
     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/google/btree"
     "github.com/stretchr/testify/require"
 )
 
@@ -46,17 +47,17 @@ func TestLargeEnums(t *testing.T) {
     // have to wait for lots of versions and it would take a very long time.
     var createEnumsQuery string
     {
-        alreadyInserted := btree.New(8)
+        alreadyInserted := btree.NewG[int](8, cmp.Less[int])
         next := func(n int) (next int, ok bool) {
-            alreadyInserted.AscendGreaterOrEqual(intItem(n), func(i btree.Item) (wantMore bool) {
-                next, ok = int(i.(intItem)), true
+            alreadyInserted.AscendGreaterOrEqual(n, func(i int) (wantMore bool) {
+                next, ok = i, true
                 return false
             })
             return next, ok
         }
         prev := func(n int) (prev int, ok bool) {
-            alreadyInserted.DescendLessOrEqual(intItem(n), func(i btree.Item) (wantMore bool) {
-                prev, ok = int(i.(intItem)), true
+            alreadyInserted.DescendLessOrEqual(n, func(i int) (wantMore bool) {
+                prev, ok = i, true
                 return false
             })
             return prev, ok
@@ -74,7 +75,7 @@ func TestLargeEnums(t *testing.T) {
             require.Truef(t, ok, "prev %v %v", n, order[:i])
             fmt.Fprintf(&buf, " AFTER '%d';\n", prev)
         }
-        alreadyInserted.ReplaceOrInsert(intItem(n))
+        alreadyInserted.ReplaceOrInsert(n)
     }
     buf.WriteString("COMMIT;")
     createEnumsQuery = buf.String()
@@ -101,12 +102,6 @@ func TestLargeEnums(t *testing.T) {
         require.Truef(t, sort.IntsAreSorted(read), "%v", read)
     }
 
-type intItem int
-
-func (i intItem) Less(o btree.Item) bool {
-    return i < o.(intItem)
-}
-
 // TestEnumPlaceholderWithAsOfSystemTime is a regression test for an edge case
 // with bind where we would not properly deal with leases involving types.
 func TestEnumPlaceholderWithAsOfSystemTime(t *testing.T) {
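For reference, here is a minimal, self-contained example of the generic google/btree API the test now uses, assuming github.com/google/btree v1.1+ and the `cmp` package from Go 1.21; it mirrors the test's next() helper.

    package main

    import (
        "cmp"
        "fmt"

        "github.com/google/btree"
    )

    func main() {
        // Degree-8 tree of ints ordered by cmp.Less; no Item interface,
        // wrapper types, or type assertions are needed.
        tr := btree.NewG[int](8, cmp.Less[int])
        for _, n := range []int{5, 1, 3} {
            tr.ReplaceOrInsert(n)
        }
        // Find the smallest item >= 2, as the test's next() helper does.
        tr.AscendGreaterOrEqual(2, func(i int) bool {
            fmt.Println("next:", i) // prints "next: 3"
            return false           // stop after the first item
        })
    }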
