@@ -11,13 +11,15 @@ import (
11
11
"time"
12
12
13
13
"github.com/cockroachdb/cockroach/pkg/clusterversion"
14
+ "github.com/cockroachdb/cockroach/pkg/jobs"
14
15
"github.com/cockroachdb/cockroach/pkg/kv"
15
16
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
16
17
"github.com/cockroachdb/cockroach/pkg/roachpb"
17
18
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
18
19
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
19
20
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
20
21
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
22
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
21
23
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
22
24
"github.com/cockroachdb/cockroach/pkg/upgrade"
23
25
"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -56,9 +58,29 @@ func FirstUpgradeFromRelease(
56
58
}
57
59
var descsToUpdate catalog.DescriptorIDSet
58
60
if err := all .ForEachDescriptor (func (desc catalog.Descriptor ) error {
61
+ // Skip virtual and synthetic descriptors.
62
+ switch d := desc .(type ) {
63
+ case catalog.SchemaDescriptor :
64
+ switch d .SchemaKind () {
65
+ case catalog .SchemaPublic , catalog .SchemaVirtual , catalog .SchemaTemporary :
66
+ return nil
67
+ }
68
+ case catalog.TableDescriptor :
69
+ if d .IsVirtualTable () {
70
+ return nil
71
+ }
72
+ }
59
73
changes := desc .GetPostDeserializationChanges ()
60
74
if ! changes .HasChanges () || (changes .Len () == 1 && changes .Contains (catalog .SetModTimeToMVCCTimestamp )) {
61
- return nil
75
+ // In the upgrade to 25.4 (in between 25.3 and 25.4), we do a one-time
76
+ // rewrite of all descriptors in order to upgrade them to use the new type
77
+ // serialization format. Skip the unconditional rewrite if this is a
78
+ // database descriptor, as those never reference types.
79
+ // See https://github.com/cockroachdb/cockroach/issues/152629.
80
+ duringUpgradeTo25_4 := d .Settings .Version .IsActive (ctx , clusterversion .V25_3 ) && ! d .Settings .Version .IsActive (ctx , clusterversion .V25_4 )
81
+ if ! duringUpgradeTo25_4 || desc .DescriptorType () == catalog .Database {
82
+ return nil
83
+ }
62
84
}
63
85
descsToUpdate .Add (desc .GetID ())
64
86
return nil
@@ -82,6 +104,10 @@ func upgradeDescriptors(
82
104
repairBatchTimeLimit := 1 * time .Minute
83
105
currentIdx := 0
84
106
idsToRewrite := ids .Ordered ()
107
+ totalDescs := len (idsToRewrite )
108
+ every := log .Every (time .Minute )
109
+ log .Dev .Infof (ctx , "upgrading format of %d descriptors for first upgrade from release" , totalDescs )
110
+
85
111
for currentIdx <= len (idsToRewrite ) {
86
112
descBatch := idsToRewrite [currentIdx :min (currentIdx + batchSize , len (idsToRewrite ))]
87
113
err := timeutil .RunWithTimeout (ctx , "repair-post-deserialization" , repairBatchTimeLimit , func (ctx context.Context ) error {
@@ -109,11 +135,6 @@ func upgradeDescriptors(
109
135
if d .Settings .Version .IsActive (ctx , clusterversion .V25_4 ) {
110
136
continue
111
137
}
112
- // Skip the unconditional rewrite if this is a database descriptor,
113
- // as those never reference types.
114
- if mut .DescriptorType () == catalog .Database {
115
- continue
116
- }
117
138
}
118
139
key := catalogkeys .MakeDescMetadataKey (d .Codec , mut .GetID ())
119
140
b .CPut (key , mut .DescriptorProto (), mut .GetRawBytesInStorage ())
@@ -135,6 +156,19 @@ func upgradeDescriptors(
135
156
return err
136
157
}
137
158
currentIdx += batchSize
159
+
160
+ if every .ShouldLog () {
161
+ completedDescs := min (currentIdx , totalDescs )
162
+ log .Dev .Infof (ctx , "upgraded %d of %d descriptors so far" , completedDescs , totalDescs )
163
+ if jobID := d .OptionalJobID ; jobID != 0 {
164
+ frac := float32 (completedDescs ) / float32 (totalDescs )
165
+ if err := d .DB .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
166
+ return jobs .ProgressStorage (jobID ).Set (ctx , txn , float64 (frac ), txn .KV ().ReadTimestamp ())
167
+ }); err != nil {
168
+ log .Dev .Warningf (ctx , "failed to update progress for job %d: %v" , jobID , err )
169
+ }
170
+ }
171
+ }
138
172
}
139
173
return nil
140
174
}
@@ -171,16 +205,16 @@ func FirstUpgradeFromReleasePrecondition(
171
205
// there are no corruptions now. Otherwise, we retry and do everything
172
206
// without an AOST clause henceforth.
173
207
withAOST := firstUpgradePreconditionUsesAOST
174
- diagnose := func (tbl redact.SafeString ) (hasRows bool , err error ) {
208
+ diagnose := func (tbl redact.SafeString ) (count int , err error ) {
175
209
withAOST := withAOST
176
210
for {
177
211
q := fmt .Sprintf ("SELECT count(*) FROM \" \" .crdb_internal.%s" , tbl )
178
212
if withAOST {
179
213
q = q + " AS OF SYSTEM TIME '-10s'"
180
214
}
181
215
row , err := d .InternalExecutor .QueryRow (ctx , redact .Sprintf ("query-%s" , tbl ), nil /* txn */ , q )
182
- if err == nil && row [ 0 ]. String () != "0" {
183
- hasRows = true
216
+ if err == nil {
217
+ count = int ( tree . MustBeDInt ( row [ 0 ]))
184
218
}
185
219
// In tests like "declarative_schema_changer/job-compatibility-mixed-version", it's
186
220
// possible to hit BatchTimestampBeforeGCError, because the GC interval is
@@ -191,24 +225,25 @@ func FirstUpgradeFromReleasePrecondition(
191
225
withAOST = false
192
226
continue
193
227
}
194
- return hasRows , err
228
+ return count , err
195
229
}
196
230
}
197
231
// Check for possibility of time travel.
198
- if hasRows , err := diagnose ("databases" ); err != nil {
232
+ if count , err := diagnose ("databases" ); err != nil {
199
233
return err
200
- } else if ! hasRows {
234
+ } else if count == 0 {
201
235
// We're looking back in time to before the cluster was bootstrapped
202
236
// and no databases exist at that point. Disable time-travel henceforth.
203
237
withAOST = false
204
238
}
205
239
// Check for repairable catalog corruptions.
206
- if hasRows , err := diagnose ("kv_repairable_catalog_corruptions" ); err != nil {
240
+ if totalCorruptions , err := diagnose ("kv_repairable_catalog_corruptions" ); err != nil {
207
241
return err
208
- } else if hasRows {
242
+ } else if totalCorruptions > 0 {
209
243
// Attempt to repair catalog corruptions in batches.
210
- log .Dev .Info (ctx , "auto-repairing catalog corruptions detected during upgrade attempt" )
211
- var n int
244
+ log .Dev .Infof (ctx , "beginning auto-repair of %d catalog corruptions detected during upgrade attempt" , totalCorruptions )
245
+
246
+ var repairedCount int
212
247
const repairQuery = `
213
248
SELECT
214
249
count(*)
@@ -256,7 +291,7 @@ WHERE
256
291
if kv .IsAutoRetryLimitExhaustedError (err ) ||
257
292
errors .HasType (err , (* timeutil .TimeoutError )(nil )) {
258
293
batchSize = max (batchSize / 2 , 1 )
259
- log .Dev .Infof (ctx , "reducing batch size of invalid_object repair query to %d (hipri=%t)" ,
294
+ log .Dev .Infof (ctx , "reducing batch size of repair query to %d (hipri=%t)" ,
260
295
batchSize ,
261
296
batchSize <= HighPriBatchSize )
262
297
continue
@@ -267,21 +302,19 @@ WHERE
267
302
if rowsUpdated == 0 {
268
303
break
269
304
}
270
- n += int (rowsUpdated )
305
+ repairedCount += int (rowsUpdated )
271
306
log .Dev .Infof (ctx , "repaired %d catalog corruptions" , rowsUpdated )
272
307
}
273
- if n == 0 {
274
- log .Dev .Info (ctx , "no catalog corruptions found to repair during upgrade attempt" )
275
- } else {
308
+ if repairedCount > 0 {
276
309
// Repairs have actually been performed: stop all time travel henceforth.
277
310
withAOST = false
278
- log .Dev .Infof (ctx , "%d catalog corruptions have been repaired in total" , n )
311
+ log .Dev .Infof (ctx , "%d catalog corruptions have been repaired in total" , repairedCount )
279
312
}
280
313
}
281
314
// Check for all known catalog corruptions.
282
- if hasRows , err := diagnose ("invalid_objects" ); err != nil {
315
+ if invalidObjectCount , err := diagnose ("invalid_objects" ); err != nil {
283
316
return err
284
- } else if ! hasRows {
317
+ } else if invalidObjectCount == 0 {
285
318
return nil
286
319
}
287
320
if ! withAOST {
@@ -291,9 +324,9 @@ WHERE
291
324
// Re-run the diagnosis without the clause, because we might not be seeing
292
325
// repairs which might have taken place recently.
293
326
withAOST = false
294
- if hasRows , err := diagnose ("invalid_objects" ); err != nil {
327
+ if invalidObjectCount , err := diagnose ("invalid_objects" ); err != nil {
295
328
return err
296
- } else if ! hasRows {
329
+ } else if invalidObjectCount == 0 {
297
330
return nil
298
331
}
299
332
return errors .AssertionFailedf ("\" \" .crdb_internal.invalid_objects is not empty" )
0 commit comments