Skip to content

Commit ef12fe2

Browse files
committed
Improve concurrency design of EnterpriseGeoIpDownloader
Refactors `EnterpriseGeoIpDownloader` to avoid race conditions between the periodic and ad-hoc runs. See the discussion on #126124 for more details on the previously existing race condition. With this new approach, we make a distinction between the periodic and ad-hoc runs. The periodic runs simply run periodically on the configured poll interval. The ad-hoc runs are typically triggered by changes in the cluster state to the GeoIP metadata, and require running the downloader immediately, to download any GeoIP databases that were just added by a user. By using a `Semaphore` and an `AtomicReference<ClusterState>`, we can guarantee that a new cluster state will result in the downloader running and avoid the downloader from running concurrently. While the (non-enterprise) `GeoIpDownloader` has the exact same concurrency implementation, we scope this PR to just the enterprise downloader to focus discussions on the design changes. A follow-up PR will modify the `GeoIpDownloader` to have the same implementation as the enterprise downloader. Fixes #126124
1 parent 500c4ff commit ef12fe2

File tree

5 files changed

+125
-55
lines changed

5 files changed

+125
-55
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
9898
}
9999

100100
@SuppressWarnings("unchecked")
101-
@TestLogging(
102-
reason = "understanding why ipinfo asn database sometimes is not loaded",
103-
value = "org.elasticsearch.ingest.geoip.DatabaseNodeService:TRACE"
104-
)
101+
@TestLogging(reason = "understanding why ipinfo asn database sometimes is not loaded", value = "org.elasticsearch.ingest.geoip:TRACE")
105102
public void testEnterpriseDownloaderTask() throws Exception {
106103
/*
107104
* This test starts the enterprise geoip downloader task, and creates a database configuration. Then it creates an ingest

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloader.java

Lines changed: 117 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.index.IndexRequest;
1919
import org.elasticsearch.action.support.PlainActionFuture;
2020
import org.elasticsearch.client.internal.Client;
21+
import org.elasticsearch.cluster.ClusterState;
2122
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2223
import org.elasticsearch.cluster.metadata.ProjectId;
2324
import org.elasticsearch.cluster.service.ClusterService;
@@ -55,6 +56,8 @@
5556
import java.util.Map;
5657
import java.util.Objects;
5758
import java.util.Set;
59+
import java.util.concurrent.Semaphore;
60+
import java.util.concurrent.atomic.AtomicReference;
5861
import java.util.function.Function;
5962
import java.util.function.Supplier;
6063
import java.util.regex.Pattern;
@@ -108,7 +111,23 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
108111

109112
// visible for testing
110113
protected volatile EnterpriseGeoIpTaskState state;
114+
/**
115+
* The currently scheduled periodic run, or null if no periodic run is currently scheduled. Note: _not_ the currently running thread!
116+
*/
111117
private volatile Scheduler.ScheduledCancellable scheduled;
118+
/**
119+
* Semaphore with 1 permit, used to ensure that only one run (periodic or cluster state) is running at a time.
120+
*/
121+
private final Semaphore running = new Semaphore(1);
122+
/**
123+
* Contains a reference to the next state to run on, or null if no run is currently requested.
124+
* May be overridden by a newer state before the downloader has had a chance to run on it.
125+
* We store the cluster state like this instead of using `ClusterService#state()`, as we invoke {@link #requestRunOnState(ClusterState)}
126+
* from a cluster state listener, and then use the cluster state asynchronously, meaning there's a race condition between
127+
* {@link #runOnState()} and the rest of the cluster state listeners completing and `ClusterStateApplierService` updating its internal
128+
* `state` field.
129+
*/
130+
private final AtomicReference<ClusterState> queue = new AtomicReference<>();
112131
private final Supplier<TimeValue> pollIntervalSupplier;
113132
private final Function<String, char[]> tokenProvider;
114133

@@ -146,10 +165,9 @@ void setState(EnterpriseGeoIpTaskState state) {
146165
}
147166

148167
// visible for testing
149-
void updateDatabases() throws IOException {
168+
void updateDatabases(ClusterState clusterState) throws IOException {
150169
@NotMultiProjectCapable(description = "Enterprise GeoIP not available in serverless")
151170
ProjectId projectId = ProjectId.DEFAULT;
152-
var clusterState = clusterService.state();
153171
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX);
154172
if (geoipIndex != null) {
155173
logger.trace("the geoip index [{}] exists", EnterpriseGeoIpDownloader.DATABASES_INDEX);
@@ -390,58 +408,123 @@ static byte[] getChunk(InputStream is) throws IOException {
390408
}
391409

392410
/**
393-
* Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval.
411+
* Cancels the currently scheduled run (if any) and schedules a new (periodic) run to happen immediately, which will then schedule
412+
* the next periodic run using the poll interval.
394413
*/
395-
synchronized void runDownloader() {
396-
// by the time we reach here, the state will never be null
397-
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";
398-
399-
// there's a race condition between here and requestReschedule. originally this scheduleNextRun call was at the end of this
400-
// block, but remember that updateDatabases can take seconds to run (it's downloading bytes from the internet), and so during the
401-
// very first run there would be no future run scheduled to reschedule in requestReschedule. which meant that if you went from zero
402-
// to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the
403-
// requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to
404-
// reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather
405-
// than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced.
406-
scheduleNextRun(pollIntervalSupplier.get());
407-
// TODO regardless of the above comment, i like the idea of checking the lowest last-checked time and then running the math to get
408-
// to the next interval from then -- maybe that's a neat future enhancement to add
414+
public void restartPeriodicRun() {
415+
logger.trace("Restarting periodic run");
416+
if (scheduled != null) {
417+
final boolean cancelSuccessful = scheduled.cancel();
418+
logger.trace("Cancelled scheduled run: [{}]", cancelSuccessful);
419+
}
420+
if (threadPool.scheduler().isShutdown() == false) {
421+
threadPool.schedule(this::runPeriodic, TimeValue.ZERO, threadPool.generic());
422+
}
423+
}
409424

425+
/**
426+
* Tries to run the downloader now, if it isn't already currently running, and schedules the next periodic run using the poll interval.
427+
*/
428+
private void runPeriodic() {
410429
if (isCancelled() || isCompleted()) {
430+
logger.debug("Not running periodic downloader because task is cancelled or completed");
411431
return;
412432
}
413-
try {
414-
updateDatabases(); // n.b. this downloads bytes from the internet, it can take a while
415-
} catch (Exception e) {
416-
logger.error("exception during databases update", e);
433+
434+
// If we are not able to acquire the semaphore immediately, it means that a run is already in progress. Periodic runs do not run
435+
// concurrently, but a cluster state run could be in progress. Since the default poll interval is quite large (3d), there is no
436+
// need to wait for the current run to finish and then run again, so we just skip this run and schedule the next one.
437+
if (running.tryAcquire()) {
438+
final var clusterState = clusterService.state();
439+
logger.trace("Running periodic downloader on cluster state [{}]", clusterState.version());
440+
runDownloader(clusterState);
441+
running.release();
417442
}
418-
try {
419-
cleanDatabases();
420-
} catch (Exception e) {
421-
logger.error("exception during databases cleanup", e);
443+
if (threadPool.scheduler().isShutdown() == false) {
444+
logger.trace("Scheduling next periodic run, current scheduled run is [{}]", scheduled);
445+
scheduled = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
446+
logger.trace("Next periodic run scheduled: [{}]", scheduled);
422447
}
423448
}
424449

425450
/**
426-
* This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by
427-
* pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does
428-
* nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing
429-
* scheduled run.
451+
* This method requests that the downloader runs on the supplied cluster state, which likely contains a change in the GeoIP metadata.
452+
* If the queue was non-empty before we set it, then a run is already scheduled or in progress, so it will either be processed in the
453+
* next/current run, or the current run will automatically start a new run when it finishes because the cluster state queue changed
454+
* while it was running. This method does nothing if this task is cancelled or completed.
430455
*/
431-
public void requestReschedule() {
456+
public void requestRunOnState(ClusterState clusterState) {
457+
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
458+
logger.debug("Not requesting downloader run on cluster state because task is cancelled, completed or shutting down");
459+
return;
460+
}
461+
logger.trace("Requesting downloader run on cluster state [{}]", clusterState.version());
462+
if (queue.getAndSet(clusterState) == null) {
463+
logger.trace("Scheduling downloader run on cluster state");
464+
threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic());
465+
}
466+
}
467+
468+
/**
469+
* Waits for any current run to finish, then runs the downloader on the last seen cluster state. If a new cluster state came in while
470+
* waiting or running, then schedules another run to happen immediately after this one.
471+
*/
472+
private void runOnState() {
473+
if (isCancelled() || isCompleted()) {
474+
logger.debug("Not running downloader on cluster state because task is cancelled or completed");
475+
return;
476+
}
477+
// Here we do want to wait for the current run (if any) to finish. Since a new cluster state might have arrived while the current
478+
// run was running, we want to ensure that new cluster state update isn't lost, so we wait and run afterwards.
479+
logger.trace("Waiting to run downloader on cluster state");
480+
try {
481+
running.acquire();
482+
} catch (InterruptedException e) {
483+
logger.warn("Interrupted while waiting to run downloader on cluster state", e);
484+
}
485+
// Get the last seen cluster state and process it.
486+
final ClusterState clusterState = queue.get();
487+
assert clusterState != null : "queue was null, but we should only be called if queue was non-null";
488+
logger.debug("Running downloader on cluster state [{}]", clusterState.version());
489+
runDownloader(clusterState);
490+
// Try to clear the queue by setting the reference to null. If another cluster state came in since we fetched it above (i.e. the
491+
// reference differs from `clusterState`), then we schedule another run to happen immediately after this one.
492+
if (queue.compareAndSet(clusterState, null) == false) {
493+
logger.debug("A new cluster state came in while running, scheduling another run");
494+
threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic());
495+
}
496+
// We release the semaphore last, to ensure that no duplicate runs/threads are started.
497+
running.release();
498+
logger.trace("Finished running downloader on cluster state [{}]", clusterState.version());
499+
}
500+
501+
/**
502+
* Downloads the geoip databases now based on the supplied cluster state.
503+
*/
504+
synchronized void runDownloader(ClusterState clusterState) {
505+
// by the time we reach here, the state will never be null
506+
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";
507+
432508
if (isCancelled() || isCompleted()) {
433509
return;
434510
}
435-
if (scheduled != null && scheduled.cancel()) {
436-
scheduleNextRun(TimeValue.ZERO);
511+
try {
512+
updateDatabases(clusterState); // n.b. this downloads bytes from the internet, it can take a while
513+
} catch (Exception e) {
514+
logger.error("exception during databases update", e);
515+
}
516+
try {
517+
cleanDatabases(clusterState);
518+
} catch (Exception e) {
519+
logger.error("exception during databases cleanup", e);
437520
}
438521
}
439522

440-
private void cleanDatabases() {
523+
private void cleanDatabases(ClusterState clusterState) {
441524
List<Tuple<String, Metadata>> expiredDatabases = state.getDatabases()
442525
.entrySet()
443526
.stream()
444-
.filter(e -> e.getValue().isNewEnough(clusterService.state().metadata().settings()) == false)
527+
.filter(e -> e.getValue().isNewEnough(clusterState.metadata().settings()) == false)
445528
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
446529
.toList();
447530
expiredDatabases.forEach(e -> {
@@ -461,12 +544,6 @@ protected void onCancelled() {
461544
markAsCompleted();
462545
}
463546

464-
private void scheduleNextRun(TimeValue time) {
465-
if (threadPool.scheduler().isShutdown() == false) {
466-
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
467-
}
468-
}
469-
470547
private ProviderDownload downloaderFor(DatabaseConfiguration database) {
471548
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
472549
return new MaxmindDownload(database.name(), maxmind);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTaskExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void setPollInterval(TimeValue pollInterval) {
103103
this.pollInterval = pollInterval;
104104
EnterpriseGeoIpDownloader currentDownloader = getCurrentTask();
105105
if (currentDownloader != null) {
106-
currentDownloader.requestReschedule();
106+
currentDownloader.restartPeriodicRun();
107107
}
108108
}
109109
}
@@ -154,7 +154,7 @@ protected void nodeOperation(AllocatedPersistentTask task, EnterpriseGeoIpTaskPa
154154
downloader.setState(geoIpTaskState);
155155
currentTask.set(downloader);
156156
if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
157-
downloader.runDownloader();
157+
downloader.restartPeriodicRun();
158158
}
159159
}
160160

@@ -169,7 +169,8 @@ public void clusterChanged(ClusterChangedEvent event) {
169169
boolean hasGeoIpMetadataChanges = event.metadataChanged()
170170
&& event.changedCustomProjectMetadataSet().contains(IngestGeoIpMetadata.TYPE);
171171
if (hasGeoIpMetadataChanges) {
172-
currentDownloader.requestReschedule(); // watching the cluster changed events to kick the thing off if it's not running
172+
// watching the cluster changed events to kick the thing off if it's not running
173+
currentDownloader.requestRunOnState(event.state());
173174
}
174175
}
175176
}

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -451,11 +451,10 @@ public void testUpdateDatabasesWriteBlock() {
451451
.get(EnterpriseGeoIpDownloader.DATABASES_INDEX)
452452
.getWriteIndex()
453453
.getName();
454-
state = ClusterState.builder(state)
454+
ClusterState finalState = ClusterState.builder(state)
455455
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
456456
.build();
457-
when(clusterService.state()).thenReturn(state);
458-
var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases());
457+
var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases(finalState));
459458
assertThat(
460459
e.getMessage(),
461460
equalTo(
@@ -481,8 +480,7 @@ public void testUpdateDatabasesIndexNotReady() throws IOException {
481480
state = ClusterState.builder(state)
482481
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
483482
.build();
484-
when(clusterService.state()).thenReturn(state);
485-
geoIpDownloader.updateDatabases();
483+
geoIpDownloader.updateDatabases(state);
486484
verifyNoInteractions(httpClient);
487485
}
488486

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,6 @@ tests:
230230
- class: org.elasticsearch.smoketest.MlWithSecurityIT
231231
method: test {yaml=ml/trained_model_cat_apis/Test cat trained models}
232232
issue: https://github.com/elastic/elasticsearch/issues/125750
233-
- class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT
234-
method: testEnterpriseDownloaderTask
235-
issue: https://github.com/elastic/elasticsearch/issues/126124
236233
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
237234
method: test {p0=transform/transforms_start_stop/Test start/stop only starts/stops specified transform}
238235
issue: https://github.com/elastic/elasticsearch/issues/126466

0 commit comments

Comments
 (0)