@@ -175,21 +175,22 @@ public static Stream<Arguments> parameters() {
175175
176176 @ ParameterizedTest
177177 @ MethodSource ("parameters" )
178- void testReadSingleTableWithSingleParallelism (String tableName , String chunkColumnName )
179- throws Exception {
178+ void testReadSingleTableWithSingleParallelism (
179+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
180180 testMySqlParallelSource (
181181 1 ,
182182 FailoverType .NONE ,
183183 FailoverPhase .NEVER ,
184184 new String [] {tableName },
185185 tableName ,
186- chunkColumnName );
186+ chunkColumnName ,
187+ assignEndingFirst );
187188 }
188189
189190 @ ParameterizedTest
190191 @ MethodSource ("parameters" )
191192 void testReadSingleTableWithSingleParallelismAndSkipBackFill (
192- String tableName , String chunkColumnName ) throws Exception {
193+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
193194 testMySqlParallelSource (
194195 1 ,
195196 DEFAULT_SCAN_STARTUP_MODE ,
@@ -199,77 +200,83 @@ void testReadSingleTableWithSingleParallelismAndSkipBackFill(
199200 RestartStrategies .fixedDelayRestart (1 , 0 ),
200201 true ,
201202 tableName ,
202- chunkColumnName );
203+ chunkColumnName ,
204+ assignEndingFirst );
203205 }
204206
205207 @ ParameterizedTest
206208 @ MethodSource ("parameters" )
207- void testReadSingleTableWithMultipleParallelism (String tableName , String chunkColumnName )
208- throws Exception {
209+ void testReadSingleTableWithMultipleParallelism (
210+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
209211 testMySqlParallelSource (
210212 4 ,
211213 FailoverType .NONE ,
212214 FailoverPhase .NEVER ,
213215 new String [] {tableName },
214216 tableName ,
215- chunkColumnName );
217+ chunkColumnName ,
218+ assignEndingFirst );
216219 }
217220
218221 @ ParameterizedTest
219222 @ MethodSource ("parameters" )
220- void testReadMultipleTableWithSingleParallelism (String tableName , String chunkColumnName )
221- throws Exception {
223+ void testReadMultipleTableWithSingleParallelism (
224+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
222225 testMySqlParallelSource (
223226 1 ,
224227 FailoverType .NONE ,
225228 FailoverPhase .NEVER ,
226229 new String [] {tableName , "customers_1" },
227230 tableName ,
228- chunkColumnName );
231+ chunkColumnName ,
232+ assignEndingFirst );
229233 }
230234
231235 @ ParameterizedTest
232236 @ MethodSource ("parameters" )
233- void testReadMultipleTableWithMultipleParallelism (String tableName , String chunkColumnName )
234- throws Exception {
237+ void testReadMultipleTableWithMultipleParallelism (
238+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
235239 testMySqlParallelSource (
236240 4 ,
237241 FailoverType .NONE ,
238242 FailoverPhase .NEVER ,
239243 new String [] {tableName , "customers_1" },
240244 tableName ,
241- chunkColumnName );
245+ chunkColumnName ,
246+ assignEndingFirst );
242247 }
243248
244249 // Failover tests
245250 @ ParameterizedTest
246251 @ MethodSource ("parameters" )
247- void testTaskManagerFailoverInSnapshotPhase (String tableName , String chunkColumnName )
248- throws Exception {
252+ void testTaskManagerFailoverInSnapshotPhase (
253+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
249254 testMySqlParallelSource (
250255 FailoverType .TM ,
251256 FailoverPhase .SNAPSHOT ,
252257 new String [] {tableName , "customers_1" },
253258 tableName ,
254- chunkColumnName );
259+ chunkColumnName ,
260+ assignEndingFirst );
255261 }
256262
257263 @ ParameterizedTest
258264 @ MethodSource ("parameters" )
259- void testTaskManagerFailoverInBinlogPhase (String tableName , String chunkColumnName )
260- throws Exception {
265+ void testTaskManagerFailoverInBinlogPhase (
266+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
261267 testMySqlParallelSource (
262268 FailoverType .TM ,
263269 FailoverPhase .BINLOG ,
264270 new String [] {tableName , "customers_1" },
265271 tableName ,
266- chunkColumnName );
272+ chunkColumnName ,
273+ assignEndingFirst );
267274 }
268275
269276 @ ParameterizedTest
270277 @ MethodSource ("parameters" )
271- void testTaskManagerFailoverFromLatestOffset (String tableName , String chunkColumnName )
272- throws Exception {
278+ void testTaskManagerFailoverFromLatestOffset (
279+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
273280 testMySqlParallelSource (
274281 DEFAULT_PARALLELISM ,
275282 "latest-offset" ,
@@ -278,37 +285,40 @@ void testTaskManagerFailoverFromLatestOffset(String tableName, String chunkColum
278285 new String [] {tableName , "customers_1" },
279286 RestartStrategies .fixedDelayRestart (1 , 0 ),
280287 tableName ,
281- chunkColumnName );
288+ chunkColumnName ,
289+ assignEndingFirst );
282290 }
283291
284292 @ ParameterizedTest
285293 @ MethodSource ("parameters" )
286- void testJobManagerFailoverInSnapshotPhase (String tableName , String chunkColumnName )
287- throws Exception {
294+ void testJobManagerFailoverInSnapshotPhase (
295+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
288296 testMySqlParallelSource (
289297 FailoverType .JM ,
290298 FailoverPhase .SNAPSHOT ,
291299 new String [] {tableName , "customers_1" },
292300 tableName ,
293- chunkColumnName );
301+ chunkColumnName ,
302+ assignEndingFirst );
294303 }
295304
296305 @ ParameterizedTest
297306 @ MethodSource ("parameters" )
298- void testJobManagerFailoverInBinlogPhase (String tableName , String chunkColumnName )
299- throws Exception {
307+ void testJobManagerFailoverInBinlogPhase (
308+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
300309 testMySqlParallelSource (
301310 FailoverType .JM ,
302311 FailoverPhase .BINLOG ,
303312 new String [] {tableName , "customers_1" },
304313 tableName ,
305- chunkColumnName );
314+ chunkColumnName ,
315+ assignEndingFirst );
306316 }
307317
308318 @ ParameterizedTest
309319 @ MethodSource ("parameters" )
310- void testJobManagerFailoverFromLatestOffset (String tableName , String chunkColumnName )
311- throws Exception {
320+ void testJobManagerFailoverFromLatestOffset (
321+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
312322 testMySqlParallelSource (
313323 DEFAULT_PARALLELISM ,
314324 "latest-offset" ,
@@ -317,33 +327,36 @@ void testJobManagerFailoverFromLatestOffset(String tableName, String chunkColumn
317327 new String [] {tableName , "customers_1" },
318328 RestartStrategies .fixedDelayRestart (1 , 0 ),
319329 tableName ,
320- chunkColumnName );
330+ chunkColumnName ,
331+ assignEndingFirst );
321332 }
322333
323334 @ ParameterizedTest
324335 @ MethodSource ("parameters" )
325- void testTaskManagerFailoverSingleParallelism (String tableName , String chunkColumnName )
326- throws Exception {
336+ void testTaskManagerFailoverSingleParallelism (
337+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
327338 testMySqlParallelSource (
328339 1 ,
329340 FailoverType .TM ,
330341 FailoverPhase .SNAPSHOT ,
331342 new String [] {tableName },
332343 tableName ,
333- chunkColumnName );
344+ chunkColumnName ,
345+ assignEndingFirst );
334346 }
335347
336348 @ ParameterizedTest
337349 @ MethodSource ("parameters" )
338- void testJobManagerFailoverSingleParallelism (String tableName , String chunkColumnName )
339- throws Exception {
350+ void testJobManagerFailoverSingleParallelism (
351+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
340352 testMySqlParallelSource (
341353 1 ,
342354 FailoverType .JM ,
343355 FailoverPhase .SNAPSHOT ,
344356 new String [] {tableName },
345357 tableName ,
346- chunkColumnName );
358+ chunkColumnName ,
359+ assignEndingFirst );
347360 }
348361
349362 @ ParameterizedTest
@@ -681,6 +694,7 @@ void testSourceMetrics(String tableName, String chunkColumnName) throws Exceptio
681694 .password (customDatabase .getPassword ())
682695 .deserializer (new StringDebeziumDeserializationSchema ())
683696 .serverId (getServerId ())
697+ .serverTimeZone ("UTC" )
684698 .build ();
685699 DataStreamSource <String > stream =
686700 env .fromSource (source , WatermarkStrategy .noWatermarks (), "MySQL CDC Source" );
@@ -1062,15 +1076,17 @@ private void testMySqlParallelSource(
10621076 FailoverPhase failoverPhase ,
10631077 String [] captureCustomerTables ,
10641078 String tableName ,
1065- String chunkColumnName )
1079+ String chunkColumnName ,
1080+ String assignEndingFirst )
10661081 throws Exception {
10671082 testMySqlParallelSource (
10681083 DEFAULT_PARALLELISM ,
10691084 failoverType ,
10701085 failoverPhase ,
10711086 captureCustomerTables ,
10721087 tableName ,
1073- chunkColumnName );
1088+ chunkColumnName ,
1089+ assignEndingFirst );
10741090 }
10751091
10761092 private void testMySqlParallelSource (
@@ -1079,7 +1095,8 @@ private void testMySqlParallelSource(
10791095 FailoverPhase failoverPhase ,
10801096 String [] captureCustomerTables ,
10811097 String tableName ,
1082- String chunkColumnName )
1098+ String chunkColumnName ,
1099+ String assignEndingFirst )
10831100 throws Exception {
10841101 testMySqlParallelSource (
10851102 parallelism ,
@@ -1089,7 +1106,8 @@ private void testMySqlParallelSource(
10891106 captureCustomerTables ,
10901107 RestartStrategies .fixedDelayRestart (1 , 0 ),
10911108 tableName ,
1092- chunkColumnName );
1109+ chunkColumnName ,
1110+ assignEndingFirst );
10931111 }
10941112
10951113 private void testMySqlParallelSource (
@@ -1100,7 +1118,8 @@ private void testMySqlParallelSource(
11001118 String [] captureCustomerTables ,
11011119 RestartStrategies .RestartStrategyConfiguration restartStrategyConfiguration ,
11021120 String tableName ,
1103- String chunkColumnName )
1121+ String chunkColumnName ,
1122+ String assignEndingFirst )
11041123 throws Exception {
11051124 testMySqlParallelSource (
11061125 parallelism ,
@@ -1111,7 +1130,8 @@ private void testMySqlParallelSource(
11111130 restartStrategyConfiguration ,
11121131 false ,
11131132 tableName ,
1114- chunkColumnName );
1133+ chunkColumnName ,
1134+ assignEndingFirst );
11151135 }
11161136
11171137 private void testMySqlParallelSource (
0 commit comments