Skip to content

Commit f09c7b8

Browse files
craig[bot]spilchenelizaMkraulestevendannadt
committed
149621: sql: validate trigger backreferences in table descriptors r=spilchen a=spilchen Previously, descriptor validation only accounted for backreferences from sequences and views. However, the introduction of triggers introduces table-to-table references, which were not being validated. This change extends the validation logic to cover these cases, improving correctness and robustness of descriptor validation involving triggers. Fixes: #148167 Epic: CRDB-42942 Release note: None 149622: changefeedccl: support resolved option with protobuf format r=asg0451 a=elizaMkraule This change introduces support for the resolved option with protobuf format for changefeeds Release note (general change): the changefeeds with protobuf format now supports the resolved option for emitting resolved timestamps. Fixes #148934 150319: kv: remove some tautological conditionals r=tbg a=stevendanna Epic: none Release note: None 150370: pcr: add debug benchmarking settings to discard data r=dt a=dt Release note: none. Epic: none. 150436: upgradeccl: ensure multiple releases for TestTenantUpgradeFailure test r=rail a=celiala TestTenantUpgradeFailure can only run when we support more than one previous release. This PR ensures that we setup multple releases for this test. This unblocks the PR needed to mint release-25.3: #150211 (comment) - Once 150436 is backported onto release-25.3, I'll remove a38ea8f from #150211, and just rebase on top of this backported fix. Epic: None Release note: None Release justification: test-only fix. Co-authored-by: Matt Spilchen <[email protected]> Co-authored-by: Eliza Kraule <[email protected]> Co-authored-by: Steven Danna <[email protected]> Co-authored-by: David Taylor <[email protected]> Co-authored-by: Celia La <[email protected]>
6 parents 7661812 + 17bda39 + e35ddcd + 66128ef + 68d7548 + 36dc40c commit f09c7b8

File tree

10 files changed

+278
-56
lines changed

10 files changed

+278
-56
lines changed

