Skip to content

Commit 7c13dda

Browse files
craig[bot]rafiss
andcommitted
Merge #149153
149153: sql,eventpb: log structured event for REFRESH MATERIALIZED VIEW r=rafiss a=rafiss ### sql: add backfill validation for entries written by BulkAdder This patch fixes the logic for counting the number of entries written by the BulkAdder during the backfillQueryIntoTable operation, which is used by CREATE TABLE AS, CREATE MATERIALIZED VIEW, and REFRESH MATERIALIZED VIEW. We now use this count when validating the backfill, as an additional check. ### sql,eventpb: log structured event for REFRESH MATERIALIZED VIEW fixes #144863 Release note (ops change): A structured event is now logged in the SQL_SCHEMA channgel when the REFRESH MATERIALIZED VIEW statement is executed. Co-authored-by: Rafi Shamim <[email protected]>
2 parents dc2191b + d3ffe1e commit 7c13dda

File tree

7 files changed

+112
-20
lines changed

7 files changed

+112
-20
lines changed

docs/generated/eventlog.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,6 +1871,29 @@ initiated schema change rollback has completed.
18711871
| `DescriptorID` | | no |
18721872

18731873

1874+
#### Common fields
1875+
1876+
| Field | Description | Sensitive |
1877+
|--|--|--|
1878+
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
1879+
| `EventType` | The type of the event. | no |
1880+
| `Statement` | A normalized copy of the SQL statement that triggered the event. The statement string contains a mix of sensitive and non-sensitive details (it is redactable). | partially |
1881+
| `Tag` | The statement tag. This is separate from the statement string, since the statement string can contain sensitive information. The tag is guaranteed not to. | no |
1882+
| `User` | The user account that triggered the event. The special usernames `root` and `node` are not considered sensitive. | depends |
1883+
| `DescriptorID` | The primary object descriptor affected by the operation. Set to zero for operations that don't affect descriptors. | no |
1884+
| `ApplicationName` | The application name for the session where the event was emitted. This is included in the event to ease filtering of logging output by application. | no |
1885+
| `PlaceholderValues` | The mapping of SQL placeholders to their values, for prepared statements. | yes |
1886+
1887+
### `refresh_materialized_view`
1888+
1889+
An event of type `refresh_materialized_view` is recorded when a materialized view is refreshed.
1890+
1891+
1892+
| Field | Description | Sensitive |
1893+
|--|--|--|
1894+
| `ViewName` | The name of the materialized view being refreshed. | no |
1895+
1896+
18741897
#### Common fields
18751898

18761899
| Field | Description | Sensitive |

pkg/sql/refresh_materialized_view.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
1616
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1717
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
18+
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
1819
)
1920

