Skip to content

Commit e946ca8

Browse files
authored
Improve concurrency design of EnterpriseGeoIpDownloader (#134223)
Refactors `EnterpriseGeoIpDownloader` to avoid race conditions between the periodic and on-demand runs. See the discussion on #126124 for more details on the previously existing race condition. With this new approach, we make a distinction between the periodic and on-demand runs. The periodic runs simply run periodically on the configured poll interval. The on-demand runs are typically triggered by changes to the GeoIP metadata in the cluster state, and require running the downloader immediately to download any GeoIP databases that were just added by a user. By using an `AtomicInteger` to track the number of on-demand runs that were requested concurrently, we can guarantee that a new cluster state will result in the downloader running and prevent the downloader from running concurrently. While the (non-enterprise) `GeoIpDownloader` has the exact same concurrency implementation that the enterprise downloader had before this change, we scope this PR to just the enterprise downloader to focus discussions on the design changes. A follow-up PR will modify the `GeoIpDownloader` to have the same implementation as the enterprise downloader. Fixes #126124
1 parent ffbf05c commit e946ca8

File tree

6 files changed

+200
-47
lines changed

6 files changed

+200
-47
lines changed

docs/changelog/134223.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 134223
2+
summary: Improve concurrency design of `EnterpriseGeoIpDownloader`
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 126124

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
9898
}
9999

