66package logical
77
88import (
9+ "context"
10+
11+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
12+ "github.com/cockroachdb/cockroach/pkg/repstream/streampb"
13+ "github.com/cockroachdb/cockroach/pkg/roachpb"
14+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
915 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
16+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
1017 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
18+ "github.com/cockroachdb/cockroach/pkg/sql/types"
1119 "github.com/cockroachdb/cockroach/pkg/util/hlc"
20+ "github.com/cockroachdb/errors"
1221)
1322
23+ // eventDecoder takes a KV from the source cluster and decodes it into datums
24+ // that are appropriate for the destination table.
25+ type eventDecoder struct {
26+ decoder cdcevent.Decoder
27+ srcToDest map [descpb.ID ]destinationTable
28+
29+ // TODO(jeffswenson): clean this interface up. There's a problem with
30+ // layering that requires the event decoder to know about the most recent
31+ // row. If a batch fails to process, its broken up into batches of size 1 and
32+ // those are retried until they succeed or are DLQ'd. The batch handler is
33+ // responsible for decoding rows, but the distsql processor is responsible
34+ // for calling the batch handler and adding rows to the DLQ.
35+ lastRow cdcevent.Row
36+ }
37+
1438// decodedEvent is constructed from a replication stream event. The replication
1539// stream event is encoded using the source table descriptor. The decoded event must
1640// contain datums that are compatible with the destination table descriptor.
@@ -37,3 +61,124 @@ type decodedEvent struct {
3761 // prevRow may still lose LWW if there is a recent tombstone.
3862 prevRow tree.Datums
3963}
64+
65+ func newEventDecoder (
66+ ctx context.Context ,
67+ descriptors descs.DB ,
68+ settings * cluster.Settings ,
69+ procConfigByDestID map [descpb.ID ]sqlProcessorTableConfig ,
70+ ) (* eventDecoder , error ) {
71+ srcToDest := make (map [descpb.ID ]destinationTable , len (procConfigByDestID ))
72+ err := descriptors .DescsTxn (ctx , func (ctx context.Context , txn descs.Txn ) error {
73+ for dstID , s := range procConfigByDestID {
74+ descriptor , err := txn .Descriptors ().GetLeasedImmutableTableByID (ctx , txn .KV (), dstID )
75+ if err != nil {
76+ return err
77+ }
78+
79+ columns := getPhysicalColumnsSchema (descriptor )
80+ columnNames := make ([]string , 0 , len (columns ))
81+ for _ , column := range columns {
82+ columnNames = append (columnNames , column .column .GetName ())
83+ }
84+
85+ srcToDest [s .srcDesc .GetID ()] = destinationTable {
86+ id : dstID ,
87+ columns : columnNames ,
88+ }
89+ }
90+ return nil
91+ })
92+ if err != nil {
93+ return nil , err
94+ }
95+
96+ decoder , err := newCdcEventDecoder (ctx , procConfigByDestID , settings )
97+ if err != nil {
98+ return nil , err
99+ }
100+
101+ return & eventDecoder {
102+ decoder : decoder ,
103+ srcToDest : srcToDest ,
104+ }, nil
105+ }
106+
107+ func (d * eventDecoder ) decodeEvent (
108+ ctx context.Context , event streampb.StreamEvent_KV ,
109+ ) (decodedEvent , error ) {
110+ decodedRow , err := d .decoder .DecodeKV (ctx , event .KeyValue , cdcevent .CurrentRow , event .KeyValue .Value .Timestamp , false )
111+ if err != nil {
112+ return decodedEvent {}, err
113+ }
114+
115+ dstTable , ok := d .srcToDest [decodedRow .TableID ]
116+ if ! ok {
117+ return decodedEvent {}, errors .AssertionFailedf ("table %d not found" , decodedRow .TableID )
118+ }
119+
120+ row := make (tree.Datums , 0 , len (dstTable .columns ))
121+ row , err = appendDatums (row , decodedRow , dstTable .columns )
122+ if err != nil {
123+ return decodedEvent {}, err
124+ }
125+ d .lastRow = decodedRow
126+
127+ var prevKV roachpb.KeyValue
128+ prevKV .Key = event .KeyValue .Key
129+ prevKV .Value = event .PrevValue
130+
131+ decodedPrevRow , err := d .decoder .DecodeKV (ctx , prevKV , cdcevent .PrevRow , event .PrevValue .Timestamp , false )
132+ if err != nil {
133+ return decodedEvent {}, err
134+ }
135+
136+ prevRow := make (tree.Datums , 0 , len (dstTable .columns ))
137+ prevRow , err = appendDatums (prevRow , decodedPrevRow , dstTable .columns )
138+ if err != nil {
139+ return decodedEvent {}, err
140+ }
141+
142+ return decodedEvent {
143+ dstDescID : dstTable .id ,
144+ isDelete : decodedRow .IsDeleted (),
145+ originTimestamp : event .KeyValue .Value .Timestamp ,
146+ row : row ,
147+ prevRow : prevRow ,
148+ }, nil
149+ }
150+
151+ // appendDatums appends datums for the specified column names from the cdcevent.Row
152+ // to the datums slice and returns the updated slice.
153+ func appendDatums (datums tree.Datums , row cdcevent.Row , columnNames []string ) (tree.Datums , error ) {
154+ it , err := row .DatumsNamed (columnNames )
155+ if err != nil {
156+ return nil , err
157+ }
158+
159+ if err := it .Datum (func (d tree.Datum , col cdcevent.ResultColumn ) error {
160+ if dEnum , ok := d .(* tree.DEnum ); ok {
161+ // Override the type to Unknown to avoid a mismatched type OID error
162+ // during execution. Note that Unknown is the type used by default
163+ // when a SQL statement is executed without type hints.
164+ //
165+ // TODO(jeffswenson): this feels like the wrong place to do this,
166+ // but its inspired by the implementation in queryBuilder.AddRow.
167+ //
168+ // Really we should be mapping from the source datum type to the
169+ // destination datum type.
170+ dEnum .EnumTyp = types .Unknown
171+ }
172+ datums = append (datums , d )
173+ return nil
174+ }); err != nil {
175+ return nil , err
176+ }
177+
178+ return datums , nil
179+ }
180+
181+ type destinationTable struct {
182+ id descpb.ID
183+ columns []string
184+ }
0 commit comments