2021
type refreshMaterializedViewNode struct {
@@ -120,6 +121,15 @@ func (n *refreshMaterializedViewNode) startExec(params runParams) error {
120121
}
121122
desc.AddMaterializedViewRefreshMutation(refreshProto)
122123

124+
// Log the refresh materialized view event.
125+
if err := params.p.logEvent(params.ctx,
126+
desc.ID,
127+
&eventpb.RefreshMaterializedView{
128+
ViewName: params.p.ResolvedName(n.n.Name).FQString(),
129+
}); err != nil {
130+
return err
131+
}
132+
123133
return params.p.writeSchemaChange(
124134
params.ctx,
125135
desc,

pkg/sql/rowexec/bulk_row_writer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ func (sp *bulkRowWriter) Start(ctx context.Context) {
7272
ctx = sp.StartInternal(ctx, "bulkRowWriter")
7373
sp.input.Start(ctx)
7474
err := sp.work(ctx)
75-
sp.MoveToDraining(err)
75+
if err != nil {
76+
sp.MoveToDraining(err)
77+
}
7678
}
7779

7880
// Next is part of the RowSource interface.

pkg/sql/schema_changer.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/cockroachdb/cockroach/pkg/base"
17+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1718
"github.com/cockroachdb/cockroach/pkg/config"
1819
"github.com/cockroachdb/cockroach/pkg/jobs"
1920
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -311,13 +312,21 @@ const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill"
311312
// validateBackfillQueryIntoTable validates that source query matches the contents of
312313
// a backfilled table, when executing the query at queryTS.
313314
func (sc *SchemaChanger) validateBackfillQueryIntoTable(
314-
ctx context.Context, table catalog.TableDescriptor, queryTS hlc.Timestamp, query string,
315+
ctx context.Context,
316+
table catalog.TableDescriptor,
317+
entryCountWrittenToPrimaryIdx int64,
318+
queryTS hlc.Timestamp,
319+
query string,
320+
skipAOSTValidation bool,
315321
) error {
316-
var entryCount int64
322+
var aostEntryCount int64
317323
sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "validateBackfillQueryIntoTable")
318324
sd.SessionData = *sc.sessionData
319325
// First get the expected row count for the source query at the target timestamp.
320326
if err := sc.execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
327+
if skipAOSTValidation {
328+
return nil
329+
}
321330
parsedQuery, err := parser.ParseOne(query)
322331
if err != nil {
323332
return err
@@ -347,9 +356,11 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
347356
if err != nil {
348357
return err
349358
}
350-
entryCount = int64(tree.MustBeDInt(row[0]))
359+
aostEntryCount = int64(tree.MustBeDInt(row[0]))
351360
return nil
352-
}, isql.WithSessionData(sd)); err != nil {
361+
}, isql.WithSessionData(sd),
362+
isql.WithPriority(admissionpb.BulkNormalPri),
363+
); err != nil {
353364
return err
354365
}
355366
// Next run validation on table that was populated using count queries.
@@ -361,7 +372,7 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
361372
// counts match.
362373
mut := builder.BuildExistingMutable().(*tabledesc.Mutable)
363374
mut.SetPublic()
364-
count, err := countIndexRowsAndMaybeCheckUniqueness(ctx, mut, index, false,
375+
newTblEntryCount, err := countIndexRowsAndMaybeCheckUniqueness(ctx, mut, index, false,
365376
descs.NewHistoricalInternalExecTxnRunner(now, func(ctx context.Context, fn descs.InternalExecFn) error {
366377
return sc.execCfg.InternalDB.DescsTxn(ctx, func(
367378
ctx context.Context, txn descs.Txn,
@@ -379,13 +390,21 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
379390
// Testing knob that allows us to manipulate counts to fail
380391
// the validation.
381392
if sc.testingKnobs.RunDuringQueryBackfillValidation != nil {
382-
count, err = sc.testingKnobs.RunDuringQueryBackfillValidation(entryCount, count)
393+
newTblEntryCount, err = sc.testingKnobs.RunDuringQueryBackfillValidation(aostEntryCount, newTblEntryCount)
383394
if err != nil {
384395
return err
385396
}
386397
}
387-
if count != entryCount {
388-
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), entryCount, count)
398+
if entryCountWrittenToPrimaryIdx > 0 {
399+
if newTblEntryCount != entryCountWrittenToPrimaryIdx {
400+
return errors.AssertionFailedf(
401+
"backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)",
402+
index.GetName(), entryCountWrittenToPrimaryIdx, newTblEntryCount,
403+
)
404+
}
405+
}
406+
if !skipAOSTValidation && newTblEntryCount != aostEntryCount {
407+
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), aostEntryCount, newTblEntryCount)
389408
}
390409
return nil
391410
}
@@ -417,9 +436,12 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
417436
}
418437
}()
419438
validationTime := ts
420-
var skipValidation bool
421-
// Get the expected entry count against the source query.
439+
var skipAOSTValidation bool
440+
bulkSummary := kvpb.BulkOpSummary{}
441+
422442
err = sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
443+
// Clear out the previous summary if the transaction gets retried.
444+
bulkSummary = kvpb.BulkOpSummary{}
423445
defer func() {
424446
isTxnRetry = true
425447
}()
@@ -515,15 +537,12 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
515537
return err
516538
}
517539
}
518-
res := kvpb.BulkOpSummary{}
519540
rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
520-
// TODO(adityamaru): Use the BulkOpSummary for either telemetry or to
521-
// return to user.
522541
var counts kvpb.BulkOpSummary
523542
if err := protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)), &counts); err != nil {
524543
return err
525544
}
526-
res.Add(counts)
545+
bulkSummary.Add(counts)
527546
return nil
528547
})
529548
recv := MakeDistSQLReceiver(
@@ -590,7 +609,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
590609
// since our AOST query might be reading data not in KV.
591610
for _, tbl := range localPlanner.curPlan.mem.Metadata().AllTables() {
592611
if tbl.Table.IsVirtualTable() {
593-
skipValidation = true
612+
skipAOSTValidation = true
594613
}
595614
}
596615
return nil
@@ -608,12 +627,19 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
608627
return err
609628
}
610629

630+
// Count how many keys were written to the primary index, as reported by the
631+
// BulkAdder. We need a version gate since pre-25.4 clusters did not count
632+
// correctly.
633+
entriesWrittenToPrimaryIdx := int64(0)
634+
if sc.execCfg.Settings.Version.IsActive(ctx, clusterversion.V25_4) {
635+
key := kvpb.BulkOpSummaryID(uint64(table.GetID()), uint64(table.GetPrimaryIndex().GetID()))
636+
entriesWrittenToPrimaryIdx = bulkSummary.EntryCounts[key]
637+
}
638+
611639
// Validation will be skipped if the query is not AOST safe.
612640
// (i.e. reading non-KV data via CRDB internal)
613-
if !skipValidation {
614-
if err := sc.validateBackfillQueryIntoTable(ctx, table, validationTime, query); err != nil {
615-
return err
616-
}
641+
if err := sc.validateBackfillQueryIntoTable(ctx, table, entriesWrittenToPrimaryIdx, validationTime, query, skipAOSTValidation); err != nil {
642+
return err
617643
}
618644
return nil
619645
}

pkg/util/log/eventpb/ddl_events.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,3 +643,11 @@ message DropPolicy {
643643
string policy_name = 4 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
644644
}
645645

646+
// RefreshMaterializedView is recorded when a materialized view is refreshed.
647+
message RefreshMaterializedView {
648+
CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
649+
CommonSQLEventDetails sql = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
650+
// The name of the materialized view being refreshed.
651+
string view_name = 3 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
652+
}
653+

pkg/util/log/eventpb/eventlog_channels_generated.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/util/log/eventpb/json_encode_generated.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)