100100
@SuppressWarnings("unchecked")
101-
@TestLogging(
102-
reason = "understanding why ipinfo asn database sometimes is not loaded",
103-
value = "org.elasticsearch.ingest.geoip.DatabaseNodeService:TRACE"
104-
)
101+
@TestLogging(reason = "For debugging tricky race conditions", value = "org.elasticsearch.ingest.geoip:TRACE")
105102
public void testEnterpriseDownloaderTask() throws Exception {
106103
/*
107104
* This test starts the enterprise geoip downloader task, and creates a database configuration. Then it creates an ingest

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloader.java

Lines changed: 111 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.Map;
5656
import java.util.Objects;
5757
import java.util.Set;
58+
import java.util.concurrent.atomic.AtomicInteger;
5859
import java.util.function.Function;
5960
import java.util.function.Supplier;
6061
import java.util.regex.Pattern;
@@ -108,7 +109,14 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
108109

109110
// visible for testing
110111
protected volatile EnterpriseGeoIpTaskState state;
111-
private volatile Scheduler.ScheduledCancellable scheduled;
112+
/**
113+
* The currently scheduled periodic run. Only null before first periodic run.
114+
*/
115+
private volatile Scheduler.ScheduledCancellable scheduledPeriodicRun;
116+
/**
117+
* The number of requested runs. If this is greater than 0, then a run is either in progress or scheduled to run as soon as possible.
118+
*/
119+
private final AtomicInteger queuedRuns = new AtomicInteger(0);
112120
private final Supplier<TimeValue> pollIntervalSupplier;
113121
private final Function<String, char[]> tokenProvider;
114122

@@ -390,50 +398,120 @@ static byte[] getChunk(InputStream is) throws IOException {
390398
}
391399

392400
/**
393-
* Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval.
401+
* Cancels the currently scheduled run (if any) and schedules a new periodic run using the current poll interval, then requests
402+
* that the downloader runs on demand now. The main reason we need that last step is that if this persistent task
403+
* gets reassigned to a different node, we want to run the downloader immediately on that new node, not wait for the next periodic run.
394404
*/
395-
synchronized void runDownloader() {
396-
// by the time we reach here, the state will never be null
397-
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";
405+
public void restartPeriodicRun() {
406+
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
407+
logger.debug("Not restarting periodic run because task is cancelled, completed, or shutting down");
408+
return;
409+
}
410+
logger.debug("Restarting periodic run");
411+
// We synchronize to ensure we only have one scheduledPeriodicRun at a time.
412+
synchronized (this) {
413+
if (scheduledPeriodicRun != null) {
414+
// Technically speaking, there's a chance that the scheduled run is already running, in which case cancelling it here does
415+
// nothing. That means that we might end up with two periodic runs scheduled close together. However, that's unlikely to
416+
// happen and relatively harmless if it does, as we only end up running the downloader more often than strictly necessary.
417+
final boolean cancelSuccessful = scheduledPeriodicRun.cancel();
418+
logger.debug("Cancelled scheduled run: [{}]", cancelSuccessful);
419+
}
420+
// This is based on the premise that the poll interval is sufficiently large that we don't need to worry about
421+
// the scheduled `runPeriodic` running before this method completes.
422+
scheduledPeriodicRun = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
423+
}
424+
// Technically, with multiple rapid calls to restartPeriodicRun, we could end up with multiple calls to requestRunOnDemand, but
425+
// that's unlikely to happen and harmless if it does, as we only end up running the downloader more often than strictly necessary.
426+
requestRunOnDemand();
427+
}
428+
429+
/**
430+
* Runs the downloader now and schedules the next periodic run using the poll interval.
431+
*/
432+
private void runPeriodic() {
433+
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
434+
logger.debug("Not running periodic downloader because task is cancelled, completed, or shutting down");
435+
return;
436+
}
398437

399-
// there's a race condition between here and requestReschedule. originally this scheduleNextRun call was at the end of this
400-
// block, but remember that updateDatabases can take seconds to run (it's downloading bytes from the internet), and so during the
401-
// very first run there would be no future run scheduled to reschedule in requestReschedule. which meant that if you went from zero
402-
// to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the
403-
// requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to
404-
// reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather
405-
// than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced.
406-
scheduleNextRun(pollIntervalSupplier.get());
407-
// TODO regardless of the above comment, i like the idea of checking the lowest last-checked time and then running the math to get
408-
// to the next interval from then -- maybe that's a neat future enhancement to add
438+
logger.trace("Running periodic downloader");
439+
// There's a chance that an on-demand run is already in progress, in which case this periodic run is redundant.
440+
// However, we don't try to avoid that case here, as it's harmless to run the downloader more than strictly necessary (due to
441+
// the high default poll interval of 3d), and it simplifies the logic considerably.
442+
requestRunOnDemand();
409443

444+
synchronized (this) {
445+
scheduledPeriodicRun = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
446+
}
447+
}
448+
449+
/**
450+
* This method requests that the downloader runs on the latest cluster state, which likely contains a change in the GeoIP metadata.
451+
* This method does nothing if this task is cancelled or completed.
452+
*/
453+
public void requestRunOnDemand() {
410454
if (isCancelled() || isCompleted()) {
455+
logger.debug("Not requesting downloader to run on demand because task is cancelled or completed");
411456
return;
412457
}
413-
try {
414-
updateDatabases(); // n.b. this downloads bytes from the internet, it can take a while
415-
} catch (Exception e) {
416-
logger.error("exception during databases update", e);
458+
logger.trace("Requesting downloader run on demand");
459+
// If queuedRuns was greater than 0, then either a run is in progress and it will fire off another run when it finishes,
460+
// or a run is scheduled to run as soon as possible and it will include the latest cluster state.
461+
// If it was 0, we set it to 1 to indicate that a run is scheduled to run as soon as possible and schedule it now.
462+
if (queuedRuns.getAndIncrement() == 0) {
463+
logger.trace("Scheduling downloader run on demand");
464+
threadPool.generic().submit(this::runOnDemand);
465+
}
466+
}
467+
468+
/**
469+
* Runs the downloader on the latest cluster state. {@link #queuedRuns} protects against multiple concurrent runs and ensures that
470+
* if a run is requested while this method is running, then another run will be scheduled to run as soon as this method finishes.
471+
*/
472+
private void runOnDemand() {
473+
if (isCancelled() || isCompleted()) {
474+
logger.debug("Not running downloader on demand because task is cancelled or completed");
475+
return;
417476
}
477+
// Capture the current queue size, so that if another run is requested while we're running, we'll know at the end of this method
478+
// whether we need to run again.
479+
final int currentQueueSize = queuedRuns.get();
480+
logger.trace("Running downloader on demand");
418481
try {
419-
cleanDatabases();
420-
} catch (Exception e) {
421-
logger.error("exception during databases cleanup", e);
482+
runDownloader();
483+
logger.trace("Downloader completed successfully");
484+
} finally {
485+
// If any exception was thrown during runDownloader, we still want to check queuedRuns.
486+
// Subtract this "batch" of runs from queuedRuns.
487+
// If queuedRuns is still > 0, then a run was requested while we were running, so we need to run again.
488+
if (queuedRuns.addAndGet(-currentQueueSize) > 0) {
489+
logger.debug("Downloader on demand requested again while running, scheduling another run");
490+
threadPool.generic().submit(this::runOnDemand);
491+
}
422492
}
423493
}
424494

425495
/**
426-
* This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by
427-
* pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does
428-
* nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing
429-
* scheduled run.
496+
* Downloads the geoip databases now based on the supplied cluster state.
430497
*/
431-
public void requestReschedule() {
498+
void runDownloader() {
432499
if (isCancelled() || isCompleted()) {
500+
logger.debug("Not running downloader because task is cancelled or completed");
433501
return;
434502
}
435-
if (scheduled != null && scheduled.cancel()) {
436-
scheduleNextRun(TimeValue.ZERO);
503+
// by the time we reach here, the state will never be null
504+
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";
505+
506+
try {
507+
updateDatabases(); // n.b. this downloads bytes from the internet, it can take a while
508+
} catch (Exception e) {
509+
logger.error("exception during databases update", e);
510+
}
511+
try {
512+
cleanDatabases();
513+
} catch (Exception e) {
514+
logger.error("exception during databases cleanup", e);
437515
}
438516
}
439517

@@ -455,18 +533,14 @@ private void cleanDatabases() {
455533

456534
@Override
457535
protected void onCancelled() {
458-
if (scheduled != null) {
459-
scheduled.cancel();
536+
synchronized (this) {
537+
if (scheduledPeriodicRun != null) {
538+
scheduledPeriodicRun.cancel();
539+
}
460540
}
461541
markAsCompleted();
462542
}
463543

464-
private void scheduleNextRun(TimeValue time) {
465-
if (threadPool.scheduler().isShutdown() == false) {
466-
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
467-
}
468-
}
469-
470544
private ProviderDownload downloaderFor(DatabaseConfiguration database) {
471545
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
472546
return new MaxmindDownload(database.name(), maxmind);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTaskExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private void setPollInterval(TimeValue pollInterval) {
9999
this.pollInterval = pollInterval;
100100
EnterpriseGeoIpDownloader currentDownloader = getCurrentTask();
101101
if (currentDownloader != null) {
102-
currentDownloader.requestReschedule();
102+
currentDownloader.restartPeriodicRun();
103103
}
104104
}
105105
}
@@ -150,7 +150,7 @@ protected void nodeOperation(AllocatedPersistentTask task, EnterpriseGeoIpTaskPa
150150
downloader.setState(geoIpTaskState);
151151
currentTask.set(downloader);
152152
if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
153-
downloader.runDownloader();
153+
downloader.restartPeriodicRun();
154154
}
155155
}
156156

@@ -165,7 +165,8 @@ public void clusterChanged(ClusterChangedEvent event) {
165165
boolean hasGeoIpMetadataChanges = event.metadataChanged()
166166
&& event.changedCustomProjectMetadataSet().contains(IngestGeoIpMetadata.TYPE);
167167
if (hasGeoIpMetadataChanges) {
168-
currentDownloader.requestReschedule(); // watching the cluster changed events to kick the thing off if it's not running
168+
// watching the cluster changed events to kick the thing off if it's not running
169+
currentDownloader.requestRunOnDemand();
169170
}
170171
}
171172
}

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,84 @@ public void testIpinfoUrls() {
532532
}
533533
}
534534

535+
/**
536+
* Tests that if an exception is thrown while {@link EnterpriseGeoIpDownloader#runOnDemand()} is running subsequent calls still proceed.
537+
* This ensures that the "lock" mechanism used to prevent concurrent runs is released properly.
538+
*/
539+
public void testRequestRunOnDemandReleasesLock() throws Exception {
540+
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
541+
when(clusterService.state()).thenReturn(state);
542+
// Track the number of calls to runDownloader.
543+
AtomicInteger calls = new AtomicInteger();
544+
// Create a GeoIpDownloader that throws an exception on the first call to runDownloader.
545+
geoIpDownloader = new EnterpriseGeoIpDownloader(
546+
client,
547+
httpClient,
548+
clusterService,
549+
threadPool,
550+
1,
551+
"",
552+
"",
553+
"",
554+
EMPTY_TASK_ID,
555+
Map.of(),
556+
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
557+
(type) -> "password".toCharArray()
558+
) {
559+
@Override
560+
synchronized void runDownloader() {
561+
if (calls.incrementAndGet() == 1) {
562+
throw new RuntimeException("test exception");
563+
}
564+
super.runDownloader();
565+
}
566+
};
567+
geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
568+
geoIpDownloader.requestRunOnDemand();
569+
assertBusy(() -> assertEquals(1, calls.get()));
570+
geoIpDownloader.requestRunOnDemand();
571+
assertBusy(() -> assertEquals(2, calls.get()));
572+
}
573+
574+
/**
575+
* Tests that if an exception is thrown while {@link EnterpriseGeoIpDownloader#runPeriodic()} is running subsequent calls still proceed.
576+
* This ensures that the "lock" mechanism used to prevent concurrent runs is released properly.
577+
*/
578+
public void testRestartPeriodicRunReleasesLock() throws Exception {
579+
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
580+
when(clusterService.state()).thenReturn(state);
581+
// Track the number of calls to runDownloader.
582+
AtomicInteger calls = new AtomicInteger();
583+
// Create a GeoIpDownloader that throws an exception on the first call to runDownloader.
584+
geoIpDownloader = new EnterpriseGeoIpDownloader(
585+
client,
586+
httpClient,
587+
clusterService,
588+
threadPool,
589+
1,
590+
"",
591+
"",
592+
"",
593+
EMPTY_TASK_ID,
594+
Map.of(),
595+
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
596+
(type) -> "password".toCharArray()
597+
) {
598+
@Override
599+
synchronized void runDownloader() {
600+
if (calls.incrementAndGet() == 1) {
601+
throw new RuntimeException("test exception");
602+
}
603+
super.runDownloader();
604+
}
605+
};
606+
geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
607+
geoIpDownloader.restartPeriodicRun();
608+
assertBusy(() -> assertEquals(1, calls.get()));
609+
geoIpDownloader.restartPeriodicRun();
610+
assertBusy(() -> assertEquals(2, calls.get()));
611+
}
612+
535613
private static class MockClient extends NoOpClient {
536614

537615
private final Map<ActionType<?>, BiConsumer<? extends ActionRequest, ? extends ActionListener<?>>> handlers = new HashMap<>();

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,6 @@ tests:
179179
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
180180
method: test {p0=transform/transforms_stats/Test get transform stats}
181181
issue: https://github.com/elastic/elasticsearch/issues/126270
182-
- class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT
183-
method: testEnterpriseDownloaderTask
184-
issue: https://github.com/elastic/elasticsearch/issues/126124
185182
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
186183
method: test {p0=transform/transforms_start_stop/Test start/stop only starts/stops specified transform}
187184
issue: https://github.com/elastic/elasticsearch/issues/126466

0 commit comments

Comments
 (0)