@@ -217,7 +217,6 @@ private void createContinuousTransform(String indexName, String transformId, Str
217217 * Verify the basic stats API, which includes state, health, and optionally progress (if it exists).
218218 * These are required for Kibana 8.13+.
219219 */
220- @ SuppressWarnings ("unchecked" )
221220 public void testBasicContinuousTransformStats () throws Exception {
222221 var transformId = "transform-continuous-basic-stats" ;
223222 createContinuousTransform ("continuous-basic-stats-reviews" , transformId , "reviews-by-user-business-day" );
@@ -234,6 +233,54 @@ public void testBasicContinuousTransformStats() throws Exception {
234233 deleteTransform (transformId );
235234 }
236235
236+ public void testEmptySourceIndexClearsErrors () throws Exception {
237+ var sourceIndexName = "source-empty-reviews" ;
238+ var destIndexName = "destination-empty-reviews" ;
239+ var transformId = "transform-empty-source-index" ;
240+
241+ createReviewsIndexMappings (sourceIndexName , null );
242+
243+ var config = createTransformConfigBuilder (transformId , destIndexName , QueryConfig .matchAll (), sourceIndexName ).setPivotConfig (
244+ createPivotConfig (groupByUserOnly (), aggregateScoresAndTimes ())
245+ )
246+ .setSyncConfig (new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (1 )))
247+ .setSettings (new SettingsConfig .Builder ().setUnattended (true ).build ())
248+ .build ();
249+
250+ putTransform (transformId , Strings .toString (config ), RequestOptions .DEFAULT );
251+ startTransform (config .getId (), RequestOptions .DEFAULT );
252+
253+ waitUntilCheckpoint (config .getId (), 1L );
254+ assertEquals ("green" , getTransformHealthStatus (transformId ));
255+
256+ // this will cause the transform to fail to search
257+ assertAcknowledged (adminClient ().performRequest (new Request ("PUT" , sourceIndexName + "/_block/read" )));
258+ assertBusy (() -> assertThat (getTransformHealthStatus (transformId ), oneOf ("yellow" , "red" )), 30 , TimeUnit .SECONDS );
259+
260+ // unblock reads on the search index and the transform should recover
261+ var request = new Request ("PUT" , sourceIndexName + "/_settings" );
262+ request .setJsonEntity ("""
263+ { "blocks.read": false }
264+ """ );
265+ assertAcknowledged (adminClient ().performRequest (request ));
266+ assertBusy (() -> assertEquals ("green" , getTransformHealthStatus (transformId )), 30 , TimeUnit .SECONDS );
267+
268+ stopTransform (transformId );
269+ deleteTransform (transformId );
270+ deleteIndex (sourceIndexName );
271+ deleteIndex (destIndexName );
272+ }
273+
274+ private Map <String , SingleGroupSource > groupByUserOnly () {
275+ return Map .of ("by-user" , new TermsGroupSource ("user_id" , null , false ));
276+ }
277+
278+ private AggregatorFactories .Builder aggregateScoresAndTimes () {
279+ return AggregatorFactories .builder ()
280+ .addAggregator (AggregationBuilders .avg ("review_score" ).field ("stars" ))
281+ .addAggregator (AggregationBuilders .max ("timestamp" ).field ("timestamp" ));
282+ }
283+
237284 public void testDestinationIndexBlocked () throws Exception {
238285 var transformId = "transform-continuous-blocked-destination" ;
239286 var sourceIndexName = "source-reviews" ;
@@ -385,12 +432,8 @@ public void testContinuousTransformUpdate() throws Exception {
385432 String indexName = "continuous-reviews-update" ;
386433 createReviewsIndex (indexName , 10 , NUM_USERS , TransformIT ::getUserIdForRow , TransformIT ::getDateStringForRow );
387434
388- Map <String , SingleGroupSource > groups = new HashMap <>();
389- groups .put ("by-user" , new TermsGroupSource ("user_id" , null , false ));
390-
391- AggregatorFactories .Builder aggs = AggregatorFactories .builder ()
392- .addAggregator (AggregationBuilders .avg ("review_score" ).field ("stars" ))
393- .addAggregator (AggregationBuilders .max ("timestamp" ).field ("timestamp" ));
435+ var groups = groupByUserOnly ();
436+ var aggs = aggregateScoresAndTimes ();
394437
395438 String id = "transform-to-update" ;
396439 String dest = "reviews-by-user-business-day-to-update" ;
@@ -481,8 +524,7 @@ public void testRetentionPolicyDelete() throws Exception {
481524 String dest = "retention-policy-dest" ;
482525 createReviewsIndex (indexName , 10 , NUM_USERS , TransformIT ::getUserIdForRow , TransformIT ::getDateStringForRow );
483526
484- Map <String , SingleGroupSource > groups = new HashMap <>();
485- groups .put ("by-user" , new TermsGroupSource ("user_id" , null , false ));
527+ var groups = groupByUserOnly ();
486528
487529 AggregatorFactories .Builder aggs = AggregatorFactories .builder ()
488530 .addAggregator (AggregationBuilders .max ("timestamp" ).field ("timestamp" ));
0 commit comments