5353import java .util .Map ;
5454import java .util .Objects ;
5555import java .util .Set ;
56+ import java .util .concurrent .atomic .AtomicInteger ;
5657import java .util .function .Function ;
5758import java .util .function .Supplier ;
5859import java .util .regex .Pattern ;
@@ -103,7 +104,14 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
103104
104105 // visible for testing
105106 protected volatile EnterpriseGeoIpTaskState state ;
106- private volatile Scheduler .ScheduledCancellable scheduled ;
107+ /**
108+ * The currently scheduled periodic run. Only null before first periodic run.
109+ */
110+ private volatile Scheduler .ScheduledCancellable scheduledPeriodicRun ;
111+ /**
112+ * The number of requested runs. If this is greater than 0, then a run is either in progress or scheduled to run as soon as possible.
113+ */
114+ private final AtomicInteger queuedRuns = new AtomicInteger (0 );
107115 private final Supplier <TimeValue > pollIntervalSupplier ;
108116 private final Function <String , char []> tokenProvider ;
109117
@@ -380,50 +388,120 @@ static byte[] getChunk(InputStream is) throws IOException {
380388 }
381389
382390 /**
383- * Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval.
391+ * Cancels the currently scheduled run (if any) and schedules a new periodic run using the current poll interval, then requests
392+ * that the downloader runs on demand now. The main reason we need that last step is that if this persistent task
393+ * gets reassigned to a different node, we want to run the downloader immediately on that new node, not wait for the next periodic run.
384394 */
385- synchronized void runDownloader () {
386- // by the time we reach here, the state will never be null
387- assert this .state != null : "this.setState() is null. You need to call setState() before calling runDownloader()" ;
395+ public void restartPeriodicRun () {
396+ if (isCancelled () || isCompleted () || threadPool .scheduler ().isShutdown ()) {
397+ logger .debug ("Not restarting periodic run because task is cancelled, completed, or shutting down" );
398+ return ;
399+ }
400+ logger .debug ("Restarting periodic run" );
401+ // We synchronize to ensure we only have one scheduledPeriodicRun at a time.
402+ synchronized (this ) {
403+ if (scheduledPeriodicRun != null ) {
404+ // Technically speaking, there's a chance that the scheduled run is already running, in which case cancelling it here does
405+ // nothing. That means that we might end up with two periodic runs scheduled close together. However, that's unlikely to
406+ // happen and relatively harmless if it does, as we only end up running the downloader more often than strictly necessary.
407+ final boolean cancelSuccessful = scheduledPeriodicRun .cancel ();
408+ logger .debug ("Cancelled scheduled run: [{}]" , cancelSuccessful );
409+ }
410+ // This is based on the premise that the poll interval is sufficiently large that we don't need to worry about
411+ // the scheduled `runPeriodic` running before this method completes.
412+ scheduledPeriodicRun = threadPool .schedule (this ::runPeriodic , pollIntervalSupplier .get (), threadPool .generic ());
413+ }
414+ // Technically, with multiple rapid calls to restartPeriodicRun, we could end up with multiple calls to requestRunOnDemand, but
415+ // that's unlikely to happen and harmless if it does, as we only end up running the downloader more often than strictly necessary.
416+ requestRunOnDemand ();
417+ }
418+
419+ /**
420+ * Runs the downloader now and schedules the next periodic run using the poll interval.
421+ */
422+ private void runPeriodic () {
423+ if (isCancelled () || isCompleted () || threadPool .scheduler ().isShutdown ()) {
424+ logger .debug ("Not running periodic downloader because task is cancelled, completed, or shutting down" );
425+ return ;
426+ }
388427
389- // there's a race condition between here and requestReschedule. originally this scheduleNextRun call was at the end of this
390- // block, but remember that updateDatabases can take seconds to run (it's downloading bytes from the internet), and so during the
391- // very first run there would be no future run scheduled to reschedule in requestReschedule. which meant that if you went from zero
392- // to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the
393- // requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to
394- // reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather
395- // than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced.
396- scheduleNextRun (pollIntervalSupplier .get ());
397- // TODO regardless of the above comment, i like the idea of checking the lowest last-checked time and then running the math to get
398- // to the next interval from then -- maybe that's a neat future enhancement to add
428+ logger .trace ("Running periodic downloader" );
429+ // There's a chance that an on-demand run is already in progress, in which case this periodic run is redundant.
430+ // However, we don't try to avoid that case here, as it's harmless to run the downloader more than strictly necessary (due to
431+ // the high default poll interval of 3d), and it simplifies the logic considerably.
432+ requestRunOnDemand ();
399433
434+ synchronized (this ) {
435+ scheduledPeriodicRun = threadPool .schedule (this ::runPeriodic , pollIntervalSupplier .get (), threadPool .generic ());
436+ }
437+ }
438+
439+ /**
440+ * This method requests that the downloader runs on the latest cluster state, which likely contains a change in the GeoIP metadata.
441+ * This method does nothing if this task is cancelled or completed.
442+ */
443+ public void requestRunOnDemand () {
400444 if (isCancelled () || isCompleted ()) {
445+ logger .debug ("Not requesting downloader to run on demand because task is cancelled or completed" );
401446 return ;
402447 }
403- try {
404- updateDatabases (); // n.b. this downloads bytes from the internet, it can take a while
405- } catch (Exception e ) {
406- logger .error ("exception during databases update" , e );
448+ logger .trace ("Requesting downloader run on demand" );
449+ // If queuedRuns was greater than 0, then either a run is in progress and it will fire off another run when it finishes,
450+ // or a run is scheduled to run as soon as possible and it will include the latest cluster state.
451+ // If it was 0, we set it to 1 to indicate that a run is scheduled to run as soon as possible and schedule it now.
452+ if (queuedRuns .getAndIncrement () == 0 ) {
453+ logger .trace ("Scheduling downloader run on demand" );
454+ threadPool .generic ().submit (this ::runOnDemand );
455+ }
456+ }
457+
458+ /**
459+ * Runs the downloader on the latest cluster state. {@link #queuedRuns} protects against multiple concurrent runs and ensures that
460+ * if a run is requested while this method is running, then another run will be scheduled to run as soon as this method finishes.
461+ */
462+ private void runOnDemand () {
463+ if (isCancelled () || isCompleted ()) {
464+ logger .debug ("Not running downloader on demand because task is cancelled or completed" );
465+ return ;
407466 }
467+ // Capture the current queue size, so that if another run is requested while we're running, we'll know at the end of this method
468+ // whether we need to run again.
469+ final int currentQueueSize = queuedRuns .get ();
470+ logger .trace ("Running downloader on demand" );
408471 try {
409- cleanDatabases ();
410- } catch (Exception e ) {
411- logger .error ("exception during databases cleanup" , e );
472+ runDownloader ();
473+ logger .trace ("Downloader completed successfully" );
474+ } finally {
475+ // If any exception was thrown during runDownloader, we still want to check queuedRuns.
476+ // Subtract this "batch" of runs from queuedRuns.
477+ // If queuedRuns is still > 0, then a run was requested while we were running, so we need to run again.
478+ if (queuedRuns .addAndGet (-currentQueueSize ) > 0 ) {
479+ logger .debug ("Downloader on demand requested again while running, scheduling another run" );
480+ threadPool .generic ().submit (this ::runOnDemand );
481+ }
412482 }
413483 }
414484
415485 /**
416- * This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by
417- * pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does
418- * nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing
419- * scheduled run.
486+ * Downloads the geoip databases now based on the supplied cluster state.
420487 */
421- public void requestReschedule () {
488+ void runDownloader () {
422489 if (isCancelled () || isCompleted ()) {
490+ logger .debug ("Not running downloader because task is cancelled or completed" );
423491 return ;
424492 }
425- if (scheduled != null && scheduled .cancel ()) {
426- scheduleNextRun (TimeValue .ZERO );
493+ // by the time we reach here, the state will never be null
494+ assert this .state != null : "this.setState() is null. You need to call setState() before calling runDownloader()" ;
495+
496+ try {
497+ updateDatabases (); // n.b. this downloads bytes from the internet, it can take a while
498+ } catch (Exception e ) {
499+ logger .error ("exception during databases update" , e );
500+ }
501+ try {
502+ cleanDatabases ();
503+ } catch (Exception e ) {
504+ logger .error ("exception during databases cleanup" , e );
427505 }
428506 }
429507
@@ -445,18 +523,14 @@ private void cleanDatabases() {
445523
446524 @ Override
447525 protected void onCancelled () {
448- if (scheduled != null ) {
449- scheduled .cancel ();
526+ synchronized (this ) {
527+ if (scheduledPeriodicRun != null ) {
528+ scheduledPeriodicRun .cancel ();
529+ }
450530 }
451531 markAsCompleted ();
452532 }
453533
454- private void scheduleNextRun (TimeValue time ) {
455- if (threadPool .scheduler ().isShutdown () == false ) {
456- scheduled = threadPool .schedule (this ::runDownloader , time , threadPool .generic ());
457- }
458- }
459-
460534 private ProviderDownload downloaderFor (DatabaseConfiguration database ) {
461535 if (database .provider () instanceof DatabaseConfiguration .Maxmind maxmind ) {
462536 return new MaxmindDownload (database .name (), maxmind );
0 commit comments