@@ -21,14 +21,20 @@ func tableUUIDSuffix() string {
2121
2222// nolint:revive
2323func (s * WriterTestSuite ) migrate (ctx context.Context , target * schema.Table , source * schema.Table , supportsSafeMigrate bool , writeOptionMigrateForce bool ) error {
24- const rowsPerRecord = 10
24+ var rowsPerRecord = int ( 10 )
2525 if err := s .plugin .writeOne (ctx , & message.WriteMigrateTable {
2626 Table : source ,
2727 MigrateForce : writeOptionMigrateForce ,
2828 }); err != nil {
2929 return fmt .Errorf ("failed to create table: %w" , err )
3030 }
3131
32+ records , err := s .plugin .readAll (ctx , source )
33+ if err != nil {
34+ return fmt .Errorf ("failed to read initial records: %w" , err )
35+ }
36+ initialItems := int (TotalRows (records ))
37+
3238 sourceName := target .Name
3339 syncTime := time .Now ().UTC ().Round (1 * time .Second )
3440 opts := schema.GenTestDataOptions {
@@ -37,7 +43,10 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
3743 MaxRows : rowsPerRecord ,
3844 TimePrecision : s .genDatOptions .TimePrecision ,
3945 }
40- tg := schema .NewTestDataGenerator ()
46+ // Test Generator should be initialized with the current number of items in the destination
47+ // this allows us to have multi-pass tests that ensure the migrations are stable
48+ // create--> write --> migrate --> write -->migrate -->write-->migrate -->write
49+ tg := schema .NewTestDataGenerator (uint64 (initialItems ))
4150 resource1 := tg .Generate (source , opts )
4251 if err := s .plugin .writeOne (ctx , & message.WriteInsert {
4352 Record : resource1 ,
@@ -46,15 +55,18 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
4655 }
4756 resource1 = s .handleNulls (resource1 ) // we process nulls after writing
4857
49- records , err : = s .plugin .readAll (ctx , source )
58+ records , err = s .plugin .readAll (ctx , source )
5059 if err != nil {
5160 return fmt .Errorf ("failed to sync: %w" , err )
5261 }
62+ sortRecords (source , records , "id" )
63+ records = records [initialItems :]
64+
5365 totalItems := TotalRows (records )
54- if totalItems != rowsPerRecord {
66+ if totalItems != int64 ( rowsPerRecord ) {
5567 return fmt .Errorf ("expected items: %d, got: %d" , rowsPerRecord , totalItems )
5668 }
57- sortRecords ( source , records , "id" )
69+
5870 if diff := RecordsDiff (source .ToArrowSchema (), records , []arrow.Record {resource1 }); diff != "" {
5971 return fmt .Errorf ("first record differs from expectation: %s" , diff )
6072 }
@@ -79,14 +91,14 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
7991 return fmt .Errorf ("failed to readAll: %w" , err )
8092 }
8193 sortRecords (target , records , "id" )
82-
94+ records = records [ initialItems :]
8395 lastRow := resource2 .NewSlice (resource2 .NumRows ()- 1 , resource2 .NumRows ())
8496 // if force migration is not required, we don't expect any items to be dropped (so there should be 2 items)
8597 if ! writeOptionMigrateForce || supportsSafeMigrate {
86- if err := expectRows (target .ToArrowSchema (), records , 2 * rowsPerRecord , lastRow ); err != nil {
87- if writeOptionMigrateForce && TotalRows (records ) == rowsPerRecord {
98+ if err := expectRows (target .ToArrowSchema (), records , 2 * int64 ( rowsPerRecord ) , lastRow ); err != nil {
99+ if writeOptionMigrateForce && TotalRows (records ) == int64 ( rowsPerRecord ) {
88100 // if force migration is required, we can also expect 1 item to be dropped
89- return expectRows (target .ToArrowSchema (), records , rowsPerRecord , lastRow )
101+ return expectRows (target .ToArrowSchema (), records , int64 ( rowsPerRecord ) , lastRow )
90102 }
91103
92104 return err
@@ -95,7 +107,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
95107 return nil
96108 }
97109
98- return expectRows (target .ToArrowSchema (), records , rowsPerRecord , lastRow )
110+ return expectRows (target .ToArrowSchema (), records , int64 ( rowsPerRecord ) , lastRow )
99111}
100112
101113// nolint:revive
@@ -112,7 +124,7 @@ func (s *WriterTestSuite) testMigrate(
112124 if ! forceMigrate && ! s .tests .SafeMigrations .AddColumn {
113125 t .Skip ("skipping test: add_column" )
114126 }
115- tableName := "add_column " + suffix + "_" + tableUUIDSuffix ()
127+ tableName := "cq_add_column " + suffix + "_" + tableUUIDSuffix ()
116128 source := & schema.Table {
117129 Name : tableName ,
118130 Columns : schema.ColumnList {
@@ -129,16 +141,17 @@ func (s *WriterTestSuite) testMigrate(
129141 {Name : "bool" , Type : arrow .FixedWidthTypes .Boolean },
130142 },
131143 }
132- if err := s .migrate (ctx , target , source , s .tests .SafeMigrations .AddColumn , forceMigrate ); err != nil {
133- t .Fatalf ("failed to migrate %s: %v" , tableName , err )
144+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .AddColumn , forceMigrate ))
145+ if ! forceMigrate {
146+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
134147 }
135148 })
136149
137150 t .Run ("add_column_not_null" + suffix , func (t * testing.T ) {
138151 if ! forceMigrate && ! s .tests .SafeMigrations .AddColumnNotNull {
139152 t .Skip ("skipping test: add_column_not_null" )
140153 }
141- tableName := "add_column_not_null " + suffix + "_" + tableUUIDSuffix ()
154+ tableName := "cq_add_column_not_null " + suffix + "_" + tableUUIDSuffix ()
142155 source := & schema.Table {
143156 Name : tableName ,
144157 Columns : schema.ColumnList {
@@ -154,16 +167,18 @@ func (s *WriterTestSuite) testMigrate(
154167 {Name : "uuid" , Type : types .ExtensionTypes .UUID },
155168 {Name : "bool" , Type : arrow .FixedWidthTypes .Boolean , NotNull : true },
156169 }}
157- if err := s .migrate (ctx , target , source , s .tests .SafeMigrations .AddColumnNotNull , forceMigrate ); err != nil {
158- t .Fatalf ("failed to migrate add_column_not_null: %v" , err )
170+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .AddColumnNotNull , forceMigrate ))
171+ if ! forceMigrate {
172+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
159173 }
174+
160175 })
161176
162177 t .Run ("remove_column" + suffix , func (t * testing.T ) {
163178 if ! forceMigrate && ! s .tests .SafeMigrations .RemoveColumn {
164179 t .Skip ("skipping test: remove_column" )
165180 }
166- tableName := "remove_column " + suffix + "_" + tableUUIDSuffix ()
181+ tableName := "cq_remove_column " + suffix + "_" + tableUUIDSuffix ()
167182 source := & schema.Table {
168183 Name : tableName ,
169184 Columns : schema.ColumnList {
@@ -177,16 +192,17 @@ func (s *WriterTestSuite) testMigrate(
177192 {Name : "id" , Type : arrow .PrimitiveTypes .Int64 },
178193 {Name : "uuid" , Type : types .ExtensionTypes .UUID },
179194 }}
180- if err := s .migrate (ctx , target , source , s .tests .SafeMigrations .RemoveColumn , forceMigrate ); err != nil {
181- t .Fatalf ("failed to migrate remove_column: %v" , err )
195+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .RemoveColumn , forceMigrate ))
196+ if ! forceMigrate {
197+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
182198 }
183199 })
184200
185201 t .Run ("remove_column_not_null" + suffix , func (t * testing.T ) {
186202 if ! forceMigrate && ! s .tests .SafeMigrations .RemoveColumnNotNull {
187203 t .Skip ("skipping test: remove_column_not_null" )
188204 }
189- tableName := "remove_column_not_null " + suffix + "_" + tableUUIDSuffix ()
205+ tableName := "cq_remove_column_not_null " + suffix + "_" + tableUUIDSuffix ()
190206 source := & schema.Table {
191207 Name : tableName ,
192208 Columns : schema.ColumnList {
@@ -201,16 +217,17 @@ func (s *WriterTestSuite) testMigrate(
201217 {Name : "id" , Type : arrow .PrimitiveTypes .Int64 },
202218 {Name : "uuid" , Type : types .ExtensionTypes .UUID },
203219 }}
204- if err := s .migrate (ctx , target , source , s .tests .SafeMigrations .RemoveColumnNotNull , forceMigrate ); err != nil {
205- t .Fatalf ("failed to migrate remove_column_not_null: %v" , err )
220+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .RemoveColumnNotNull , forceMigrate ))
221+ if ! forceMigrate {
222+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
206223 }
207224 })
208225
209226 t .Run ("change_column" + suffix , func (t * testing.T ) {
210227 if ! forceMigrate && ! s .tests .SafeMigrations .ChangeColumn {
211228 t .Skip ("skipping test: change_column" )
212229 }
213- tableName := "change_column " + suffix + "_" + tableUUIDSuffix ()
230+ tableName := "cq_change_column " + suffix + "_" + tableUUIDSuffix ()
214231 source := & schema.Table {
215232 Name : tableName ,
216233 Columns : schema.ColumnList {
@@ -225,16 +242,17 @@ func (s *WriterTestSuite) testMigrate(
225242 {Name : "uuid" , Type : types .ExtensionTypes .UUID },
226243 {Name : "bool" , Type : arrow .BinaryTypes .String , NotNull : true },
227244 }}
228- if err := s .migrate (ctx , target , source , s .tests .SafeMigrations .ChangeColumn , forceMigrate ); err != nil {
229- t .Fatalf ("failed to migrate change_column: %v" , err )
245+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .ChangeColumn , forceMigrate ))
246+ if ! forceMigrate {
247+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
230248 }
231249 })
232250
233251 t .Run ("move_to_cq_id_only" + suffix , func (t * testing.T ) {
234252 if ! forceMigrate && ! s .tests .SafeMigrations .MovePKToCQOnly {
235253 t .Skip ("skipping test: move_to_cq_id_only" )
236254 }
237- tableName := "move_to_cq_id_only " + suffix + "_" + tableUUIDSuffix ()
255+ tableName := "cq_move_to_cq_id_only " + suffix + "_" + tableUUIDSuffix ()
238256 source := & schema.Table {
239257 Name : tableName ,
240258 Columns : schema.ColumnList {
@@ -251,16 +269,43 @@ func (s *WriterTestSuite) testMigrate(
251269 {Name : "uuid" , Type : types .ExtensionTypes .UUID },
252270 {Name : "bool" , Type : arrow .FixedWidthTypes .Boolean , NotNull : true },
253271 }}
254- if err := s .migrate (ctx , target , source , s .tests .SafeMigrations .MovePKToCQOnly , forceMigrate ); err != nil {
255- t .Fatalf ("failed to migrate move_to_cq_id_only: %v" , err )
272+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .MovePKToCQOnly , forceMigrate ))
273+ if ! forceMigrate {
274+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
275+ }
276+ })
277+ t .Run ("move_to_cq_id_only_adding_pkc" + suffix , func (t * testing.T ) {
278+ if ! forceMigrate && ! s .tests .SafeMigrations .MovePKToCQOnly {
279+ t .Skip ("skipping test: move_to_cq_id_only_adding_pk" )
280+ }
281+ tableName := "cq_move_to_cq_id_only_adding_pkc" + suffix + "_" + tableUUIDSuffix ()
282+ source := & schema.Table {
283+ Name : tableName ,
284+ Columns : schema.ColumnList {
285+ {Name : "_cq_id" , Type : types .ExtensionTypes .UUID , NotNull : true , Unique : true },
286+ {Name : "id" , Type : arrow .PrimitiveTypes .Int64 , PrimaryKey : true },
287+ {Name : "uuid" , Type : types .ExtensionTypes .UUID },
288+ {Name : "bool" , Type : arrow .FixedWidthTypes .Boolean , NotNull : true , PrimaryKeyComponent : true },
289+ }}
290+ target := & schema.Table {
291+ Name : tableName ,
292+ Columns : schema.ColumnList {
293+ {Name : "_cq_id" , Type : types .ExtensionTypes .UUID , NotNull : true , Unique : true , PrimaryKey : true },
294+ {Name : "id" , Type : arrow .PrimitiveTypes .Int64 },
295+ {Name : "uuid" , Type : types .ExtensionTypes .UUID },
296+ {Name : "bool" , Type : arrow .FixedWidthTypes .Boolean , NotNull : true },
297+ }}
298+ require .NoError (t , s .migrate (ctx , target , source , s .tests .SafeMigrations .MovePKToCQOnly , forceMigrate ))
299+ if ! forceMigrate {
300+ require .NoError (t , s .migrate (ctx , target , target , true , false ))
256301 }
257302 })
258303
259304 t .Run ("double_migration" , func (t * testing.T ) {
260305 if forceMigrate {
261306 t .Skip ("double migration test has sense only for safe migrations" )
262307 }
263- tableName := "double_migration_ " + tableUUIDSuffix ()
308+ tableName := "cq_double_migration_ " + tableUUIDSuffix ()
264309 table := schema .TestTable (tableName , s .genDatOptions )
265310 // s.migrate will perform create->write->migrate->write
266311 require .NoError (t , s .migrate (ctx , table , table , true , false ))
0 commit comments