@@ -13,21 +13,25 @@ import (
13
13
14
14
"github.com/cockroachdb/cockroach/pkg/base"
15
15
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
16
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
16
17
"github.com/cockroachdb/cockroach/pkg/keys"
17
18
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
18
19
"github.com/cockroachdb/cockroach/pkg/roachpb"
19
20
"github.com/cockroachdb/cockroach/pkg/sql"
20
21
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
21
22
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
22
23
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
24
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
23
25
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
24
26
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
25
27
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
28
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
26
29
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
27
30
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
28
31
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
29
32
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
30
33
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
34
+ "github.com/cockroachdb/cockroach/pkg/testutils/skip"
31
35
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
32
36
"github.com/cockroachdb/cockroach/pkg/util/hlc"
33
37
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -116,7 +120,7 @@ func (b *EventBuilder) deleteEvent(
116
120
// newKvBatchHandler creates a new batch handler for testing.
117
121
func newKvBatchHandler (
118
122
t * testing.T , s serverutils.ApplicationLayerInterface , tableName string ,
119
- ) (* kvRowProcessor , catalog.TableDescriptor ) {
123
+ ) (BatchHandler , catalog.TableDescriptor ) {
120
124
ctx := context .Background ()
121
125
desc := cdctest .GetHydratedTableDescriptor (t , s .ExecutorConfig (), tree .Name (tableName ))
122
126
sd := sql .NewInternalSessionData (ctx , s .ClusterSettings (), "" /* opName */ )
@@ -143,6 +147,38 @@ func newKvBatchHandler(
143
147
return handler , desc
144
148
}
145
149
150
+ // newSqlBatchHandler creates a new SQL-based batch handler for testing.
151
+ func newSqlBatchHandler (
152
+ t * testing.T , s serverutils.ApplicationLayerInterface , tableName string ,
153
+ ) (BatchHandler , catalog.TableDescriptor ) {
154
+ ctx := context .Background ()
155
+ desc := cdctest .GetHydratedTableDescriptor (t , s .ExecutorConfig (), tree .Name (tableName ))
156
+ sd := sql .NewInternalSessionData (ctx , s .ClusterSettings (), "" /* opName */ )
157
+ codec := s .Codec ()
158
+ handler , err := makeSQLProcessor (
159
+ ctx ,
160
+ s .ClusterSettings (),
161
+ map [descpb.ID ]sqlProcessorTableConfig {
162
+ desc .GetID (): {
163
+ srcDesc : desc ,
164
+ },
165
+ },
166
+ 0 , // jobID
167
+ s .InternalDB ().(descs.DB ),
168
+ s .InternalDB ().(isql.DB ).Executor (isql .WithSessionData (sd )),
169
+ sd ,
170
+ execinfrapb.LogicalReplicationWriterSpec {
171
+ Mode : jobspb .LogicalReplicationDetails_Immediate ,
172
+ },
173
+ codec ,
174
+ s .LeaseManager ().(* lease.Manager ),
175
+ )
176
+ require .NoError (t , err )
177
+ return handler , desc
178
+ }
179
+
180
+ type batchHandlerFactory func (t * testing.T , s serverutils.ApplicationLayerInterface , tableName string ) (BatchHandler , catalog.TableDescriptor )
181
+
146
182
// addAndRemoveColumn is a metamorphic test option that randomly decides whether
147
183
// to add and remove a column to trigger a rebuild of the primary key.
148
184
var addAndRemoveColumn = metamorphic .ConstantWithTestBool ("batch-handler-exhaustive-rebuild-primary-index" , false )
@@ -151,10 +187,34 @@ var addAndRemoveColumn = metamorphic.ConstantWithTestBool("batch-handler-exhaust
151
187
// to add a unique constraint to the table.
152
188
var uniqueConstraint = metamorphic .ConstantWithTestBool ("batch-handler-exhaustive-unique-constraint" , false )
153
189
154
- func TestBatchHandlerExhaustive (t * testing.T ) {
190
+ func TestBatchHandlerExhaustiveKV (t * testing.T ) {
191
+ defer leaktest .AfterTest (t )()
192
+ defer log .Scope (t ).Close (t )
193
+
194
+ testBatchHandlerExhaustive (t , newKvBatchHandler )
195
+ }
196
+
197
+ func TestBatchHandlerExhaustiveSQL (t * testing.T ) {
198
+ defer leaktest .AfterTest (t )()
199
+ defer log .Scope (t ).Close (t )
200
+
201
+ skip .WithIssue (t , 146117 )
202
+
203
+ testBatchHandlerExhaustive (t , newSqlBatchHandler )
204
+ }
205
+
206
+ func TestBatchHandlerExhaustiveCrud (t * testing.T ) {
155
207
defer leaktest .AfterTest (t )()
156
208
defer log .Scope (t ).Close (t )
157
209
210
+ skip .WithIssue (t , 146117 )
211
+
212
+ testBatchHandlerExhaustive (t , newCrudBatchHandler )
213
+ }
214
+
215
+ func testBatchHandlerExhaustive (t * testing.T , factory batchHandlerFactory ) {
216
+ t .Helper ()
217
+
158
218
// This test is an "exhaustive" test of the batch handler. It tries to test
159
219
// cross product of every possible (replication event type, local value,
160
220
// previous value, lww win).
@@ -188,7 +248,7 @@ func TestBatchHandlerExhaustive(t *testing.T) {
188
248
runner .Exec (t , `ALTER TABLE test_table DROP COLUMN temp_col` )
189
249
}
190
250
191
- handler , desc := newKvBatchHandler (t , s , "test_table" )
251
+ handler , desc := factory (t , s , "test_table" )
192
252
defer handler .ReleaseLeases (ctx )
193
253
194
254
// TODO(jeffswenson): test the other handler types.
0 commit comments