@@ -255,6 +255,97 @@ void validate(ActionListener<Void> listener) {
255255 }
256256 }
257257
258+ class MockedTransformIndexerForStatePersistenceTesting extends TransformIndexer {
259+
260+ private long timeNanos = 0 ;
261+
262+ MockedTransformIndexerForStatePersistenceTesting (
263+ ThreadPool threadPool ,
264+ TransformServices transformServices ,
265+ CheckpointProvider checkpointProvider ,
266+ TransformConfig transformConfig ,
267+ AtomicReference <IndexerState > initialState ,
268+ TransformIndexerPosition initialPosition ,
269+ TransformIndexerStats jobStats ,
270+ TransformContext context
271+ ) {
272+ super (
273+ threadPool ,
274+ transformServices ,
275+ checkpointProvider ,
276+ transformConfig ,
277+ initialState ,
278+ initialPosition ,
279+ jobStats ,
280+ /* TransformProgress */ null ,
281+ TransformCheckpoint .EMPTY ,
282+ TransformCheckpoint .EMPTY ,
283+ context
284+ );
285+ }
286+
287+ public void setTimeMillis (long millis ) {
288+ this .timeNanos = TimeUnit .MILLISECONDS .toNanos (millis );
289+ }
290+
291+ @ Override
292+ protected long getTimeNanos () {
293+ return timeNanos ;
294+ }
295+
296+ @ Override
297+ protected void doNextSearch (long waitTimeInNanos , ActionListener <SearchResponse > nextPhase ) {
298+ threadPool .generic ().execute (() -> nextPhase .onResponse (ONE_HIT_SEARCH_RESPONSE ));
299+ }
300+
301+ @ Override
302+ protected void doNextBulk (BulkRequest request , ActionListener <BulkResponse > nextPhase ) {
303+ threadPool .generic ().execute (() -> nextPhase .onResponse (new BulkResponse (new BulkItemResponse [0 ], 100 )));
304+ }
305+
306+ @ Override
307+ void doGetInitialProgress (SearchRequest request , ActionListener <SearchResponse > responseListener ) {
308+ responseListener .onResponse (ONE_HIT_SEARCH_RESPONSE );
309+ }
310+
311+ @ Override
312+ void doGetFieldMappings (ActionListener <Map <String , String >> fieldMappingsListener ) {
313+ fieldMappingsListener .onResponse (Collections .emptyMap ());
314+ }
315+
316+ @ Override
317+ void doDeleteByQuery (DeleteByQueryRequest deleteByQueryRequest , ActionListener <BulkByScrollResponse > responseListener ) {
318+ responseListener .onResponse (
319+ new BulkByScrollResponse (
320+ TimeValue .ZERO ,
321+ new BulkByScrollTask .Status (Collections .emptyList (), null ),
322+ Collections .emptyList (),
323+ Collections .emptyList (),
324+ false
325+ )
326+ );
327+ }
328+
329+ @ Override
330+ void refreshDestinationIndex (ActionListener <RefreshResponse > responseListener ) {
331+ responseListener .onResponse (new RefreshResponse (1 , 1 , 0 , Collections .emptyList ()));
332+ }
333+
334+ @ Override
335+ void persistState (TransformState state , ActionListener <Void > listener ) {
336+ listener .onResponse (null );
337+ }
338+
339+ @ Override
340+ void validate (ActionListener <Void > listener ) {
341+ listener .onResponse (null );
342+ }
343+
344+ public void initialize () {
345+ this .initializeFunction ();
346+ }
347+ }
348+
258349 @ Before
259350 public void setUpMocks () {
260351 auditor = MockTransformAuditor .createMockAuditor ();
@@ -269,6 +360,87 @@ public void tearDownClient() {
269360 ThreadPool .terminate (threadPool , 30 , TimeUnit .SECONDS );
270361 }
271362
363+ public void testTriggerStatePersistence () {
364+ TransformConfig config = new TransformConfig (
365+ randomAlphaOfLength (10 ),
366+ randomSourceConfig (),
367+ randomDestConfig (),
368+ null ,
369+ new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (1 )),
370+ null ,
371+ randomPivotConfig (),
372+ null ,
373+ randomBoolean () ? null : randomAlphaOfLengthBetween (1 , 1000 ),
374+ null ,
375+ null ,
376+ null ,
377+ null ,
378+ null
379+ );
380+ AtomicReference <IndexerState > state = new AtomicReference <>(IndexerState .INDEXING );
381+
382+ TransformContext context = new TransformContext (TransformTaskState .STARTED , "" , 0 , mock (TransformContext .Listener .class ));
383+ final MockedTransformIndexerForStatePersistenceTesting indexer = createMockIndexerForStatePersistenceTesting (
384+ config ,
385+ state ,
386+ null ,
387+ threadPool ,
388+ auditor ,
389+ null ,
390+ new TransformIndexerStats (),
391+ context
392+ );
393+
394+ assertFalse (indexer .triggerSaveState ());
395+ // simple: advancing the time should trigger state persistence
396+ indexer .setTimeMillis (80_000 );
397+ assertTrue (indexer .triggerSaveState ());
398+ // still true as state persistence has not been executed
399+ assertTrue (indexer .triggerSaveState ());
400+ indexer .doSaveState (IndexerState .INDEXING , null , () -> {});
401+ // after state persistence, the trigger should return false
402+ assertFalse (indexer .triggerSaveState ());
403+ // advance time twice, but don't persist
404+ indexer .setTimeMillis (81_000 );
405+ assertFalse (indexer .triggerSaveState ());
406+ indexer .setTimeMillis (140_000 );
407+ assertFalse (indexer .triggerSaveState ());
408+ // now trigger should return false as last persistence was 80_000
409+ indexer .setTimeMillis (140_001 );
410+ assertTrue (indexer .triggerSaveState ());
411+ // persist and check trigger
412+ indexer .doSaveState (IndexerState .INDEXING , null , () -> {});
413+ assertFalse (indexer .triggerSaveState ());
414+ // check trigger but persist later
415+ indexer .setTimeMillis (200_001 );
416+ assertFalse (indexer .triggerSaveState ());
417+ indexer .setTimeMillis (240_000 );
418+ indexer .doSaveState (IndexerState .INDEXING , null , () -> {});
419+ assertFalse (indexer .triggerSaveState ());
420+ // last persistence should be 240_000, so don't trigger
421+ indexer .setTimeMillis (270_000 );
422+ assertFalse (indexer .triggerSaveState ());
423+ indexer .setTimeMillis (300_001 );
424+ assertTrue (indexer .triggerSaveState ());
425+ indexer .doSaveState (IndexerState .INDEXING , null , () -> {});
426+ assertFalse (indexer .triggerSaveState ());
427+ // advance again, it shouldn't trigger
428+ indexer .setTimeMillis (310_000 );
429+ assertFalse (indexer .triggerSaveState ());
430+
431+ // set stop at checkpoint, which must trigger state persistence
432+ setStopAtCheckpoint (indexer , true , ActionListener .noop ());
433+ assertTrue (indexer .triggerSaveState ());
434+ indexer .setTimeMillis (311_000 );
435+ // after state persistence, trigger should return false
436+ indexer .doSaveState (IndexerState .INDEXING , null , () -> {});
437+ indexer .setTimeMillis (310_200 );
438+ assertFalse (indexer .triggerSaveState ());
439+
440+ // after time has passed, trigger should work again
441+ indexer .setTimeMillis (371_001 );
442+ }
443+
272444 public void testStopAtCheckpoint () throws Exception {
273445 TransformConfig config = new TransformConfig (
274446 randomAlphaOfLength (10 ),
@@ -653,4 +825,38 @@ private MockedTransformIndexer createMockIndexer(
653825 indexer .initialize ();
654826 return indexer ;
655827 }
828+
829+ private MockedTransformIndexerForStatePersistenceTesting createMockIndexerForStatePersistenceTesting (
830+ TransformConfig config ,
831+ AtomicReference <IndexerState > state ,
832+ Consumer <String > failureConsumer ,
833+ ThreadPool threadPool ,
834+ TransformAuditor transformAuditor ,
835+ TransformIndexerPosition initialPosition ,
836+ TransformIndexerStats jobStats ,
837+ TransformContext context
838+ ) {
839+ CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider (config );
840+ transformConfigManager .putTransformConfiguration (config , ActionListener .noop ());
841+ TransformServices transformServices = new TransformServices (
842+ transformConfigManager ,
843+ mock (TransformCheckpointService .class ),
844+ transformAuditor ,
845+ new TransformScheduler (Clock .systemUTC (), threadPool , Settings .EMPTY )
846+ );
847+
848+ MockedTransformIndexerForStatePersistenceTesting indexer = new MockedTransformIndexerForStatePersistenceTesting (
849+ threadPool ,
850+ transformServices ,
851+ checkpointProvider ,
852+ config ,
853+ state ,
854+ initialPosition ,
855+ jobStats ,
856+ context
857+ );
858+
859+ indexer .initialize ();
860+ return indexer ;
861+ }
656862}
0 commit comments