Skip to content

Commit e014575

Browse files
craig[bot]rafiss
andcommitted
Merge #152788
152788: upgrades: log and record progress for FirstUpgrade job r=rafiss a=rafiss The Precondition step does not record job progress, as jobs are not used for the Precondition step. Instead, we just log there. For the Upgrade itself, a job is used, and we log progress as well as store it in the job metadata. This is the phase that looks at all descriptors, so it's more useful to have finer-grained progress here. This also fixes a mistake in 0cc42a8 (where the upgrade was changed to rewrite all descriptors), to make sure we actually consider all descriptors. As part of this, we need to make sure to skip virtual and syntehtic descriptors now. informs #152629 Release note: None Co-authored-by: Rafi Shamim <[email protected]>
2 parents ded3a0d + 9c789e6 commit e014575

File tree

3 files changed

+72
-29
lines changed

3 files changed

+72
-29
lines changed

pkg/sql/catalog/lease/descriptor_set.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1515
"github.com/cockroachdb/errors"
16+
"github.com/cockroachdb/redact"
1617
)
1718

1819
// descriptorSet maintains an ordered set of descriptorVersionState objects
@@ -43,7 +44,7 @@ func (l *descriptorSet) String() string {
4344
func (l *descriptorSet) insert(s *descriptorVersionState) {
4445
i, match := l.findIndex(s.GetVersion())
4546
if match {
46-
panic("unable to insert duplicate lease")
47+
panic(redact.Sprintf("unable to insert duplicate lease for id=%d", s.GetID()))
4748
}
4849
if i == len(l.data) {
4950
l.data = append(l.data, s)

pkg/upgrade/upgrades/first_upgrade.go

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ import (
1111
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/clusterversion"
14+
"github.com/cockroachdb/cockroach/pkg/jobs"
1415
"github.com/cockroachdb/cockroach/pkg/kv"
1516
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1617
"github.com/cockroachdb/cockroach/pkg/roachpb"
1718
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1819
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
1920
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2021
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
22+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2123
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2224
"github.com/cockroachdb/cockroach/pkg/upgrade"
2325
"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -56,9 +58,29 @@ func FirstUpgradeFromRelease(
5658
}
5759
var descsToUpdate catalog.DescriptorIDSet
5860
if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error {
61+
// SKip virtual and synthetic descriptors.
62+
switch d := desc.(type) {
63+
case catalog.SchemaDescriptor:
64+
switch d.SchemaKind() {
65+
case catalog.SchemaPublic, catalog.SchemaVirtual, catalog.SchemaTemporary:
66+
return nil
67+
}
68+
case catalog.TableDescriptor:
69+
if d.IsVirtualTable() {
70+
return nil
71+
}
72+
}
5973
changes := desc.GetPostDeserializationChanges()
6074
if !changes.HasChanges() || (changes.Len() == 1 && changes.Contains(catalog.SetModTimeToMVCCTimestamp)) {
61-
return nil
75+
// In the upgrade to 25.4 (in between 25.3 and 25.4), we do a one-time
76+
// rewrite of all descriptors in order to upgrade them to use the new type
77+
// serialization format. Skip the unconditional rewrite if this is a
78+
// database descriptor, as those never reference types.
79+
// See https://github.com/cockroachdb/cockroach/issues/152629.
80+
duringUpgradeTo25_4 := d.Settings.Version.IsActive(ctx, clusterversion.V25_3) && !d.Settings.Version.IsActive(ctx, clusterversion.V25_4)
81+
if !duringUpgradeTo25_4 || desc.DescriptorType() == catalog.Database {
82+
return nil
83+
}
6284
}
6385
descsToUpdate.Add(desc.GetID())
6486
return nil
@@ -82,6 +104,10 @@ func upgradeDescriptors(
82104
repairBatchTimeLimit := 1 * time.Minute
83105
currentIdx := 0
84106
idsToRewrite := ids.Ordered()
107+
totalDescs := len(idsToRewrite)
108+
every := log.Every(time.Minute)
109+
log.Dev.Infof(ctx, "upgrading format of %d descriptors for first upgrade from release", totalDescs)
110+
85111
for currentIdx <= len(idsToRewrite) {
86112
descBatch := idsToRewrite[currentIdx:min(currentIdx+batchSize, len(idsToRewrite))]
87113
err := timeutil.RunWithTimeout(ctx, "repair-post-deserialization", repairBatchTimeLimit, func(ctx context.Context) error {
@@ -109,11 +135,6 @@ func upgradeDescriptors(
109135
if d.Settings.Version.IsActive(ctx, clusterversion.V25_4) {
110136
continue
111137
}
112-
// Skip the unconditional rewrite if this is a database descriptor,
113-
// as those never reference types.
114-
if mut.DescriptorType() == catalog.Database {
115-
continue
116-
}
117138
}
118139
key := catalogkeys.MakeDescMetadataKey(d.Codec, mut.GetID())
119140
b.CPut(key, mut.DescriptorProto(), mut.GetRawBytesInStorage())
@@ -135,6 +156,19 @@ func upgradeDescriptors(
135156
return err
136157
}
137158
currentIdx += batchSize
159+
160+
if every.ShouldLog() {
161+
completedDescs := min(currentIdx, totalDescs)
162+
log.Dev.Infof(ctx, "upgraded %d of %d descriptors so far", completedDescs, totalDescs)
163+
if jobID := d.OptionalJobID; jobID != 0 {
164+
frac := float32(completedDescs) / float32(totalDescs)
165+
if err := d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
166+
return jobs.ProgressStorage(jobID).Set(ctx, txn, float64(frac), txn.KV().ReadTimestamp())
167+
}); err != nil {
168+
log.Dev.Warningf(ctx, "failed to update progress for job %d: %v", jobID, err)
169+
}
170+
}
171+
}
138172
}
139173
return nil
140174
}
@@ -171,16 +205,16 @@ func FirstUpgradeFromReleasePrecondition(
171205
// there are no corruptions now. Otherwise, we retry and do everything
172206
// without an AOST clause henceforth.
173207
withAOST := firstUpgradePreconditionUsesAOST
174-
diagnose := func(tbl redact.SafeString) (hasRows bool, err error) {
208+
diagnose := func(tbl redact.SafeString) (count int, err error) {
175209
withAOST := withAOST
176210
for {
177211
q := fmt.Sprintf("SELECT count(*) FROM \"\".crdb_internal.%s", tbl)
178212
if withAOST {
179213
q = q + " AS OF SYSTEM TIME '-10s'"
180214
}
181215
row, err := d.InternalExecutor.QueryRow(ctx, redact.Sprintf("query-%s", tbl), nil /* txn */, q)
182-
if err == nil && row[0].String() != "0" {
183-
hasRows = true
216+
if err == nil {
217+
count = int(tree.MustBeDInt(row[0]))
184218
}
185219
// In tests like "declarative_schema_changer/job-compatibility-mixed-version", its
186220
// possible to hit BatchTimestampBeforeGCError, because the GC interval is
@@ -191,24 +225,25 @@ func FirstUpgradeFromReleasePrecondition(
191225
withAOST = false
192226
continue
193227
}
194-
return hasRows, err
228+
return count, err
195229
}
196230
}
197231
// Check for possibility of time travel.
198-
if hasRows, err := diagnose("databases"); err != nil {
232+
if count, err := diagnose("databases"); err != nil {
199233
return err
200-
} else if !hasRows {
234+
} else if count == 0 {
201235
// We're looking back in time to before the cluster was bootstrapped
202236
// and no databases exist at that point. Disable time-travel henceforth.
203237
withAOST = false
204238
}
205239
// Check for repairable catalog corruptions.
206-
if hasRows, err := diagnose("kv_repairable_catalog_corruptions"); err != nil {
240+
if totalCorruptions, err := diagnose("kv_repairable_catalog_corruptions"); err != nil {
207241
return err
208-
} else if hasRows {
242+
} else if totalCorruptions > 0 {
209243
// Attempt to repair catalog corruptions in batches.
210-
log.Dev.Info(ctx, "auto-repairing catalog corruptions detected during upgrade attempt")
211-
var n int
244+
log.Dev.Infof(ctx, "beginning auto-repair of %d catalog corruptions detected during upgrade attempt", totalCorruptions)
245+
246+
var repairedCount int
212247
const repairQuery = `
213248
SELECT
214249
count(*)
@@ -256,7 +291,7 @@ WHERE
256291
if kv.IsAutoRetryLimitExhaustedError(err) ||
257292
errors.HasType(err, (*timeutil.TimeoutError)(nil)) {
258293
batchSize = max(batchSize/2, 1)
259-
log.Dev.Infof(ctx, "reducing batch size of invalid_object repair query to %d (hipri=%t)",
294+
log.Dev.Infof(ctx, "reducing batch size of repair query to %d (hipri=%t)",
260295
batchSize,
261296
batchSize <= HighPriBatchSize)
262297
continue
@@ -267,21 +302,19 @@ WHERE
267302
if rowsUpdated == 0 {
268303
break
269304
}
270-
n += int(rowsUpdated)
305+
repairedCount += int(rowsUpdated)
271306
log.Dev.Infof(ctx, "repaired %d catalog corruptions", rowsUpdated)
272307
}
273-
if n == 0 {
274-
log.Dev.Info(ctx, "no catalog corruptions found to repair during upgrade attempt")
275-
} else {
308+
if repairedCount > 0 {
276309
// Repairs have actually been performed: stop all time travel henceforth.
277310
withAOST = false
278-
log.Dev.Infof(ctx, "%d catalog corruptions have been repaired in total", n)
311+
log.Dev.Infof(ctx, "%d catalog corruptions have been repaired in total", repairedCount)
279312
}
280313
}
281314
// Check for all known catalog corruptions.
282-
if hasRows, err := diagnose("invalid_objects"); err != nil {
315+
if invalidObjectCount, err := diagnose("invalid_objects"); err != nil {
283316
return err
284-
} else if !hasRows {
317+
} else if invalidObjectCount == 0 {
285318
return nil
286319
}
287320
if !withAOST {
@@ -291,9 +324,9 @@ WHERE
291324
// Re-run the diagnosis without the clause, because we might not be seeing
292325
// repairs which might have taken place recently.
293326
withAOST = false
294-
if hasRows, err := diagnose("invalid_objects"); err != nil {
327+
if invalidObjectCount, err := diagnose("invalid_objects"); err != nil {
295328
return err
296-
} else if !hasRows {
329+
} else if invalidObjectCount == 0 {
297330
return nil
298331
}
299332
return errors.AssertionFailedf("\"\".crdb_internal.invalid_objects is not empty")

pkg/upgrade/upgrades/first_upgrade_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2828
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
2929
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
30+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
3031
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
3132
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
3233
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -140,8 +141,14 @@ func TestFirstUpgrade(t *testing.T) {
140141
// the only post-deserialization change should be SetModTimeToMVCCTimestamp.
141142
require.False(t, readDescFromStorage().GetModificationTime().IsEmpty())
142143
changes = readDescFromStorage().GetPostDeserializationChanges()
143-
require.Equal(t, changes.Len(), 1)
144-
require.True(t, changes.Contains(catalog.SetModTimeToMVCCTimestamp))
144+
if v1.Equal(clusterversion.V25_4.Version()) {
145+
// In 25.4, we do a one-time rewrite of all descriptors, so there should be
146+
// no changes here. In later versions, there should be one change.
147+
require.Equal(t, 0, changes.Len())
148+
} else {
149+
require.Equal(t, 1, changes.Len())
150+
require.True(t, changes.Contains(catalog.SetModTimeToMVCCTimestamp))
151+
}
145152
}
146153

147154
// TestFirstUpgradeRepair tests the correct repair behavior of upgrade
@@ -163,6 +170,7 @@ func TestFirstUpgradeRepair(t *testing.T) {
163170
// also holds a lease on the system database descriptor, which we will wait to
164171
// be released. Reducing the lease duration makes this part of the test speed
165172
// up.
173+
lease.LeaseDuration.Override(ctx, &settings.SV, time.Second*30)
166174
require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV))
167175
upgradePausePoint := make(chan struct{})
168176
upgradeResumePoint := make(chan struct{})
@@ -375,6 +383,7 @@ func TestFirstUpgradeRepairBatchSize(t *testing.T) {
375383
// also holds a lease on the system database descriptor, which we will wait to
376384
// be released. Reducing the lease duration makes this part of the test speed
377385
// up.
386+
lease.LeaseDuration.Override(ctx, &settings.SV, time.Second*30)
378387
require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV))
379388
testServer, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
380389
Settings: settings,

0 commit comments

Comments
 (0)