@@ -129,30 +129,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
129129 {
130130 final boolean isRollup = partitionsSpec .isForceGuaranteedRollupCompatible ();
131131
132- final TaskBuilder .IndexParallel indexTask =
133- TaskBuilder .ofTypeIndexParallel ()
134- .dataSource (dataSource )
135- .timestampColumn ("timestamp" )
136- .jsonInputFormat ()
137- .localInputSourceWithFiles (
138- Resources .DataFile .tinyWiki1Json (),
139- Resources .DataFile .tinyWiki2Json (),
140- Resources .DataFile .tinyWiki3Json ()
141- )
142- .segmentGranularity ("DAY" )
143- .dimensions ("namespace" , "page" , "language" )
144- .metricAggregates (
145- new DoubleSumAggregatorFactory ("added" , "added" ),
146- new DoubleSumAggregatorFactory ("deleted" , "deleted" ),
147- new DoubleSumAggregatorFactory ("delta" , "delta" ),
148- new CountAggregatorFactory ("count" )
149- )
150- .tuningConfig (
151- t -> t .withPartitionsSpec (partitionsSpec )
152- .withForceGuaranteedRollup (isRollup )
153- .withMaxNumConcurrentSubTasks (10 )
154- .withSplitHintSpec (new MaxSizeSplitHintSpec (1 , null ))
155- );
132+ final TaskBuilder .IndexParallel indexTask = buildIndexParallelTask (partitionsSpec , false );
156133
157134 runTask (indexTask , dataSource );
158135 cluster .callApi ().waitForAllSegmentsToBeAvailable (dataSource , coordinator , broker );
@@ -211,6 +188,55 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
211188 runQueries (dataSource3 );
212189 }
213190
191+ @ MethodSource ("getTestParamPartitionsSpec" )
192+ @ ParameterizedTest (name = "partitionsSpec={0}" )
193+ public void test_runIndexTask_andAppendData (PartitionsSpec partitionsSpec )
194+ {
195+ final TaskBuilder .IndexParallel initialTask = buildIndexParallelTask (partitionsSpec , false );
196+ runTask (initialTask , dataSource );
197+ cluster .callApi ().waitForAllSegmentsToBeAvailable (dataSource , coordinator , broker );
198+ cluster .callApi ().verifySqlQuery ("SELECT COUNT(*) FROM %s" , dataSource , "10" );
199+
200+ final TaskBuilder .IndexParallel appendTask
201+ = buildIndexParallelTask (new DynamicPartitionsSpec (null , null ), true );
202+ runTask (appendTask , dataSource );
203+ cluster .callApi ().waitForAllSegmentsToBeAvailable (dataSource , coordinator , broker );
204+ cluster .callApi ().verifySqlQuery ("SELECT COUNT(*) FROM %s" , dataSource , "20" );
205+ }
206+
207+ private TaskBuilder .IndexParallel buildIndexParallelTask (
208+ PartitionsSpec partitionsSpec ,
209+ boolean appendToExisting
210+ )
211+ {
212+ final boolean isRollup = partitionsSpec .isForceGuaranteedRollupCompatible ();
213+
214+ return TaskBuilder .ofTypeIndexParallel ()
215+ .dataSource (dataSource )
216+ .timestampColumn ("timestamp" )
217+ .jsonInputFormat ()
218+ .localInputSourceWithFiles (
219+ Resources .DataFile .tinyWiki1Json (),
220+ Resources .DataFile .tinyWiki2Json (),
221+ Resources .DataFile .tinyWiki3Json ()
222+ )
223+ .segmentGranularity ("DAY" )
224+ .dimensions ("namespace" , "page" , "language" )
225+ .metricAggregates (
226+ new DoubleSumAggregatorFactory ("added" , "added" ),
227+ new DoubleSumAggregatorFactory ("deleted" , "deleted" ),
228+ new DoubleSumAggregatorFactory ("delta" , "delta" ),
229+ new CountAggregatorFactory ("count" )
230+ )
231+ .appendToExisting (appendToExisting )
232+ .tuningConfig (
233+ t -> t .withPartitionsSpec (partitionsSpec )
234+ .withForceGuaranteedRollup (isRollup )
235+ .withMaxNumConcurrentSubTasks (10 )
236+ .withSplitHintSpec (new MaxSizeSplitHintSpec (1 , null ))
237+ );
238+ }
239+
214240 private String runTask (TaskBuilder .IndexParallel taskBuilder , String dataSource )
215241 {
216242 final String taskId = EmbeddedClusterApis .newTaskId (dataSource );
0 commit comments