
Commit 7d97799

craig[bot], Peter-Fayez95, andy-kimball, and miraradeva committed
149975: changefeedccl: emit warning when `resolved` or `min_checkpoint_frequency` is set too low r=asg0451 a=Peter-Fayez95

This change adds client-side notices to `CREATE CHANGEFEED` and `ALTER CHANGEFEED` statements when the `resolved` or `min_checkpoint_frequency` options are set below a recommended threshold (e.g., 500ms). These warnings aim to guide users toward more balanced configurations. Setting these options too low can significantly increase CPU usage due to more frequent checkpointing and resolved timestamp emissions, introducing performance trade-offs.

Epic: CRDB-52074

Fixes #149238

Release note (general change): A warning is now emitted when creating or altering a changefeed with `resolved` or `min_checkpoint_frequency` set below 500ms. This helps users understand the trade-off between message latency and cluster CPU usage.

150146: vecindex: fix Cosine/InnerProduct accuracy bugs r=drewkimball a=andy-kimball

#### cspann: use correct metric for assigning vectors during split

Previously, during a split, the BalancedKmeans struct was not initialized with the distance metric used by the index. This caused vectors to be assigned to partitions using a potentially incorrect metric, which can negatively impact accuracy.

#### quantize: set zero dot product for centroid data vector

Previously, the dot product between a data unit vector and its quantized form was not being set in the case where the data vector is equal to the centroid. This could cause an issue when a RaBitQuantizedVectorSet is reused and the dot product memory is not zero. Fix this buglet and update the code to scribble undefined memory in test builds.

#### quantize: recompute norm when centroid is updated

When RaBitQuantizedVectorSet.Clear was called with a new centroid, the norm was not being recomputed. This commit fixes that bug.

150147: roachtest: actually fail TPCC bench if reached max warehouses r=miraradeva a=miraradeva

In 1bfe55b, we attempted to fail the TPCC bench test run if the configured maximum warehouses were reached and the success criteria were met. The idea was that this would prompt us to increase the max warehouses. However, that commit failed only the specific line search run, not the full test run. This commit moves the max warehouses check out of the result handling and actually fatals the test.

Part of: #148235

Release note: None

Co-authored-by: Peter <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
4 parents: d7ce141 + 2d8fc98 + f026464 + 7c99d8f

File tree

12 files changed: +209 −52 lines

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 14 additions & 0 deletions

@@ -875,6 +875,20 @@ func createChangefeedJobRecord(
 			"less frequently", resolved, resolvedStr, freqStr, freq))
 	}

+	const minRecommendedFrequency = 500 * time.Millisecond
+
+	if emit && resolvedOpt != nil && *resolvedOpt < minRecommendedFrequency {
+		p.BufferClientNotice(ctx, pgnotice.Newf(
+			"the 'resolved' timestamp interval (%s) is very low; consider increasing it to at least %s",
+			resolvedOpt, minRecommendedFrequency))
+	}
+
+	if freqOpt != nil && *freqOpt < minRecommendedFrequency {
+		p.BufferClientNotice(ctx, pgnotice.Newf(
+			"the 'min_checkpoint_frequency' timestamp interval (%s) is very low; consider increasing it to at least %s",
+			freqOpt, minRecommendedFrequency))
+	}
+
 	ptsExpiration, err := opts.GetPTSExpiration()
 	if err != nil {
 		return nil, err
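
For context, the new check is just a threshold comparison on an optional duration, surfaced as a notice rather than an error so the statement still succeeds. A minimal self-contained sketch of the same pattern, where the notify callback stands in for the planner's BufferClientNotice (an illustration, not the real API surface):

package main

import (
	"fmt"
	"time"
)

const minRecommendedFrequency = 500 * time.Millisecond

// warnIfTooLow mirrors the shape of the check above: a nil option means the
// user did not set it, and only explicitly set values below the threshold
// produce a notice.
func warnIfTooLow(name string, opt *time.Duration, notify func(string)) {
	if opt != nil && *opt < minRecommendedFrequency {
		notify(fmt.Sprintf(
			"the '%s' timestamp interval (%s) is very low; consider increasing it to at least %s",
			name, *opt, minRecommendedFrequency))
	}
}

func main() {
	resolved := 200 * time.Millisecond
	warnIfTooLow("resolved", &resolved, func(msg string) { fmt.Println("NOTICE:", msg) })
	// Prints: NOTICE: the 'resolved' timestamp interval (200ms) is very low;
	// consider increasing it to at least 500ms
}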

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 63 additions & 3 deletions

@@ -5074,6 +5074,10 @@ func TestChangefeedResolvedNotice(t *testing.T) {
 	defer cleanup()
 	s := cluster.Server(1)

+	// Set the default min_checkpoint_frequency to 30 seconds for this test
+	restoreDefault := changefeedbase.TestingSetDefaultMinCheckpointFrequency(30 * time.Second)
+	defer restoreDefault()
+
 	pgURL, cleanup := pgurlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
 	defer cleanup()
 	pgBase, err := pq.NewConnector(pgURL.String())
@@ -5103,10 +5107,9 @@
 	t.Run("resolved<min_checkpoint_frequency default", func(t *testing.T) {
 		actual = "(no notice)"
 		f := makeKafkaFeedFactory(t, s, dbWithHandler)
-		testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='20ms'`)
+		testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='5s'`)
 		defer closeFeed(t, testFeed)
-		// Note: default min_checkpoint_frequency is set to 100ms in startTestCluster.
-		require.Equal(t, `resolved (20ms) messages will not be emitted more frequently than the default min_checkpoint_frequency (100ms), but may be emitted less frequently`, actual)
+		require.Equal(t, `resolved (5s) messages will not be emitted more frequently than the default min_checkpoint_frequency (30s), but may be emitted less frequently`, actual)
 	})
 	t.Run("resolved=min_checkpoint_frequency", func(t *testing.T) {
 		actual = "(no notice)"
@@ -5131,6 +5134,63 @@
 	})
 }

+func TestChangefeedLowFrequencyNotices(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	cluster, _, cleanup := startTestCluster(t)
+	defer cleanup()
+	s := cluster.Server(1)
+
+	pgURL, cleanup := pgurlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
+	defer cleanup()
+	pgBase, err := pq.NewConnector(pgURL.String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	var actual string
+	connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) {
+		actual = n.Message
+	})
+
+	dbWithHandler := gosql.OpenDB(connector)
+	defer dbWithHandler.Close()
+
+	sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)
+
+	sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`)
+	sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`)
+
+	t.Run("no options specified", func(t *testing.T) {
+		actual = "(no notice)"
+		f := makeKafkaFeedFactory(t, s, dbWithHandler)
+		testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
+		defer closeFeed(t, testFeed)
+		require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
+	})
+	t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) {
+		actual = "(no notice)"
+		f := makeKafkaFeedFactory(t, s, dbWithHandler)
+		testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`)
+		defer closeFeed(t, testFeed)
+		require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
+	})
+	t.Run("low resolved timestamp", func(t *testing.T) {
+		actual = "(no notice)"
+		f := makeKafkaFeedFactory(t, s, dbWithHandler)
+		testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`)
+		defer closeFeed(t, testFeed)
+		require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
+	})
+	t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) {
+		actual = "(no notice)"
+		f := makeKafkaFeedFactory(t, s, dbWithHandler)
+		testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`)
+		defer closeFeed(t, testFeed)
+		require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
+	})
+}
+
 func TestChangefeedOutputTopics(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
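
A note on the test plumbing: the notice handler wiring comes from lib/pq, which lets a connector-level callback observe every notice the server sends. A condensed sketch of how client code could watch for the new warning outside of a test (the connection URL and table are placeholders):

package main

import (
	"database/sql"
	"fmt"
	"log"

	"github.com/lib/pq"
)

func main() {
	base, err := pq.NewConnector("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	// Every NOTICE from the server is delivered to this callback.
	connector := pq.ConnectorWithNoticeHandler(base, func(n *pq.Error) {
		fmt.Println("NOTICE:", n.Message)
	})
	db := sql.OpenDB(connector)
	defer db.Close()

	// With resolved='200ms', the low-frequency warning added above should
	// arrive through the handler.
	if _, err := db.Exec(`CREATE CHANGEFEED FOR t INTO 'kafka://host:9092' WITH resolved='200ms'`); err != nil {
		log.Fatal(err)
	}
}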
pkg/cmd/roachtest/tests/tpcc.go

Lines changed: 17 additions & 2 deletions

@@ -2250,7 +2250,22 @@ func runTPCCBench(ctx context.Context, t test.Test, c cluster.Cluster, b tpccBen
 			results = append(results, partial)
 		}
 		res = tpcc.MergeResults(results...)
-		failErr = res.FailureError(b.LoadWarehouses(c.Cloud()))
+		failErr = res.FailureError()
+		// If the active warehouses have reached the load warehouses, fail the test;
+		// it needs to be updated to allow for more warehouses. Note that the line
+		// search assumes that the test fails at the number of load warehouses, so it
+		// never attempts to reach it exactly. Therefore, active warehouses can be at
+		// most LoadWarehouses-1.
+		if res.ActiveWarehouses >= b.LoadWarehouses(c.Cloud())-1 {
+			err = errors.CombineErrors(
+				failErr,
+				errors.Errorf(
+					"the number of active warehouses (%d) reached the maximum number of "+
+						"warehouses; consider updating LoadWarehouses and EstimatedMax", res.ActiveWarehouses,
+				),
+			)
+			t.Fatal(err)
+		}
 	}

 	// Print the result.
@@ -2681,7 +2696,7 @@ func runTPCCPublished(
 			results = append(results, partial)
 		}
 		res = tpcc.MergeResults(results...)
-		failErr = res.FailureError(opts.LoadWarehousesGCE)
+		failErr = res.FailureError()
 	}

 	// Print result for current iteration
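
One detail worth calling out: errors.CombineErrors from github.com/cockroachdb/errors returns its second argument when the first is nil, so the test fatals even when the line search otherwise met its success criteria and failErr is nil. A small standalone sketch of that behavior (both messages are placeholders):

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

func main() {
	ceiling := errors.New("active warehouses reached the maximum number of warehouses")

	// failErr == nil: CombineErrors returns the ceiling error unchanged, so
	// t.Fatal still fires.
	fmt.Println(errors.CombineErrors(nil, ceiling))

	// failErr != nil: the ceiling error is attached as a secondary error;
	// its details are included in the verbose %+v rendering.
	failErr := errors.New("efficiency below threshold")
	fmt.Printf("%+v\n", errors.CombineErrors(failErr, ceiling))
}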

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 5 additions & 1 deletion

@@ -1023,7 +1023,11 @@ func (fw *fixupWorker) copyToSplitSubPartitions(
 	defer fw.workspace.FreeUint64s(tempAssignments)

 	// Assign vectors to the partition with the nearest centroid.
-	kmeans := BalancedKmeans{Workspace: &fw.workspace, Rand: fw.rng}
+	kmeans := BalancedKmeans{
+		Workspace:      &fw.workspace,
+		Rand:           fw.rng,
+		DistanceMetric: fw.index.quantizer.GetDistanceMetric(),
+	}
 	leftCount = kmeans.AssignPartitions(
 		vectors, leftMetadata.Centroid, rightMetadata.Centroid, tempAssignments)
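
Why the missing field mattered: nearest-centroid assignment can genuinely disagree between metrics, so quantizing with one metric while assigning with another (presumably the struct's zero-value default before this fix) skews which partition a vector lands in. A self-contained illustration with made-up numbers, using distance functions written to match the usual L2-squared and negated-inner-product conventions (smaller means closer in both):

package main

import "fmt"

// l2sq returns the squared Euclidean distance between two vectors.
func l2sq(a, b []float32) float32 {
	var d float32
	for i := range a {
		diff := a[i] - b[i]
		d += diff * diff
	}
	return d
}

// innerProductDist returns the negated dot product, the usual "distance"
// form of the InnerProduct metric.
func innerProductDist(a, b []float32) float32 {
	var dot float32
	for i := range a {
		dot += a[i] * b[i]
	}
	return -dot
}

func main() {
	v := []float32{1, 0.9}
	left := []float32{3, 0}
	right := []float32{0, 1}

	// L2Squared: left=4.81, right=1.01 -> v is assigned to the right partition.
	fmt.Println("L2Squared:   ", l2sq(v, left), l2sq(v, right))

	// InnerProduct: left=-3, right=-0.9 -> v is assigned to the left partition.
	fmt.Println("InnerProduct:", innerProductDist(v, left), innerProductDist(v, right))
}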
pkg/sql/vecindex/cspann/quantize/rabitq.go

Lines changed: 3 additions & 1 deletion

@@ -557,9 +557,11 @@ func (q *RaBitQuantizer) quantizeHelper(
 		// Store the inverted dot product, which will be used to make distance
 		// estimates. The dot product is only zero in the case where the data vector
 		// is equal to the centroid vector. That case is handled separately in
-		// EstimatedDistances.
+		// EstimateDistances.
 		if dotProduct != 0 {
 			dotProducts[i] = 1.0 / dotProduct
+		} else {
+			dotProducts[i] = 0
 		}
 	}
 }
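
The else branch looks redundant until you remember that these sets reuse slice memory: once AddUndefined hands back recycled slots, a zero dot product must be written explicitly or the slot silently keeps whatever the previous occupant left behind. A minimal sketch of the failure mode, independent of the real quantizer:

package main

import "fmt"

// storeInverted writes 1/dot into out[i], treating dot == 0 (data vector
// equal to the centroid) as a case that must be explicitly zeroed, because
// out may be reused memory holding a stale value.
func storeInverted(out []float32, i int, dot float32) {
	if dot != 0 {
		out[i] = 1.0 / dot
	} else {
		out[i] = 0 // without this, a reused slot keeps its old value
	}
}

func main() {
	// Simulate slice reuse: shrink, then re-grow without clearing.
	dots := []float32{0.5, 0.25}
	dots = dots[:0]
	dots = dots[:2] // stale values 0.5 and 0.25 are still present

	storeInverted(dots, 0, 2.0) // overwritten either way: 1/2 = 0.5
	storeInverted(dots, 1, 0)   // the else branch prevents a stale 0.25
	fmt.Println(dots)           // [0.5 0]
}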

pkg/sql/vecindex/cspann/quantize/rabitq_test.go

Lines changed: 12 additions & 2 deletions

@@ -150,19 +150,29 @@ func TestRaBitQuantizerEdge(t *testing.T) {

 	t.Run("add centroid to set", func(t *testing.T) {
 		quantizer := NewRaBitQuantizer(2, 42, vecpb.L2SquaredDistance)
+		quantizedSet := quantizer.NewSet(4, []float32{3, 9}).(*RaBitQuantizedVectorSet)
 		vectors := vector.MakeSetFromRawData([]float32{1, 5, 5, 13}, 2)
-		quantizedSet := quantizer.Quantize(&workspace, vectors).(*RaBitQuantizedVectorSet)
-		require.Equal(t, []float32{3, 9}, quantizedSet.Centroid)
+		quantizer.QuantizeInSet(&workspace, quantizedSet, vectors)

 		// Add centroid to the set along with another vector.
 		vectors = vector.MakeSetFromRawData([]float32{1, 5, 3, 9}, 2)
 		quantizer.QuantizeInSet(&workspace, quantizedSet, vectors)
+		require.Equal(t, float32(0), quantizedSet.QuantizedDotProducts[3],
+			"dot product for centroid should be zero")
+
+		// Estimate distances from a query vector not in the set.
 		distances := make([]float32, 4)
 		errorBounds := make([]float32, 4)
 		quantizer.EstimateDistances(
 			&workspace, quantizedSet, vector.T{3, 2}, distances, errorBounds)
 		require.Equal(t, []float32{22.33, 115.67, 22.33, 49}, testutils.RoundFloats(distances, 2))
 		require.Equal(t, []float32{44.27, 44.27, 44.27, 0}, testutils.RoundFloats(errorBounds, 2))
+
+		// Estimate distances when the query vector is the centroid.
+		quantizer.EstimateDistances(
+			&workspace, quantizedSet, vector.T{3, 9}, distances, errorBounds)
+		require.Equal(t, []float32{20, 20, 20, 0}, testutils.RoundFloats(distances, 2))
+		require.Equal(t, []float32{0, 0, 0, 0}, testutils.RoundFloats(errorBounds, 2))
 	})

 	t.Run("query vector is centroid", func(t *testing.T) {

pkg/sql/vecindex/cspann/quantize/rabitqpb.go

Lines changed: 39 additions & 13 deletions

@@ -98,6 +98,11 @@ func (cs *RaBitQCodeSet) AddUndefined(count int) {
 	cs.Data = slices.Grow(cs.Data, count*cs.Width)
 	cs.Count += count
 	cs.Data = cs.Data[:cs.Count*cs.Width]
+	if buildutil.CrdbTestBuild {
+		for i := len(cs.Data) - count*cs.Width; i < len(cs.Data); i++ {
+			cs.Data[i] = 0xBADF00D
+		}
+	}
 }

 // ReplaceWithLast removes the code at the given offset from the set, replacing
@@ -145,16 +150,18 @@ func (vs *RaBitQuantizedVectorSet) Clone() QuantizedVectorSet {
 // Clear implements the QuantizedVectorSet interface
 func (vs *RaBitQuantizedVectorSet) Clear(centroid vector.T) {
 	if buildutil.CrdbTestBuild {
-		for i := range len(vs.CodeCounts) {
-			vs.CodeCounts[i] = 0xBADF00D
+		if vs.Centroid == nil {
+			panic(errors.New("Clear cannot be called on an uninitialized vector set"))
 		}
-		for i := range len(vs.CentroidDistances) {
-			vs.CentroidDistances[i] = math.Pi
-		}
-		for i := range len(vs.QuantizedDotProducts) {
-			vs.QuantizedDotProducts[i] = math.Pi
+		vs.scribble(0, len(vs.CodeCounts))
+	}
+
+	// Recompute the centroid norm for Cosine and InnerProduct metrics, but only
+	// if a new centroid is provided.
+	if vs.Metric != vecpb.L2SquaredDistance {
+		if &vs.Centroid[0] != &centroid[0] {
+			vs.CentroidNorm = num32.Norm(centroid)
 		}
-		// RaBitQCodeSet.Clear takes care of scribbling memory for vs.Codes.
 	}

 	// vs.Centroid is immutable, so do not try to reuse its memory.
@@ -164,11 +171,6 @@ func (vs *RaBitQuantizedVectorSet) Clear(centroid vector.T) {
 	vs.CentroidDistances = vs.CentroidDistances[:0]
 	vs.QuantizedDotProducts = vs.QuantizedDotProducts[:0]
 	vs.CentroidDotProducts = vs.CentroidDotProducts[:0]
-	if vs.Metric != vecpb.L2SquaredDistance {
-		if &vs.Centroid[0] != &centroid[0] {
-			vs.CentroidNorm = num32.Norm(centroid)
-		}
-	}
 }

 // AddUndefined adds the given number of quantized vectors to this set. The new
@@ -187,4 +189,28 @@ func (vs *RaBitQuantizedVectorSet) AddUndefined(count int) {
 		vs.CentroidDotProducts = slices.Grow(vs.CentroidDotProducts, count)
 		vs.CentroidDotProducts = vs.CentroidDotProducts[:newCount]
 	}
+	if buildutil.CrdbTestBuild {
+		vs.scribble(newCount-count, newCount)
+	}
+}
+
+// scribble writes garbage values to undefined vector set values. This is only
+// called in test builds to make detecting bugs easier.
+func (vs *RaBitQuantizedVectorSet) scribble(start, end int) {
+	for i := start; i < end; i++ {
+		vs.CodeCounts[i] = 0xBADF00D
+	}
+	for i := start; i < end; i++ {
+		vs.CentroidDistances[i] = math.Pi
+	}
+	for i := start; i < end; i++ {
+		vs.QuantizedDotProducts[i] = math.Pi
+	}
+	if vs.Metric != vecpb.L2SquaredDistance {
+		for i := start; i < end; i++ {
+			vs.CentroidDotProducts[i] = math.Pi
+		}
+	}
+	// RaBitQCodeSet Clear and AddUndefined methods take care of scribbling
+	// memory for vs.Codes.
 }
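
The scribble pattern consolidated here is a cheap defensive trick: in test builds (gated on buildutil.CrdbTestBuild), slots that are grown but left undefined are overwritten with conspicuous sentinels (0xBADF00D, math.Pi) so that any read-before-write produces obviously bogus values instead of plausible-looking stale data. A generic sketch of the idea, with a local constant standing in for the build flag:

package main

import (
	"fmt"
	"math"
)

// testBuild stands in for buildutil.CrdbTestBuild, which is true only in
// test builds.
const testBuild = true

// addUndefined extends vals by count elements whose contents the caller is
// expected to overwrite. In test builds the new slots are scribbled with a
// sentinel so a read-before-write yields an unmistakably wrong value.
func addUndefined(vals []float32, count int) []float32 {
	newCount := len(vals) + count
	vals = append(vals, make([]float32, count)...)
	if testBuild {
		for i := newCount - count; i < newCount; i++ {
			vals[i] = math.Pi // clearly not a real distance or dot product
		}
	}
	return vals
}

func main() {
	vals := addUndefined(nil, 3)
	fmt.Println(vals) // [3.1415927 3.1415927 3.1415927]
}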

pkg/sql/vecindex/cspann/quantize/rabitqpb_test.go

Lines changed: 10 additions & 1 deletion

@@ -105,10 +105,15 @@ func TestRaBitQuantizedVectorSet(t *testing.T) {
 	require.Equal(t, []float32{10}, cloned.CentroidDistances)
 	require.Equal(t, []float32{10}, cloned.QuantizedDotProducts)

+	// Clear the set and ensure that norm is not updated.
+	quantizedSet.Clear(quantizedSet.Centroid)
+	require.Equal(t, float32(0), quantizedSet.CentroidNorm)
+
 	// Test InnerProduct distance metric, which uses the CentroidDotProducts
 	// field (L2Squared does not use it).
-	quantizedSet.Clear(quantizedSet.Centroid)
 	quantizedSet.Metric = vecpb.InnerProductDistance
+	quantizedSet.Clear(quantizedSet.Centroid)
+	require.Equal(t, float32(0), quantizedSet.CentroidNorm)
 	quantizedSet.AddUndefined(2)
 	copy(quantizedSet.Codes.At(1), []uint64{1, 2, 3})
 	quantizedSet.CodeCounts[1] = 15
@@ -124,4 +129,8 @@ func TestRaBitQuantizedVectorSet(t *testing.T) {
 	require.Len(t, cloned.CentroidDotProducts, 2)
 	cloned.Clear(quantizedSet.Centroid)
 	require.Len(t, cloned.CentroidDotProducts, 0)
+
+	// Update the centroid and ensure that norm is updated.
+	quantizedSet.Clear([]float32{2, 3, 6})
+	require.Equal(t, float32(7), quantizedSet.CentroidNorm)
 }
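
The final assertion is easy to verify by hand: the norm of the new centroid is ‖(2, 3, 6)‖ = √(2² + 3² + 6²) = √49 = 7, so a CentroidNorm of exactly 7 confirms that Clear recomputed it from the new centroid rather than keeping the old value.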

pkg/sql/vecindex/cspann/testdata/search-embeddings.ddt

Lines changed: 13 additions & 13 deletions

@@ -197,7 +197,7 @@ vec420: 3781823
 new-index dataset=fashion-784d-1k.gob dataset-count=1000 distance-metric=Cosine min-partition-size=4 max-partition-size=16 quality-samples=8 beam-size=4 hide-tree
 ----
 Created index with 1000 vectors with 784 dimensions.
-3 levels, 209 partitions.
+3 levels, 211 partitions.
 CV stats:
   level 2 - mean: 0.0000, stdev: 0.0000
   level 3 - mean: 0.0000, stdev: 0.0000
@@ -215,7 +215,7 @@ vec409: 0.1185
 vec144: 0.1197
 vec476: 0.124
 vec109: 0.1273
-1000 leaf vectors, 1108 vectors, 11 full vectors, 109 partitions
+1000 leaf vectors, 1109 vectors, 12 full vectors, 110 partitions

 # Now use lower beam size.
 search max-results=10 use-dataset=999 beam-size=8
@@ -230,13 +230,13 @@ vec409: 0.1185
 vec144: 0.1197
 vec476: 0.124
 vec109: 0.1273
-84 leaf vectors, 135 vectors, 11 full vectors, 13 partitions
+91 leaf vectors, 134 vectors, 12 full vectors, 13 partitions

 # InnerProduct.
 new-index dataset=fashion-784d-1k.gob dataset-count=1000 distance-metric=InnerProduct min-partition-size=4 max-partition-size=16 quality-samples=8 beam-size=4 hide-tree
 ----
 Created index with 1000 vectors with 784 dimensions.
-3 levels, 239 partitions.
+3 levels, 245 partitions.
 CV stats:
   level 2 - mean: 0.0000, stdev: 0.0000
   level 3 - mean: 0.0000, stdev: 0.0000
@@ -257,22 +257,22 @@ vec312: -14063724
 vec197: -14040257
 vec476: -13816669
 vec311: -13589641
-1000 leaf vectors, 1123 vectors, 18 full vectors, 124 partitions
+1000 leaf vectors, 1125 vectors, 21 full vectors, 126 partitions

 # Now use lower beam size.
 search max-results=10 use-dataset=999 beam-size=8
 ----
+vec109: -14526173
 vec811: -14265605
-vec312: -14063724
-vec311: -13589641
-vec265: -13573769
+vec660: -13573067
 vec984: -13534513
 vec610: -13491291
-vec220: -13433810
+vec226: -13364679
+vec144: -13148124
 vec968: -13060514
 vec999: -12779612
-vec735: -12533078
-71 leaf vectors, 131 vectors, 12 full vectors, 13 partitions
+vec853: -12163027
+64 leaf vectors, 115 vectors, 10 full vectors, 13 partitions

 # ----------------------------------------------------------------------
 # Load 950 1536-dimension image embeddings and search them using Cosine
@@ -322,13 +322,13 @@ CV stats:

 recall topk=10 beam-size=4 samples=50
 ----
-50.60% recall@10
+50.40% recall@10
 44 leaf vectors, 74 vectors, 18 full vectors, 7 partitions

 recall topk=10 beam-size=8 samples=50
 ----
 69.80% recall@10
-86 leaf vectors, 136 vectors, 21 full vectors, 13 partitions
+86 leaf vectors, 136 vectors, 22 full vectors, 13 partitions

 recall topk=10 beam-size=16 samples=50
 ----