66 */
77package org .elasticsearch .xpack .ml .integration ;
88
9+ import org .elasticsearch .action .ActionListener ;
10+ import org .elasticsearch .action .ActionRequest ;
11+ import org .elasticsearch .action .ActionResponse ;
12+ import org .elasticsearch .action .support .ActionFilter ;
13+ import org .elasticsearch .action .support .ActionFilterChain ;
914import org .elasticsearch .cluster .metadata .Metadata ;
1015import org .elasticsearch .core .TimeValue ;
1116import org .elasticsearch .index .query .QueryBuilders ;
17+ import org .elasticsearch .plugins .ActionPlugin ;
18+ import org .elasticsearch .plugins .Plugin ;
1219import org .elasticsearch .search .SearchHit ;
1320import org .elasticsearch .search .sort .SortOrder ;
21+ import org .elasticsearch .tasks .Task ;
1422import org .elasticsearch .xpack .core .ml .action .GetBucketsAction ;
23+ import org .elasticsearch .xpack .core .ml .action .GetCalendarEventsAction ;
1524import org .elasticsearch .xpack .core .ml .action .GetRecordsAction ;
1625import org .elasticsearch .xpack .core .ml .action .UpdateJobAction ;
26+ import org .elasticsearch .xpack .core .ml .action .UpdateProcessAction ;
1727import org .elasticsearch .xpack .core .ml .calendars .ScheduledEvent ;
1828import org .elasticsearch .xpack .core .ml .job .config .AnalysisConfig ;
1929import org .elasticsearch .xpack .core .ml .job .config .DataDescription ;
2838import java .io .IOException ;
2939import java .time .Instant ;
3040import java .util .ArrayList ;
41+ import java .util .Collection ;
3142import java .util .Collections ;
3243import java .util .List ;
44+ import java .util .concurrent .atomic .AtomicInteger ;
3345import java .util .stream .Collectors ;
3446
3547import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertResponse ;
@@ -45,6 +57,13 @@ public void cleanUpTest() {
4557 cleanUp ();
4658 }
4759
60+ @ Override
61+ protected Collection <Class <? extends Plugin >> nodePlugins () {
62+ List <Class <? extends Plugin >> plugins = new ArrayList <>(super .nodePlugins ());
63+ plugins .add (ScheduledEventsIT .UpdateProcessActionTrackerPlugin .class );
64+ return plugins ;
65+ }
66+
4867 public void testScheduledEvents () throws IOException {
4968
5069 TimeValue bucketSpan = TimeValue .timeValueMinutes (30 );
@@ -464,6 +483,166 @@ public void testNewJobWithGlobalCalendar() throws Exception {
464483 assertThat (buckets .get (5 ).getScheduledEvents (), contains ("Event added after job is opened" ));
465484 }
466485
486+ /**
487+ * Test that verifies UpdateProcessAction is called with correct parameters when calendar events
488+ * are posted asynchronously, using ActionFilter to directly intercept the calls
489+ */
490+ public void testCalendarUpdateCallsUpdateProcessAction () throws Exception {
491+ // Reset tracker
492+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .reset ();
493+
494+ TimeValue bucketSpan = TimeValue .timeValueMinutes (30 );
495+ String jobId = "calendar-update-test" ;
496+ String calendarId = "test-calendar" ;
497+
498+ // Create and open single job
499+ createJob (jobId , bucketSpan );
500+ openJob (jobId );
501+
502+ // Create calendar with the job
503+ putCalendar (calendarId , List .of (jobId ), "Update process action test" );
504+
505+ // Create scheduled event
506+ List <ScheduledEvent > events = List .of (
507+ new ScheduledEvent .Builder ().description ("Direct Test Event" )
508+ .startTime (Instant .ofEpochMilli (System .currentTimeMillis () + 60000 ))
509+ .endTime (Instant .ofEpochMilli (System .currentTimeMillis () + 120000 ))
510+ .calendarId (calendarId )
511+ .build ()
512+ );
513+
514+ // Post events - API should return immediately with async implementation
515+ postScheduledEvents (calendarId , events );
516+
517+ // Wait for and verify ActionFilter captured the UpdateProcessAction call
518+ // We intercept the call to UpdateProcessAction using an ActionFilter to verify the call was made
519+ assertBusy (() -> {
520+ assertThat (
521+ "Should have intercepted UpdateProcessAction call" ,
522+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .updateProcessCallCount .get (),
523+ equalTo (1 )
524+ );
525+ assertThat (
526+ "Should have called UpdateProcessAction for the correct job" ,
527+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .updatedJobIds ,
528+ contains (jobId )
529+ );
530+ });
531+
532+ // Verify calendar events were stored correctly
533+ verifyCalendarEventsStored (calendarId , 1 );
534+
535+ // Cleanup
536+ closeJob (jobId );
537+
538+ logger .info ("Successfully verified UpdateProcessAction call with updateScheduledEvents=true for job [{}]" , jobId );
539+ }
540+
541+ /**
542+ * Test calendar updates with closed jobs (should not fail)
543+ */
544+ public void testCalendarUpdateWithClosedJobs () throws IOException {
545+ TimeValue bucketSpan = TimeValue .timeValueMinutes (30 );
546+ String jobId = "closed-job-test" ;
547+
548+ // Create and run job, then close it
549+ Job .Builder job = createJob (jobId , bucketSpan );
550+ long startTime = 1514764800000L ;
551+ runJob (job , startTime , bucketSpan , 10 );
552+
553+ // Create calendar with the closed job
554+ String calendarId = "closed-job-calendar" ;
555+ putCalendar (calendarId , Collections .singletonList (jobId ), "Calendar with closed job" );
556+
557+ // Create scheduled event
558+ List <ScheduledEvent > events = new ArrayList <>();
559+ long eventStartTime = startTime + (bucketSpan .millis () * 5 );
560+ long eventEndTime = eventStartTime + (bucketSpan .millis () * 2 );
561+ events .add (
562+ new ScheduledEvent .Builder ().description ("Closed Job Event" )
563+ .startTime (Instant .ofEpochMilli (eventStartTime ))
564+ .endTime (Instant .ofEpochMilli (eventEndTime ))
565+ .calendarId (calendarId )
566+ .build ()
567+ );
568+
569+ // This should not fail even though the job is closed
570+ // The async implementation should gracefully skip closed jobs
571+ postScheduledEvents (calendarId , events );
572+
573+ // Verify job is still closed and buckets don't have the new event
574+ // (since the job was closed when the event was added)
575+ GetBucketsAction .Request getBucketsRequest = new GetBucketsAction .Request (jobId );
576+ List <Bucket > buckets = getBuckets (getBucketsRequest );
577+
578+ // All buckets should be empty of scheduled events since job was closed when event was added
579+ for (Bucket bucket : buckets ) {
580+ assertThat ("Closed job buckets should not contain new scheduled events" , bucket .getScheduledEvents (), empty ());
581+ }
582+ }
583+
584+ /**
585+ * Test calendar updates with mixed open and closed jobs - verify open jobs are updated and closed jobs are skipped
586+ */
587+ public void testCalendarUpdateWithMixedOpenAndClosedJobs () throws Exception {
588+ TimeValue bucketSpan = TimeValue .timeValueMinutes (30 );
589+
590+ // Create two jobs
591+ String openJobId = "mixed-test-open-job" ;
592+ String closedJobId = "mixed-test-closed-job" ;
593+
594+ // Create and open first job
595+ createJob (openJobId , bucketSpan );
596+ openJob (openJobId );
597+
598+ // Create and run second job, then close it
599+ Job .Builder closedJob = createJob (closedJobId , bucketSpan );
600+ long startTime = 1514764800000L ;
601+ runJob (closedJob , startTime , bucketSpan , 10 );
602+
603+ // Create calendar with both jobs
604+ String calendarId = "mixed-jobs-calendar" ;
605+ putCalendar (calendarId , List .of (openJobId , closedJobId ), "Calendar with mixed open and closed jobs" );
606+
607+ // Reset tracker
608+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .reset ();
609+
610+ // Create scheduled event
611+ List <ScheduledEvent > events = List .of (
612+ new ScheduledEvent .Builder ().description ("Mixed Jobs Event" )
613+ .startTime (Instant .ofEpochMilli (System .currentTimeMillis () + 60000 ))
614+ .endTime (Instant .ofEpochMilli (System .currentTimeMillis () + 120000 ))
615+ .calendarId (calendarId )
616+ .build ()
617+ );
618+
619+ // Post events - should update open job and skip closed job
620+ postScheduledEvents (calendarId , events );
621+
622+ // Wait for ActionFilter to capture the UpdateProcessAction call
623+ // Should only be called for the open job, not the closed one
624+ assertBusy (() -> {
625+ assertThat (
626+ "Should have intercepted UpdateProcessAction call for open job only" ,
627+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .updateProcessCallCount .get (),
628+ equalTo (1 )
629+ );
630+ assertThat (
631+ "Should have called UpdateProcessAction for the open job only" ,
632+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .updatedJobIds ,
633+ contains (openJobId )
634+ );
635+ assertThat (
636+ "Should not have called UpdateProcessAction for the closed job" ,
637+ ScheduledEventsIT .UpdateProcessActionTrackerPlugin .updatedJobIds .contains (closedJobId ),
638+ is (false )
639+ );
640+ });
641+
642+ // Cleanup
643+ closeJob (openJobId );
644+ }
645+
467646 private Job .Builder createJob (String jobId , TimeValue bucketSpan ) {
468647 Detector .Builder detector = new Detector .Builder ("count" , null );
469648 AnalysisConfig .Builder analysisConfig = new AnalysisConfig .Builder (Collections .singletonList (detector .build ()));
@@ -486,4 +665,62 @@ private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int b
486665 );
487666 closeJob (job .getId ());
488667 }
668+
669+ /**
670+ * Helper method to verify that calendar events are stored and retrievable
671+ */
672+ private void verifyCalendarEventsStored (String calendarId , int expectedEventCount ) {
673+ GetCalendarEventsAction .Request request = new GetCalendarEventsAction .Request (calendarId );
674+ GetCalendarEventsAction .Response response = client ().execute (GetCalendarEventsAction .INSTANCE , request ).actionGet ();
675+
676+ assertThat (
677+ "Calendar should have the expected number of events" ,
678+ response .getResources ().results ().size (),
679+ equalTo (expectedEventCount )
680+ );
681+ }
682+
683+ /**
684+ * Test plugin that tracks UpdateProcessAction calls with updateScheduledEvents=true
685+ * using an ActionFilter to verify native process interaction in integration tests
686+ */
687+ public static class UpdateProcessActionTrackerPlugin extends Plugin implements ActionPlugin {
688+
689+ public static final AtomicInteger updateProcessCallCount = new AtomicInteger (0 );
690+ public static final List <String > updatedJobIds = Collections .synchronizedList (new ArrayList <>());
691+
692+ public static void reset () {
693+ updateProcessCallCount .set (0 );
694+ updatedJobIds .clear ();
695+ }
696+
697+ @ Override
698+ public List <ActionFilter > getActionFilters () {
699+ return List .of (new ActionFilter () {
700+ @ Override
701+ public int order () {
702+ return 0 ;
703+ }
704+
705+ @ Override
706+ public <Request extends ActionRequest , Response extends ActionResponse > void apply (
707+ Task task ,
708+ String action ,
709+ Request request ,
710+ ActionListener <Response > listener ,
711+ ActionFilterChain <Request , Response > chain
712+ ) {
713+ if (UpdateProcessAction .NAME .equals (action ) && request instanceof UpdateProcessAction .Request ) {
714+ UpdateProcessAction .Request updateRequest = (UpdateProcessAction .Request ) request ;
715+ if (updateRequest .isUpdateScheduledEvents ()) {
716+ updateProcessCallCount .incrementAndGet ();
717+ updatedJobIds .add (updateRequest .getJobId ());
718+ }
719+ }
720+ chain .proceed (task , action , request , listener );
721+ }
722+ });
723+ }
724+ }
725+
489726}
0 commit comments