
Commit b077a4e

craig[bot] (with rafiss, miraradeva, wenyihu6, and herkolategan) committed
155795: workload/schemachange: remove FK violation prediction for INSERT operations r=rafiss a=rafiss

Previously, the random schema workload attempted to predict whether INSERT operations would violate foreign key constraints by analyzing the database state and constraint definitions. This prediction logic involved:

- Querying the database to check if FK violations would occur
- Checking for ongoing FK constraint mutations
- Distinguishing between expected and potential FK violations

This commit removes all FK violation prediction logic and makes ForeignKeyViolation unconditionally a potential execution and commit error for INSERT operations. This aligns with the existing pattern used for adding FK constraints, where FK violations are always treated as potential errors due to the asynchronous validation job.

This change simplifies the code by removing ~200 lines of complex prediction logic and deflakes tests that depend on accurate error handling during concurrent schema changes.

Resolves: #155686
Resolves: #154715
Resolves: #152402
Epic: None
Release note: None

🤖 Generated with [Claude Code](https://claude.com/claude-code)

155798: kvnemesis: log a metrics report r=stevendanna a=miraradeva

This commit adds logging of some key metrics at the end of each kvnemesis run.

Fixes: #153793

Release note: None

----

This is what the report looks like for a run of `TestKVNemesisMultiNode`:

```
Metric                              | Node 1               | Node 2               | Node 3               | Node 4
------------------------------------+----------------------+----------------------+----------------------+---------------------
raft.commands.proposed              | 2906                 | 117                  | 273                  | 118
raft.commands.reproposed.new-lai    | 0                    | 1                    | 12                   | 0
raft.commands.reproposed.unchanged  | 78                   | 0                    | 0                    | 0
txn.server_side.1PC.success         | 84                   | 0                    | 1                    | 0
txnrecovery.successes.committed     | 0                    | 0                    | 0                    | 0
txnwaitqueue.deadlocks_total        | 0                    | 0                    | 4                    | 0
txn.aborts                          | 13                   | 10                   | 8                    | 12
txn.durations                       | μ=25ms p99=260ms     | μ=82ms p99=671ms     | μ=81ms p99=1.476s    | μ=224ms p99=4.832s
txn.restarts.writetooold            | 8                    | 1                    | 2                    | 1
txn.restarts.serializable           | 1                    | 0                    | 0                    | 0
txn.restarts.readwithinuncertainty  | 0                    | 0                    | 0                    | 0
```

155850: kvserver/rangefeed: move from log.KvDistribution to log.KvExec r=stevendanna a=wenyihu6

Epic: none
Release note: none

155853: roachprod: fix label propagation on workload machines r=DarrylWong,golgeek a=herkolategan

Previously, label propagation did not take the boot-disk-only parameter of workload machines into account. This change fixes the propagation to skip trying to add labels to non-existent persistent disks if the boot-disk-only flag has been passed to the provider.

Epic: None
Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: Herko Lategan <[email protected]>
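To make the first change above concrete, here is a minimal, hypothetical sketch of the resulting pattern. None of these types come from the actual workload code (`errorSet`, `opStmt`, and `makeInsertOp` are stand-ins invented for illustration); the only real constant is "23503", PostgreSQL's `foreign_key_violation` code, which `pgcode.ForeignKeyViolation` maps to in CockroachDB.

```go
// Hypothetical, simplified sketch: rather than predicting FK violations up
// front, an INSERT statement always lists ForeignKeyViolation among the
// errors it is allowed (but not required) to hit at execution or commit time.
package main

import "fmt"

// errorSet stands in for the workload's set of tolerated error codes.
type errorSet map[string]struct{}

func (s errorSet) add(code string) { s[code] = struct{}{} }

// opStmt stands in for a generated workload statement.
type opStmt struct {
    sql                   string
    potentialExecErrors   errorSet
    potentialCommitErrors errorSet
}

// PostgreSQL's foreign_key_violation code (pgcode.ForeignKeyViolation).
const foreignKeyViolation = "23503"

func makeInsertOp(table string) *opStmt {
    stmt := &opStmt{
        sql:                   fmt.Sprintf("INSERT INTO %s DEFAULT VALUES", table),
        potentialExecErrors:   errorSet{},
        potentialCommitErrors: errorSet{},
    }
    // Previously, prediction logic decided whether an FK violation was an
    // expected or merely potential error; now it is unconditionally potential,
    // mirroring how ADD FOREIGN KEY treats its asynchronous validation job.
    stmt.potentialExecErrors.add(foreignKeyViolation)
    stmt.potentialCommitErrors.add(foreignKeyViolation)
    return stmt
}

func main() {
    op := makeInsertOp("t")
    _, ok := op.potentialExecErrors[foreignKeyViolation]
    fmt.Printf("%s -> FK violation tolerated: %v\n", op.sql, ok)
}
```

Because the constraint may still be validating asynchronously while the INSERT runs, the workload can only ever say the violation may happen, which is the rationale the commit message gives for dropping the prediction logic.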
5 parents: 9961f05 + 0688885 + 6ddfc2c + e404533 + 83dfe8e (commit b077a4e)

26 files changed: +182 -348 lines changed

pkg/kv/kvnemesis/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -113,6 +113,7 @@ go_test(
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/metric",
         "//pkg/util/randutil",
         "//pkg/util/stop",
         "//pkg/util/syncutil",

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 82 additions & 8 deletions
@@ -13,6 +13,7 @@ import (
     "os"
     "path"
     "strconv"
+    "strings"
     "testing"
     "time"

@@ -46,6 +47,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/cockroach/pkg/util/metric"
     "github.com/cockroachdb/cockroach/pkg/util/randutil"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -698,19 +700,91 @@ func testKVNemesisImpl(t testing.TB, cfg kvnemesisTestCfg) {
     env := &Env{SQLDBs: sqlDBs, Tracker: tr, L: logger, Partitioner: &partitioner, Restarter: tc}
     failures, err := RunNemesis(ctx, rng, env, config, cfg.concurrency, cfg.numSteps, cfg.mode, dbs...)

-    for i := 0; i < cfg.numNodes; i++ {
-        t.Logf("[%d] proposed: %d", i,
-            tc.GetFirstStoreFromServer(t, i).Metrics().RaftCommandsProposed.Count())
-        t.Logf("[%d] reproposed unchanged: %d", i,
-            tc.GetFirstStoreFromServer(t, i).Metrics().RaftCommandsReproposed.Count())
-        t.Logf("[%d] reproposed with new LAI: %d", i,
-            tc.GetFirstStoreFromServer(t, i).Metrics().RaftCommandsReproposedLAI.Count())
-    }
+    logMetricsReport(t, tc)

     require.NoError(t, err, `%+v`, err)
     require.Zero(t, len(failures), "kvnemesis detected failures") // they've been logged already
 }

+func logMetricsReport(t testing.TB, tc *testcluster.TestCluster) {
+    metricsOfInterest := []string{
+        // Raft command metrics
+        "raft.commands.proposed",
+        "raft.commands.reproposed.new-lai",
+        "raft.commands.reproposed.unchanged",
+        // Transaction metrics
+        "txn.aborts",
+        "txn.durations",
+        "txn.restarts.writetooold",
+        "txn.restarts.serializable",
+        "txn.restarts.readwithinuncertainty",
+        "txn.server_side.1PC.success",
+        "txnrecovery.failures",
+        "txnrecovery.successes.aborted",
+        "txnrecovery.successes.committed",
+        "txnwaitqueue.deadlocks_total",
+    }
+
+    numNodes := tc.NumServers()
+    nodeMetrics := make([]map[string]string, numNodes)
+    metricsMap := make(map[string]struct{}, len(metricsOfInterest))
+    for _, m := range metricsOfInterest {
+        metricsMap[m] = struct{}{}
+    }
+    for i := 0; i < numNodes; i++ {
+        nodeMetrics[i] = make(map[string]string)
+        processMetric := func(name string, v interface{}) {
+            switch val := v.(type) {
+            case *metric.Counter:
+                nodeMetrics[i][name] = fmt.Sprintf("%d", val.Count())
+            case *metric.Gauge:
+                nodeMetrics[i][name] = fmt.Sprintf("%d", val.Value())
+            case metric.IHistogram:
+                // TODO(mira): Currently, histograms are assumed to store values of type
+                // time duration.
+                snapshot := val.CumulativeSnapshot()
+                makePretty := func(nanos float64) time.Duration {
+                    return time.Duration(nanos).Round(time.Millisecond)
+                }
+                meanDur := makePretty(snapshot.Mean())
+                p50Dur := makePretty(snapshot.ValueAtQuantile(50))
+                p99Dur := makePretty(snapshot.ValueAtQuantile(99))
+                nodeMetrics[i][name] = fmt.Sprintf("μ=%v p50=%v p99=%v", meanDur, p50Dur, p99Dur)
+            default:
+                nodeMetrics[i][name] = fmt.Sprintf("unknown:%T", v)
+            }
+        }
+
+        clientRegistry := tc.Server(i).StorageLayer().MetricsRecorder().AppRegistry()
+        nodeRegistry := tc.Server(i).StorageLayer().MetricsRecorder().NodeRegistry()
+        storeID := tc.GetFirstStoreFromServer(t, i).StoreID()
+        storeRegistry := tc.Server(i).StorageLayer().MetricsRecorder().StoreRegistry(storeID)
+        clientRegistry.Select(metricsMap, func(name string, v interface{}) { processMetric(name, v) })
+        nodeRegistry.Select(metricsMap, func(name string, v interface{}) { processMetric(name, v) })
+        storeRegistry.Select(metricsMap, func(name string, v interface{}) { processMetric(name, v) })
+    }
+
+    header := fmt.Sprintf("%-35s", "Metric")
+    for i := 0; i < numNodes; i++ {
+        header += fmt.Sprintf(" | %-30s", fmt.Sprintf("Node %d", i+1))
+    }
+    t.Log(header)
+
+    separator := strings.Repeat("-", 35)
+    for i := 0; i < numNodes; i++ {
+        separator += "-+-" + strings.Repeat("-", 30)
+    }
+    t.Log(separator)
+
+    for _, metricName := range metricsOfInterest {
+        row := fmt.Sprintf("%-35s", metricName)
+        for i := 0; i < numNodes; i++ {
+            row += fmt.Sprintf(" | %-30s", nodeMetrics[i][metricName])
+        }
+        t.Log(row)
+    }
+}
+
 // TestRunReproductionSteps is a helper that allows quickly running a kvnemesis
 // history.
 func TestRunReproductionSteps(t *testing.T) {
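One note on the histogram branch in logMetricsReport above: the code treats the cumulative snapshot's values as nanoseconds (per its own TODO, histograms are assumed to hold durations) and rounds them to millisecond granularity before printing. A standalone sketch of just that conversion, with made-up inputs standing in for `snapshot.Mean()` and `snapshot.ValueAtQuantile(...)`:

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    // Hypothetical nanosecond values, standing in for a metric.IHistogram
    // cumulative snapshot's Mean() and ValueAtQuantile() results.
    meanNanos, p50Nanos, p99Nanos := 25.4e6, 18.0e6, 260.2e6

    // Same conversion as makePretty in the diff above: treat the float as
    // nanoseconds and round to the nearest millisecond.
    makePretty := func(nanos float64) time.Duration {
        return time.Duration(nanos).Round(time.Millisecond)
    }

    fmt.Printf("μ=%v p50=%v p99=%v\n",
        makePretty(meanNanos), makePretty(p50Nanos), makePretty(p99Nanos))
    // Prints: μ=25ms p50=18ms p99=260ms
}
```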

pkg/kv/kvserver/closedts/policyrefresher/policy_refresher.go

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ func NewPolicyRefresher(
     knobs *TestingKnobs,
 ) *PolicyRefresher {
     if getLeaseholderReplicas == nil || getNodeLatencies == nil {
-        log.KvDistribution.Fatalf(context.Background(), "getLeaseholderReplicas and getNodeLatencies must be non-nil")
+        log.KvExec.Fatalf(context.Background(), "getLeaseholderReplicas and getNodeLatencies must be non-nil")
        return nil
    }
    refresher := &PolicyRefresher{

pkg/kv/kvserver/closedts/sidetransport/receiver.go

Lines changed: 8 additions & 8 deletions
@@ -147,7 +147,7 @@ func (s *Receiver) onRecvErr(ctx context.Context, nodeID roachpb.NodeID, err err
     defer s.mu.Unlock()

     if err != io.EOF {
-        log.KvDistribution.Warningf(ctx, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
+        log.KvExec.Warningf(ctx, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
     } else {
         log.VEventf(ctx, 2, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
     }
@@ -236,11 +236,11 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
     log.VEventf(ctx, 4, "received side-transport update: %v", msg)

     if msg.NodeID == 0 {
-        log.KvDistribution.Fatalf(ctx, "missing NodeID in message: %s", msg)
+        log.KvExec.Fatalf(ctx, "missing NodeID in message: %s", msg)
     }

     if msg.NodeID != r.nodeID {
-        log.KvDistribution.Fatalf(ctx, "wrong NodeID; expected %d, got %d", r.nodeID, msg.NodeID)
+        log.KvExec.Fatalf(ctx, "wrong NodeID; expected %d, got %d", r.nodeID, msg.NodeID)
     }

     // Handle the removed ranges. In order to not lose closed ts info, before we
@@ -259,11 +259,11 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
     for _, rangeID := range msg.Removed {
         info, ok := r.mu.tracked[rangeID]
         if !ok {
-            log.KvDistribution.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
+            log.KvExec.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
         }
         ts, ok := r.mu.lastClosed[info.policy]
         if !ok {
-            log.KvDistribution.Fatalf(ctx, "missing closed timestamp policy %v for range r%d", info.policy, rangeID)
+            log.KvExec.Fatalf(ctx, "missing closed timestamp policy %v for range r%d", info.policy, rangeID)
         }
         r.stores.ForwardSideTransportClosedTimestampForRange(ctx, rangeID, ts, info.lai)
     }
@@ -279,7 +279,7 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
         r.mu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp, len(r.mu.lastClosed))
         r.mu.tracked = make(map[roachpb.RangeID]trackedRange, len(r.mu.tracked))
     } else if msg.SeqNum != r.mu.lastSeqNum+1 {
-        log.KvDistribution.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+
+        log.KvExec.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+
             "%d, got %d", r.mu.lastSeqNum+1, msg.SeqNum)
     }
     r.mu.lastSeqNum = msg.SeqNum
@@ -328,13 +328,13 @@ func (r *incomingStream) Run(
     r.nodeID = msg.NodeID

     if err := r.server.onFirstMsg(ctx, r, r.nodeID); err != nil {
-        log.KvDistribution.Warningf(ctx, "%s", err.Error())
+        log.KvExec.Warningf(ctx, "%s", err.Error())
         return
     } else if ch := r.testingKnobs.onFirstMsg; ch != nil {
         ch <- struct{}{}
     }
     if !msg.Snapshot {
-        log.KvDistribution.Fatal(ctx, "expected the first message to be a snapshot")
+        log.KvExec.Fatal(ctx, "expected the first message to be a snapshot")
     }
     r.AddLogTag("remote", r.nodeID)
     ctx = r.AnnotateCtx(ctx)

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 4 additions & 4 deletions
@@ -653,7 +653,7 @@ func (b *updatesBuf) Push(ctx context.Context, update *ctpb.Update) {
     if b.sizeLocked() != 0 {
         lastIdx := b.lastIdxLocked()
         if prevSeq := b.mu.data[lastIdx].SeqNum; prevSeq != update.SeqNum-1 {
-            log.KvDistribution.Fatalf(ctx, "bad sequence number; expected %d, got %d", prevSeq+1, update.SeqNum)
+            log.KvExec.Fatalf(ctx, "bad sequence number; expected %d, got %d", prevSeq+1, update.SeqNum)
         }
     }

@@ -764,7 +764,7 @@ func (b *updatesBuf) GetBySeq(ctx context.Context, seqNum ctpb.SeqNum) (*ctpb.Up
             continue
         }
         if seqNum > lastSeq+1 {
-            log.KvDistribution.Fatalf(ctx, "skipping sequence numbers; requested: %d, last: %d", seqNum, lastSeq)
+            log.KvExec.Fatalf(ctx, "skipping sequence numbers; requested: %d, last: %d", seqNum, lastSeq)
         }
         idx := (b.mu.head + (int)(seqNum-firstSeq)) % len(b.mu.data)
         return b.mu.data[idx], true
@@ -965,7 +965,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
         }
         if err := r.maybeConnect(ctx, stopper); err != nil {
             if !errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) && everyN.ShouldLog() {
-                log.KvDistribution.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err)
+                log.KvExec.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err)
             }
             time.Sleep(errSleepTime)
             continue
@@ -995,7 +995,7 @@
         }
         if err := r.stream.Send(msg); err != nil {
             if err != io.EOF && everyN.ShouldLog() {
-                log.KvDistribution.Warningf(ctx, "failed to send closed timestamp message %d to n%d: %s",
+                log.KvExec.Warningf(ctx, "failed to send closed timestamp message %d to n%d: %s",
                     r.lastSent, r.nodeID, err)
             }
             // Keep track of the fact that we need a new connection.

pkg/kv/kvserver/closedts/tracker/heap_tracker.go

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ func (h *heapTracker) Track(ctx context.Context, ts hlc.Timestamp) RemovalToken
 func (h *heapTracker) Untrack(ctx context.Context, tok RemovalToken) {
     idx := tok.(heapToken).index
     if idx == -1 {
-        log.KvDistribution.Fatalf(ctx, "attempting to untrack already-untracked item")
+        log.KvExec.Fatalf(ctx, "attempting to untrack already-untracked item")
     }
     h.mu.Lock()
     defer h.mu.Unlock()

pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) {
     // types.
     refcnt := b.refcnt.Add(-1)
     if refcnt < 0 {
-        log.KvDistribution.Fatalf(ctx, "negative bucket refcount: %d", refcnt)
+        log.KvExec.Fatalf(ctx, "negative bucket refcount: %d", refcnt)
     }
     if refcnt == 0 {
         // Reset the bucket, so that future Track() calls can create a new one.

pkg/kv/kvserver/closedts/tracker/tracker_test.go

Lines changed: 3 additions & 3 deletions
@@ -167,7 +167,7 @@ func TestLockfreeTrackerRandomStress(t *testing.T) {
     // hours of stressrace on GCE worker (and it failed once on CI stress too).
     // Figure out why.
     maxToleratedErrorMillis := 3 * c.maxEvaluationTime.Milliseconds()
-    log.KvDistribution.Infof(ctx, "maximum lower bound error: %dms. maximum request evaluation time: %s",
+    log.KvExec.Infof(ctx, "maximum lower bound error: %dms. maximum request evaluation time: %s",
         maxOvershotMillis, c.maxEvaluationTime)
     require.Lessf(t, maxOvershotMillis, maxToleratedErrorMillis,
         "maximum tracker lowerbound error was %dms, above maximum tolerated %dms",
@@ -446,7 +446,7 @@ func (c *trackerChecker) run(ctx context.Context) error {
         if c.maxOvershotNanos < overshotNanos {
             c.maxOvershotNanos = overshotNanos
         }
-        log.KvDistribution.VInfof(ctx, 1, "lower bound error: %dms", overshotNanos/1000000)
+        log.KvExec.VInfof(ctx, 1, "lower bound error: %dms", overshotNanos/1000000)
     }
 }

@@ -517,7 +517,7 @@ func benchmarkTracker(ctx context.Context, b *testing.B, t Tracker) {
             toks[i] = toks[i][:0]
         }
         mu.Unlock()
-        log.KvDistribution.VInfof(ctx, 1, "cleared %d reqs", n)
+        log.KvExec.VInfof(ctx, 1, "cleared %d reqs", n)
     }
 }()

pkg/kv/kvserver/rangefeed/budget.go

Lines changed: 1 addition & 1 deletion
@@ -248,7 +248,7 @@ type SharedBudgetAllocation struct {
 func (a *SharedBudgetAllocation) Use(ctx context.Context) {
     if a != nil {
         if atomic.AddInt32(&a.refCount, 1) == 1 {
-            log.KvDistribution.Fatalf(ctx, "unexpected shared memory allocation usage increase after free")
+            log.KvExec.Fatalf(ctx, "unexpected shared memory allocation usage increase after free")
         }
     }
 }

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 2 additions & 2 deletions
@@ -196,7 +196,7 @@ func (br *bufferedRegistration) outputLoop(ctx context.Context) error {
     // If the registration has a catch-up scan, run it.
     if err := br.maybeRunCatchUpScan(ctx); err != nil {
         err = errors.Wrap(err, "catch-up scan failed")
-        log.KvDistribution.Errorf(ctx, "%v", err)
+        log.KvExec.Errorf(ctx, "%v", err)
         return err
     }

@@ -229,7 +229,7 @@ func (br *bufferedRegistration) outputLoop(ctx context.Context) error {

     if overflowed {
         if wasOverflowedOnFirstIteration && br.shouldLogOverflow(oneCheckpointWithTimestampSent) {
-            log.KvDistribution.Warningf(ctx, "rangefeed %s overflowed during catch up scan from %s (useful checkpoint sent: %v)",
+            log.KvExec.Warningf(ctx, "rangefeed %s overflowed during catch up scan from %s (useful checkpoint sent: %v)",
                 br.span, br.catchUpTimestamp, oneCheckpointWithTimestampSent)
         }