pkg/ccl/changefeedccl/encoder_protobuf.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
1616
"github.com/cockroachdb/cockroach/pkg/geo"
1717
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
18-
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
1918
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2019
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2120
"github.com/cockroachdb/errors"
@@ -77,9 +76,29 @@ func (e *protobufEncoder) EncodeValue(
7776

7877
// EncodeResolvedTimestamp encodes a resolved timestamp message for the specified topic.
7978
func (e *protobufEncoder) EncodeResolvedTimestamp(
80-
context.Context, string, hlc.Timestamp,
79+
ctx context.Context, topic string, ts hlc.Timestamp,
8180
) ([]byte, error) {
82-
return nil, unimplemented.NewWithIssuef(148934, "protobuf encoder does not support resolved timestamps yet")
81+
var msg *changefeedpb.Message
82+
if e.envelopeType == changefeedbase.OptEnvelopeBare {
83+
msg = &changefeedpb.Message{
84+
Data: &changefeedpb.Message_BareResolved{
85+
BareResolved: &changefeedpb.BareResolved{
86+
XCrdb__: &changefeedpb.Resolved{
87+
Resolved: ts.AsOfSystemTime(),
88+
},
89+
},
90+
},
91+
}
92+
} else {
93+
msg = &changefeedpb.Message{
94+
Data: &changefeedpb.Message_Resolved{
95+
Resolved: &changefeedpb.Resolved{
96+
Resolved: ts.AsOfSystemTime(),
97+
},
98+
},
99+
}
100+
}
101+
return protoutil.Marshal(msg)
83102
}
84103

85104
// buildBare constructs a BareEnvelope with optional metadata and serializes it.

pkg/ccl/changefeedccl/encoder_protobuf_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,60 @@ func TestProtoEncoder_BareEnvelope_WithMetadata(t *testing.T) {
7070
require.Equal(t, int64(1), bare.Values["id"].GetInt64Value())
7171
require.Equal(t, "Alice", bare.Values["name"].GetStringValue())
7272
}
73+
74+
func TestProtoEncoder_ResolvedEnvelope(t *testing.T) {
75+
defer leaktest.AfterTest(t)()
76+
defer log.Scope(t).Close(t)
77+
78+
tableDesc, err := parseTableDesc(`CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
79+
require.NoError(t, err)
80+
targets := mkTargets(tableDesc)
81+
82+
ts := hlc.Timestamp{WallTime: 123, Logical: 456}
83+
84+
tests := []struct {
85+
name string
86+
envelopeType changefeedbase.EnvelopeType
87+
expectWrapped bool
88+
}{
89+
{
90+
name: "wrapped envelope",
91+
envelopeType: changefeedbase.OptEnvelopeWrapped,
92+
},
93+
{
94+
name: "bare envelope",
95+
envelopeType: changefeedbase.OptEnvelopeBare,
96+
},
97+
}
98+
99+
for _, tc := range tests {
100+
t.Run(tc.name, func(t *testing.T) {
101+
opts := changefeedbase.EncodingOptions{
102+
Envelope: tc.envelopeType,
103+
Format: changefeedbase.OptFormatProtobuf,
104+
}
105+
106+
enc, err := getEncoder(context.Background(), opts, targets, false, nil, nil, nil)
107+
require.NoError(t, err)
108+
109+
b, err := enc.EncodeResolvedTimestamp(context.Background(), "test-topic", ts)
110+
require.NoError(t, err)
111+
112+
var msg changefeedpb.Message
113+
require.NoError(t, protoutil.Unmarshal(b, &msg))
114+
115+
switch tc.envelopeType {
116+
case changefeedbase.OptEnvelopeWrapped:
117+
res := msg.GetResolved()
118+
require.NotNil(t, res, "wrapped envelope should populate Resolved field")
119+
require.Equal(t, ts.AsOfSystemTime(), res.Resolved)
120+
case changefeedbase.OptEnvelopeBare:
121+
res := msg.GetBareResolved()
122+
require.NotNil(t, res, "bare envelope should populate BareResolved field")
123+
require.Equal(t, ts.AsOfSystemTime(), res.XCrdb__.Resolved)
124+
default:
125+
t.Fatalf("unexpected envelope type: %v", tc.envelopeType)
126+
}
127+
})
128+
}
129+
}

pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,20 +346,21 @@ func TestTenantUpgradeFailure(t *testing.T) {
346346

347347
v0 := clusterversion.MinSupported.Version()
348348
v2 := clusterversion.Latest.Version()
349-
// v1 needs to be between v0 and v2. Set it to the minor release
350-
// after v0 and before v2.
349+
// v1 needs to be between v0 and v2. Set it to the first version with a
350+
// different major/minor from v0.
351351
var v1 roachpb.Version
352352
for _, version := range clusterversion.ListBetween(v0, v2) {
353-
if version.Minor != v0.Minor {
353+
if version.Major != v0.Major && version.Minor != v0.Minor {
354354
v1 = version
355355
break
356356
}
357357
}
358-
if v1 == (roachpb.Version{}) {
358+
if v1 == (roachpb.Version{}) || v1 == v2 {
359359
// There is no in-between version supported; skip this test.
360360
skip.IgnoreLint(t, "test can only run when we support two previous releases")
361361
}
362362

363+
t.Logf("v0=%s v1=%s v2=%s", v0, v1, v2)
363364
t.Log("starting server")
364365
ctx := context.Background()
365366
settings := cluster.MakeTestingClusterSettingsWithVersions(

pkg/crosscluster/producer/event_stream.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,15 +366,27 @@ func (s *eventStream) maybeFlushBatch(ctx context.Context) error {
366366
return nil
367367
}
368368

369+
var debugSettingDropData = settings.RegisterBoolSetting(
370+
settings.ApplicationLevel,
371+
"physical_replication.producer.unsafe_debug.discard_all_data.enabled",
372+
"discard all row data during cluster replication (for experimental debugging purposes only)",
373+
false,
374+
settings.WithUnsafe,
375+
)
376+
369377
func (s *eventStream) flushBatch(ctx context.Context, reason streampb.FlushReason) error {
370378
if s.seb.size == 0 {
371379
return nil
372380
}
381+
defer s.seb.reset()
382+
383+
if debugSettingDropData.Get(s.execCfg.SV()) {
384+
return nil
385+
}
386+
373387
s.seqNum++
374388
s.debug.Flushed(int64(s.seb.size), reason, s.seqNum)
375389

376-
defer s.seb.reset()
377-
378390
return s.sendFlush(ctx, &streampb.StreamEvent{StreamSeq: s.seqNum, Batch: &s.seb.batch})
379391
}
380392
func (s *eventStream) sendFlush(ctx context.Context, event *streampb.StreamEvent) error {

pkg/kv/bulk/sst_batcher.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,14 @@ func (b *SSTBatcher) syncFlush() error {
636636
return flushErr
637637
}
638638

639+
var debugDropSSTOnFlush = settings.RegisterBoolSetting(
640+
settings.ApplicationLevel,
641+
"bulkio.ingest.unsafe_debug.drop_sst_on_flush.enabled",
642+
"if set, the SSTBatcher will simply discard data instead of flushing it (destroys data; for performance debugging experiments only)",
643+
false,
644+
settings.WithUnsafe,
645+
)
646+
639647
// startFlush starts a flush of the current batch. If it encounters any errors
640648
// the errors are reported by the call to `syncFlush`.
641649
//
@@ -825,9 +833,13 @@ func (b *SSTBatcher) startFlush(ctx context.Context, reason int) {
825833
b.asyncAddSSTs.GoCtx(func(ctx context.Context) error {
826834
defer res.Release()
827835
defer b.mem.Shrink(ctx, reserved)
828-
results, err := b.adder.AddSSTable(ctx, batchTS, start, end, data, mvccStats, performanceStats)
829-
if err != nil {
830-
return err
836+
837+
var results []addSSTResult
838+
if !debugDropSSTOnFlush.Get(&b.settings.SV) {
839+
results, err = b.adder.AddSSTable(ctx, batchTS, start, end, data, mvccStats, performanceStats)
840+
if err != nil {
841+
return err
842+
}
831843
}
832844

833845
// Now that we have completed ingesting the SSTables we take a lock and

pkg/kv/client_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -874,10 +874,8 @@ func TestTxn_ReverseScan(t *testing.T) {
874874
t.Errorf("expected empty, got %v", rows)
875875
}
876876
return err
877-
}); err != nil {
878-
if err == nil {
879-
t.Errorf("expected a truncation error, got %s", err)
880-
}
877+
}); err == nil || !strings.Contains(err.Error(), "must be greater than start") {
878+
t.Errorf("expected a truncation error, got %s", err)
881879
}
882880

883881
// Try reverse scan with non-existent key.

pkg/kv/txn_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ func newTestTxnFactory(
119119
}
120120
if ba.Txn != nil && br.Txn == nil {
121121
br.Txn = ba.Txn.Clone()
122-
if pErr == nil {
123-
br.Txn.Status = status
124-
}
122+
br.Txn.Status = status
125123
// Update the MockTxnSender's proto.
126124
*txn = *br.Txn
127125
}

pkg/sql/catalog/tabledesc/validate.go

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,6 @@ func (desc *wrapper) ValidateBackReferences(
361361
}
362362
switch depDesc.DescriptorType() {
363363
case catalog.Table:
364-
// If this is a table, it may be referenced by a view, otherwise if this
365-
// is a sequence, then it may be also be referenced by a table.
366364
vea.Report(desc.validateInboundTableRef(by, vdg))
367365
case catalog.Function:
368366
// This relation may be referenced by a function.
@@ -504,49 +502,76 @@ func (desc *wrapper) validateInboundTableRef(
504502
backReferencedTable.GetName(), backReferencedTable.GetID())
505503
}
506504
if desc.IsSequence() {
507-
// The ColumnIDs field takes a different meaning when the validated
508-
// descriptor is for a sequence. In this case, they refer to the columns
509-
// in the referenced descriptor instead.
510-
for _, colID := range by.ColumnIDs {
511-
// Skip this check if the column ID is zero. This can happen due to
512-
// bugs in 20.2.
513-
//
514-
// TODO(ajwerner): Make sure that a migration in 22.2 fixes this issue.
515-
if colID == 0 {
516-
continue
517-
}
518-
col := catalog.FindColumnByID(backReferencedTable, colID)
519-
if col == nil {
520-
return errors.AssertionFailedf("depended-on-by relation %q (%d) does not have a column with ID %d",
521-
backReferencedTable.GetName(), by.ID, colID)
505+
if err := validateSequenceColumnBackrefs(desc, backReferencedTable, by); err != nil {
506+
return err
507+
}
508+
}
509+
510+
// View back-references need corresponding forward reference.
511+
if backReferencedTable.IsView() {
512+
for _, id := range backReferencedTable.TableDesc().DependsOn {
513+
if id == desc.GetID() {
514+
return nil
522515
}
523-
var found bool
524-
for i := 0; i < col.NumUsesSequences(); i++ {
525-
if col.GetUsesSequenceID(i) == desc.GetID() {
526-
found = true
527-
break
516+
}
517+
return errors.AssertionFailedf("depended-on-by view %q (%d) has no corresponding depends-on forward reference",
518+
backReferencedTable.GetName(), by.ID)
519+
}
520+
521+
// Table to table back-references must have a trigger reference.
522+
if backReferencedTable.IsTable() && desc.IsTable() {
523+
for _, trigger := range backReferencedTable.TableDesc().Triggers {
524+
for _, id := range trigger.DependsOn {
525+
if id == desc.GetID() {
526+
return nil
528527
}
529528
}
530-
if found {
531-
continue
532-
}
533-
return errors.AssertionFailedf(
534-
"depended-on-by relation %q (%d) has no reference to this sequence in column %q (%d)",
535-
backReferencedTable.GetName(), by.ID, col.GetName(), col.GetID())
536529
}
537-
}
538530

539-
// View back-references need corresponding forward reference.
540-
if !backReferencedTable.IsView() {
541-
return nil
531+
// No valid forward reference found to justify the backref.
532+
return errors.AssertionFailedf(
533+
"table %q (%d) does not have a forward reference to descriptor %q (%d)",
534+
backReferencedTable.GetName(), by.ID, desc.GetName(), desc.GetID())
542535
}
543-
for _, id := range backReferencedTable.TableDesc().DependsOn {
544-
if id == desc.GetID() {
545-
return nil
536+
return nil
537+
}
538+
539+
func validateSequenceColumnBackrefs(
540+
seq catalog.Descriptor,
541+
backReferencedTable catalog.TableDescriptor,
542+
by descpb.TableDescriptor_Reference,
543+
) error {
544+
// The ColumnIDs field takes a different meaning when the validated
545+
// descriptor is for a sequence. In this case, they refer to the columns
546+
// in the referenced descriptor instead.
547+
for _, colID := range by.ColumnIDs {
548+
// Skip this check if the column ID is zero. This can happen due to
549+
// bugs in 20.2.
550+
//
551+
// TODO(ajwerner): Make sure that a migration in 22.2 fixes this issue.
552+
if colID == 0 {
553+
continue
546554
}
555+
col := catalog.FindColumnByID(backReferencedTable, colID)
556+
if col == nil {
557+
return errors.AssertionFailedf("depended-on-by relation %q (%d) does not have a column with ID %d",
558+
backReferencedTable.GetName(), by.ID, colID)
559+
}
560+
var found bool
561+
for i := 0; i < col.NumUsesSequences(); i++ {
562+
if col.GetUsesSequenceID(i) == seq.GetID() {
563+
found = true
564+
break
565+
}
566+
}
567+
if found {
568+
continue
569+
}
570+
return errors.AssertionFailedf(
571+
"depended-on-by relation %q (%d) has no reference to this sequence in column %q (%d)",
572+
backReferencedTable.GetName(), by.ID, col.GetName(), col.GetID())
547573
}
548-
return errors.AssertionFailedf("depended-on-by view %q (%d) has no corresponding depends-on forward reference",
549-
backReferencedTable.GetName(), by.ID)
574+
return nil
550575
}
551576

552577
// validateFK asserts that references to desc from inbound and outbound FKs are

0 commit comments

Comments
 (0)