@@ -104,10 +104,24 @@ protected String logProxyOptionsString() {
104104 + String .format (" 'rootserver-list' = '%s'" , METADATA .getRsList ());
105105 }
106106
107+ /**
108+ * Current OceanBase connector uses timestamp (in seconds) to mark the offset during the
109+ * transition from {@code SNAPSHOT} to {@code STREAMING} mode. Thus, if some snapshot inserting
110+ * events are too close to the transitioning offset, snapshot inserting events might be emitted
111+ * multiple times. <br>
112+ * This could be safely removed after switching to incremental snapshot framework which provides
113+ * Exactly-once guarantee.
114+ */
115+ private void waitForTableInitialization () throws InterruptedException {
116+ Thread .sleep (5000L );
117+ }
118+
107119 @ Test
108120 public void testTableList () throws Exception {
109121 inventoryDatabase = new UniqueDatabase (OB_SERVER , "inventory" );
110122 inventoryDatabase .createAndInitialize ("mysql" );
123+ waitForTableInitialization ();
124+
111125 String sourceDDL =
112126 String .format (
113127 "CREATE TABLE ob_source ("
@@ -212,6 +226,8 @@ public void testTableList() throws Exception {
212226 public void testMetadataColumns () throws Exception {
213227 inventoryDatabase = new UniqueDatabase (OB_SERVER , "inventory" );
214228 inventoryDatabase .createAndInitialize ("mysql" );
229+ waitForTableInitialization ();
230+
215231 String sourceDDL =
216232 String .format (
217233 "CREATE TABLE ob_source ("
@@ -297,6 +313,7 @@ public void testAllDataTypes() throws Exception {
297313
298314 columnTypesDatabase = new UniqueDatabase (OB_SERVER , "column_type_test" );
299315 columnTypesDatabase .createAndInitialize ("mysql" );
316+ waitForTableInitialization ();
300317
301318 String sourceDDL =
302319 String .format (
@@ -488,6 +505,7 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
488505
489506 columnTypesDatabase = new UniqueDatabase (OB_SERVER , "column_type_test" );
490507 columnTypesDatabase .createAndInitialize ("mysql" );
508+ waitForTableInitialization ();
491509
492510 String sourceDDL =
493511 String .format (
@@ -559,6 +577,7 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
559577 public void testSnapshotOnly () throws Exception {
560578 inventoryDatabase = new UniqueDatabase (OB_SERVER , "inventory" );
561579 inventoryDatabase .createAndInitialize ("mysql" );
580+ waitForTableInitialization ();
562581
563582 String sourceDDL =
564583 String .format (
@@ -611,7 +630,7 @@ public void testSnapshotOnly() throws Exception {
611630
612631 while (result .getJobClient ().get ().getJobStatus ().get ().equals (JobStatus .RUNNING )) {
613632 Thread .sleep (100 );
614- // Waiting for job to quit, in case if
633+ // Waiting for job to finish (SNAPSHOT job will end spontaneously)
615634 }
616635 }
617636}
0 commit comments