Skip to content

Commit 704615c

Browse files
authored
[ML] Fix ML calendar event update scalability issues (elastic#136886) (elastic#138006)
Fixes issue where calendar events failed to update some jobs when associated with large numbers of jobs (>1000) due to queue capacity limits and sequential processing. Problem: UpdateJobProcessNotifier has a 1000-item queue and processes updates sequentially. It uses offer() on the queue, which silently drops updates when the queue is full. However, calendar/filter updates don't need ordering guarantees. Hence, JobManager.submitJobEventUpdate() can bypass the queue and avoid the bottleneck of the queue size. Another problem is the "fire-and-forget" pattern: submitJobEventUpdate() returns immediately without waiting for the update to complete. I introduce RefCountingListener to track the calendar updates. We start a background thread that updates the jobs and tracks succeeded, failed, and skipped jobs, while the request is returned immediately to prevent a timeout. Finally, if the problem with failed job updates persists, I enhanced the logging throughout the system to create a trace for future diagnostics. Refactor JobManager.submitJobEventUpdate() to bypass UpdateJobProcessNotifier queue Use RefCountingListener for parallel calendar/filter updates Add comprehensive logging throughout the system Create CalendarScalabilityIT integration tests Add helper methods to base test class
1 parent a662521 commit 704615c

File tree

7 files changed

+434
-56
lines changed

7 files changed

+434
-56
lines changed

docs/changelog/136886.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136886
2+
summary: Fix ML calendar event update scalability issues
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,24 @@
66
*/
77
package org.elasticsearch.xpack.ml.integration;
88

9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.action.support.ActionFilter;
13+
import org.elasticsearch.action.support.ActionFilterChain;
914
import org.elasticsearch.cluster.metadata.Metadata;
1015
import org.elasticsearch.core.TimeValue;
1116
import org.elasticsearch.index.query.QueryBuilders;
17+
import org.elasticsearch.plugins.ActionPlugin;
18+
import org.elasticsearch.plugins.Plugin;
1219
import org.elasticsearch.search.SearchHit;
1320
import org.elasticsearch.search.sort.SortOrder;
21+
import org.elasticsearch.tasks.Task;
1422
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
23+
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
1524
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
1625
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
26+
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
1727
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
1828
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
1929
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@@ -28,8 +38,10 @@
2838
import java.io.IOException;
2939
import java.time.Instant;
3040
import java.util.ArrayList;
41+
import java.util.Collection;
3142
import java.util.Collections;
3243
import java.util.List;
44+
import java.util.concurrent.atomic.AtomicInteger;
3345
import java.util.stream.Collectors;
3446

3547
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
@@ -45,6 +57,13 @@ public void cleanUpTest() {
4557
cleanUp();
4658
}
4759

60+
@Override
61+
protected Collection<Class<? extends Plugin>> nodePlugins() {
62+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
63+
plugins.add(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.class);
64+
return plugins;
65+
}
66+
4867
public void testScheduledEvents() throws IOException {
4968

5069
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
@@ -464,6 +483,166 @@ public void testNewJobWithGlobalCalendar() throws Exception {
464483
assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened"));
465484
}
466485

486+
/**
487+
* Test that verifies UpdateProcessAction is called with correct parameters when calendar events
488+
* are posted asynchronously, using ActionFilter to directly intercept the calls
489+
*/
490+
public void testCalendarUpdateCallsUpdateProcessAction() throws Exception {
491+
// Reset tracker
492+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset();
493+
494+
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
495+
String jobId = "calendar-update-test";
496+
String calendarId = "test-calendar";
497+
498+
// Create and open single job
499+
createJob(jobId, bucketSpan);
500+
openJob(jobId);
501+
502+
// Create calendar with the job
503+
putCalendar(calendarId, List.of(jobId), "Update process action test");
504+
505+
// Create scheduled event
506+
List<ScheduledEvent> events = List.of(
507+
new ScheduledEvent.Builder().description("Direct Test Event")
508+
.startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000))
509+
.endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000))
510+
.calendarId(calendarId)
511+
.build()
512+
);
513+
514+
// Post events - API should return immediately with async implementation
515+
postScheduledEvents(calendarId, events);
516+
517+
// Wait for and verify ActionFilter captured the UpdateProcessAction call
518+
// We intercept the call to UpdateProcessAction using an ActionFilter to verify the call was made
519+
assertBusy(() -> {
520+
assertThat(
521+
"Should have intercepted UpdateProcessAction call",
522+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(),
523+
equalTo(1)
524+
);
525+
assertThat(
526+
"Should have called UpdateProcessAction for the correct job",
527+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds,
528+
contains(jobId)
529+
);
530+
});
531+
532+
// Verify calendar events were stored correctly
533+
verifyCalendarEventsStored(calendarId, 1);
534+
535+
// Cleanup
536+
closeJob(jobId);
537+
538+
logger.info("Successfully verified UpdateProcessAction call with updateScheduledEvents=true for job [{}]", jobId);
539+
}
540+
541+
/**
542+
* Test calendar updates with closed jobs (should not fail)
543+
*/
544+
public void testCalendarUpdateWithClosedJobs() throws IOException {
545+
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
546+
String jobId = "closed-job-test";
547+
548+
// Create and run job, then close it
549+
Job.Builder job = createJob(jobId, bucketSpan);
550+
long startTime = 1514764800000L;
551+
runJob(job, startTime, bucketSpan, 10);
552+
553+
// Create calendar with the closed job
554+
String calendarId = "closed-job-calendar";
555+
putCalendar(calendarId, Collections.singletonList(jobId), "Calendar with closed job");
556+
557+
// Create scheduled event
558+
List<ScheduledEvent> events = new ArrayList<>();
559+
long eventStartTime = startTime + (bucketSpan.millis() * 5);
560+
long eventEndTime = eventStartTime + (bucketSpan.millis() * 2);
561+
events.add(
562+
new ScheduledEvent.Builder().description("Closed Job Event")
563+
.startTime(Instant.ofEpochMilli(eventStartTime))
564+
.endTime(Instant.ofEpochMilli(eventEndTime))
565+
.calendarId(calendarId)
566+
.build()
567+
);
568+
569+
// This should not fail even though the job is closed
570+
// The async implementation should gracefully skip closed jobs
571+
postScheduledEvents(calendarId, events);
572+
573+
// Verify job is still closed and buckets don't have the new event
574+
// (since the job was closed when the event was added)
575+
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
576+
List<Bucket> buckets = getBuckets(getBucketsRequest);
577+
578+
// All buckets should be empty of scheduled events since job was closed when event was added
579+
for (Bucket bucket : buckets) {
580+
assertThat("Closed job buckets should not contain new scheduled events", bucket.getScheduledEvents(), empty());
581+
}
582+
}
583+
584+
/**
585+
* Test calendar updates with mixed open and closed jobs - verify open jobs are updated and closed jobs are skipped
586+
*/
587+
public void testCalendarUpdateWithMixedOpenAndClosedJobs() throws Exception {
588+
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
589+
590+
// Create two jobs
591+
String openJobId = "mixed-test-open-job";
592+
String closedJobId = "mixed-test-closed-job";
593+
594+
// Create and open first job
595+
createJob(openJobId, bucketSpan);
596+
openJob(openJobId);
597+
598+
// Create and run second job, then close it
599+
Job.Builder closedJob = createJob(closedJobId, bucketSpan);
600+
long startTime = 1514764800000L;
601+
runJob(closedJob, startTime, bucketSpan, 10);
602+
603+
// Create calendar with both jobs
604+
String calendarId = "mixed-jobs-calendar";
605+
putCalendar(calendarId, List.of(openJobId, closedJobId), "Calendar with mixed open and closed jobs");
606+
607+
// Reset tracker
608+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset();
609+
610+
// Create scheduled event
611+
List<ScheduledEvent> events = List.of(
612+
new ScheduledEvent.Builder().description("Mixed Jobs Event")
613+
.startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000))
614+
.endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000))
615+
.calendarId(calendarId)
616+
.build()
617+
);
618+
619+
// Post events - should update open job and skip closed job
620+
postScheduledEvents(calendarId, events);
621+
622+
// Wait for ActionFilter to capture the UpdateProcessAction call
623+
// Should only be called for the open job, not the closed one
624+
assertBusy(() -> {
625+
assertThat(
626+
"Should have intercepted UpdateProcessAction call for open job only",
627+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(),
628+
equalTo(1)
629+
);
630+
assertThat(
631+
"Should have called UpdateProcessAction for the open job only",
632+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds,
633+
contains(openJobId)
634+
);
635+
assertThat(
636+
"Should not have called UpdateProcessAction for the closed job",
637+
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds.contains(closedJobId),
638+
is(false)
639+
);
640+
});
641+
642+
// Cleanup
643+
closeJob(openJobId);
644+
}
645+
467646
private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
468647
Detector.Builder detector = new Detector.Builder("count", null);
469648
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
@@ -486,4 +665,62 @@ private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int b
486665
);
487666
closeJob(job.getId());
488667
}
668+
669+
/**
670+
* Helper method to verify that calendar events are stored and retrievable
671+
*/
672+
private void verifyCalendarEventsStored(String calendarId, int expectedEventCount) {
673+
GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(calendarId);
674+
GetCalendarEventsAction.Response response = client().execute(GetCalendarEventsAction.INSTANCE, request).actionGet();
675+
676+
assertThat(
677+
"Calendar should have the expected number of events",
678+
response.getResources().results().size(),
679+
equalTo(expectedEventCount)
680+
);
681+
}
682+
683+
/**
684+
* Test plugin that tracks UpdateProcessAction calls with updateScheduledEvents=true
685+
* using an ActionFilter to verify native process interaction in integration tests
686+
*/
687+
public static class UpdateProcessActionTrackerPlugin extends Plugin implements ActionPlugin {
688+
689+
public static final AtomicInteger updateProcessCallCount = new AtomicInteger(0);
690+
public static final List<String> updatedJobIds = Collections.synchronizedList(new ArrayList<>());
691+
692+
public static void reset() {
693+
updateProcessCallCount.set(0);
694+
updatedJobIds.clear();
695+
}
696+
697+
@Override
698+
public List<ActionFilter> getActionFilters() {
699+
return List.of(new ActionFilter() {
700+
@Override
701+
public int order() {
702+
return 0;
703+
}
704+
705+
@Override
706+
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
707+
Task task,
708+
String action,
709+
Request request,
710+
ActionListener<Response> listener,
711+
ActionFilterChain<Request, Response> chain
712+
) {
713+
if (UpdateProcessAction.NAME.equals(action) && request instanceof UpdateProcessAction.Request) {
714+
UpdateProcessAction.Request updateRequest = (UpdateProcessAction.Request) request;
715+
if (updateRequest.isUpdateScheduledEvents()) {
716+
updateProcessCallCount.incrementAndGet();
717+
updatedJobIds.add(updateRequest.getJobId());
718+
}
719+
}
720+
chain.proceed(task, action, request, listener);
721+
}
722+
});
723+
}
724+
}
725+
489726
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.client.internal.Client;
1818
import org.elasticsearch.common.util.concurrent.EsExecutors;
1919
import org.elasticsearch.injection.guice.Inject;
20+
import org.elasticsearch.logging.LogManager;
21+
import org.elasticsearch.logging.Logger;
2022
import org.elasticsearch.tasks.Task;
2123
import org.elasticsearch.transport.TransportService;
2224
import org.elasticsearch.xcontent.ToXContent;
@@ -42,6 +44,8 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<
4244
PostCalendarEventsAction.Request,
4345
PostCalendarEventsAction.Response> {
4446

47+
private static final Logger logger = LogManager.getLogger(TransportPostCalendarEventsAction.class);
48+
4549
private final Client client;
4650
private final JobResultsProvider jobResultsProvider;
4751
private final JobManager jobManager;
@@ -75,6 +79,13 @@ protected void doExecute(
7579
List<ScheduledEvent> events = request.getScheduledEvents();
7680

7781
ActionListener<Calendar> calendarListener = ActionListener.wrap(calendar -> {
82+
logger.debug(
83+
"Calendar [{}] accepted for background update: {} jobs with {} events",
84+
request.getCalendarId(),
85+
calendar.getJobIds().size(),
86+
events.size()
87+
);
88+
7889
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
7990

8091
for (ScheduledEvent event : events) {
@@ -102,13 +113,10 @@ protected void doExecute(
102113
new ActionListener<BulkResponse>() {
103114
@Override
104115
public void onResponse(BulkResponse response) {
105-
jobManager.updateProcessOnCalendarChanged(
106-
calendar.getJobIds(),
107-
ActionListener.wrap(
108-
r -> listener.onResponse(new PostCalendarEventsAction.Response(events)),
109-
listener::onFailure
110-
)
111-
);
116+
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> {
117+
logger.debug("Calendar [{}] update initiated successfully", request.getCalendarId());
118+
listener.onResponse(new PostCalendarEventsAction.Response(events));
119+
}, listener::onFailure));
112120
}
113121

114122
@Override

0 commit comments

Comments
 (0)