@@ -93,7 +93,11 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
93
93
}
94
94
g := ctxgroup .WithContext (ctx )
95
95
g .GoCtx (feed .addEventsToBuffer )
96
- var rfOpts []kvcoord.RangeFeedOption
96
+
97
+ // Bulk delivery is an optimization that decreases rangefeed overhead during
98
+ // catchup scans. It results in the emission of BulkEvents instead of
99
+ // individual events where possible.
100
+ rfOpts := []kvcoord.RangeFeedOption {kvcoord .WithBulkDelivery ()}
97
101
if cfg .WithDiff {
98
102
rfOpts = append (rfOpts , kvcoord .WithDiff ())
99
103
}
@@ -132,6 +136,78 @@ func quantizeTS(ts hlc.Timestamp, granularity time.Duration) hlc.Timestamp {
132
136
}
133
137
}
134
138
139
+ func (p * rangefeed ) handleRangefeedEvent (ctx context.Context , e * kvpb.RangeFeedEvent ) error {
140
+ switch t := e .GetValue ().(type ) {
141
+ case * kvpb.RangeFeedValue :
142
+ if p .cfg .Knobs .OnRangeFeedValue != nil {
143
+ if err := p .cfg .Knobs .OnRangeFeedValue (); err != nil {
144
+ return err
145
+ }
146
+ }
147
+ stop := p .st .RangefeedBufferValue .Start ()
148
+ if err := p .memBuf .Add (
149
+ ctx , kvevent .MakeKVEvent (e ),
150
+ ); err != nil {
151
+ return err
152
+ }
153
+ stop ()
154
+ case * kvpb.RangeFeedCheckpoint :
155
+ ev := e .ShallowCopy ()
156
+ ev .Checkpoint .ResolvedTS = quantizeTS (ev .Checkpoint .ResolvedTS , p .cfg .WithFrontierQuantize )
157
+ if resolvedTs := ev .Checkpoint .ResolvedTS ; ! resolvedTs .IsEmpty () && resolvedTs .Less (p .cfg .Frontier ) {
158
+ // RangeFeed happily forwards any closed timestamps it receives as
159
+ // soon as there are no outstanding intents under them.
160
+ // Changefeeds don't care about these at all, so throw them out.
161
+ return nil
162
+ }
163
+ if p .knobs .ShouldSkipCheckpoint != nil && p .knobs .ShouldSkipCheckpoint (t ) {
164
+ return nil
165
+ }
166
+ stop := p .st .RangefeedBufferCheckpoint .Start ()
167
+ if err := p .memBuf .Add (
168
+ ctx , kvevent .MakeResolvedEvent (ev , jobspb .ResolvedSpan_NONE ),
169
+ ); err != nil {
170
+ return err
171
+ }
172
+ stop ()
173
+ case * kvpb.RangeFeedSSTable :
174
+ // For now, we just error on SST ingestion, since we currently don't
175
+ // expect SST ingestion into spans with active changefeeds.
176
+ return errors .Errorf ("unexpected SST ingestion: %v" , t )
177
+ case * kvpb.RangeFeedBulkEvents :
178
+ // TODO(#138346): We can handle these more gracefully once we
179
+ // migrate to the new rangefeed client. Until then, this is
180
+ // still an improvement over not using these.
181
+ for _ , e := range t .Events {
182
+ if err := p .handleRangefeedEvent (ctx , e ); err != nil {
183
+ return err
184
+ }
185
+ }
186
+ case * kvpb.RangeFeedDeleteRange :
187
+ // For now, we just ignore on MVCC range tombstones. These are currently
188
+ // only expected to be used by schema GC and IMPORT INTO, and such spans
189
+ // should not have active changefeeds across them, at least at the times
190
+ // of interest. A case where one will show up in a changefeed is when
191
+ // the primary index changes while we're watching it and then the old
192
+ // primary index is dropped. In this case, we'll get a schema event to
193
+ // restart into the new primary index, but the DeleteRange may come
194
+ // through before the schema event.
195
+ //
196
+ // TODO(erikgrinaker): Write an end-to-end test which verifies that an
197
+ // IMPORT INTO which gets rolled back using MVCC range tombstones will
198
+ // not be visible to a changefeed, neither when it was started before
199
+ // the import or when resuming from a timestamp before the import. The
200
+ // table decriptor should be marked as offline during the import, and
201
+ // catchup scans should detect that this happened and prevent reading
202
+ // anything in that timespan. See:
203
+ // https://github.com/cockroachdb/cockroach/issues/70433
204
+ return nil
205
+ default :
206
+ return errors .Errorf ("unexpected RangeFeedEvent variant %v" , t )
207
+ }
208
+ return nil
209
+ }
210
+
135
211
// addEventsToBuffer consumes rangefeed events from `p.eventCh`, transforms
136
212
// them to kvevent.Event's, and pushes them into `p.memBuf`.
137
213
func (p * rangefeed ) addEventsToBuffer (ctx context.Context ) error {
@@ -141,69 +217,8 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
141
217
for {
142
218
select {
143
219
case e := <- p .eventCh :
144
- switch t := e .GetValue ().(type ) {
145
- case * kvpb.RangeFeedValue :
146
- if p .cfg .Knobs .OnRangeFeedValue != nil {
147
- if err := p .cfg .Knobs .OnRangeFeedValue (); err != nil {
148
- return err
149
- }
150
- }
151
- stop := p .st .RangefeedBufferValue .Start ()
152
- if err := p .memBuf .Add (
153
- ctx , kvevent .MakeKVEvent (e .RangeFeedEvent ),
154
- ); err != nil {
155
- return err
156
- }
157
- stop ()
158
- case * kvpb.RangeFeedCheckpoint :
159
- ev := e .ShallowCopy ()
160
- ev .Checkpoint .ResolvedTS = quantizeTS (ev .Checkpoint .ResolvedTS , p .cfg .WithFrontierQuantize )
161
- if resolvedTs := ev .Checkpoint .ResolvedTS ; ! resolvedTs .IsEmpty () && resolvedTs .Less (p .cfg .Frontier ) {
162
- // RangeFeed happily forwards any closed timestamps it receives as
163
- // soon as there are no outstanding intents under them.
164
- // Changefeeds don't care about these at all, so throw them out.
165
- continue
166
- }
167
- if p .knobs .ShouldSkipCheckpoint != nil && p .knobs .ShouldSkipCheckpoint (t ) {
168
- continue
169
- }
170
- stop := p .st .RangefeedBufferCheckpoint .Start ()
171
- if err := p .memBuf .Add (
172
- ctx , kvevent .MakeResolvedEvent (ev , jobspb .ResolvedSpan_NONE ),
173
- ); err != nil {
174
- return err
175
- }
176
- stop ()
177
- case * kvpb.RangeFeedSSTable :
178
- // For now, we just error on SST ingestion, since we currently don't
179
- // expect SST ingestion into spans with active changefeeds.
180
- return errors .Errorf ("unexpected SST ingestion: %v" , t )
181
- case * kvpb.RangeFeedBulkEvents :
182
- // Should be disabled so this is unreachable.
183
- return errors .Errorf ("unexpected bulk delivery: %v" , t )
184
-
185
- case * kvpb.RangeFeedDeleteRange :
186
- // For now, we just ignore on MVCC range tombstones. These are currently
187
- // only expected to be used by schema GC and IMPORT INTO, and such spans
188
- // should not have active changefeeds across them, at least at the times
189
- // of interest. A case where one will show up in a changefeed is when
190
- // the primary index changes while we're watching it and then the old
191
- // primary index is dropped. In this case, we'll get a schema event to
192
- // restart into the new primary index, but the DeleteRange may come
193
- // through before the schema event.
194
- //
195
- // TODO(erikgrinaker): Write an end-to-end test which verifies that an
196
- // IMPORT INTO which gets rolled back using MVCC range tombstones will
197
- // not be visible to a changefeed, neither when it was started before
198
- // the import or when resuming from a timestamp before the import. The
199
- // table decriptor should be marked as offline during the import, and
200
- // catchup scans should detect that this happened and prevent reading
201
- // anything in that timespan. See:
202
- // https://github.com/cockroachdb/cockroach/issues/70433
203
- continue
204
-
205
- default :
206
- return errors .Errorf ("unexpected RangeFeedEvent variant %v" , t )
220
+ if err := p .handleRangefeedEvent (ctx , e .RangeFeedEvent ); err != nil {
221
+ return err
207
222
}
208
223
case <- ctx .Done ():
209
224
return ctx .Err ()
0 commit comments