Merged
Changes from 9 commits
6 changes: 6 additions & 0 deletions docs/changelog/134223.yaml
@@ -0,0 +1,6 @@
pr: 134223
summary: Improve concurrency design of `EnterpriseGeoIpDownloader`
area: Ingest Node
type: bug
issues:
- 126124
@@ -98,10 +98,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

@SuppressWarnings("unchecked")
@TestLogging(
reason = "understanding why ipinfo asn database sometimes is not loaded",
value = "org.elasticsearch.ingest.geoip.DatabaseNodeService:TRACE"
)
@TestLogging(reason = "understanding why ipinfo asn database sometimes is not loaded", value = "org.elasticsearch.ingest.geoip:TRACE")
public void testEnterpriseDownloaderTask() throws Exception {
/*
* This test starts the enterprise geoip downloader task, and creates a database configuration. Then it creates an ingest
@@ -55,6 +55,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -108,7 +110,18 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {

// visible for testing
protected volatile EnterpriseGeoIpTaskState state;
/**
* The currently scheduled periodic run, or null if no periodic run is currently scheduled. Note: _not_ the currently running thread!
*/
private volatile Scheduler.ScheduledCancellable scheduled;
/**
* Semaphore with 1 permit, used to ensure that only one run (periodic or cluster state) is running at a time.
*/
private final Semaphore running = new Semaphore(1);
/**
* True if a run has been queued due to a cluster state change, false otherwise.
*/
private final AtomicBoolean runOnDemandQueued = new AtomicBoolean(false);
Contributor:
A technique we use elsewhere is to have an AtomicInteger ql representing the length of some (possibly notional) queue of tasks awaiting execution. Adding a task to the queue means calling ql.getAndIncrement() and then checking the return value: if it is zero, we just moved the queue from empty to nonempty, so we must actually execute the task; if it is nonzero, there's already a task running, so we do nothing more. Meanwhile, when we start the task we call int batchSize = ql.get(), which tells us the size of the batch of (notional) tasks we're running, and then call ql.addAndGet(-batchSize) at the end of the task execution. If that addAndGet returns 0, no new tasks were added, so we stop; otherwise we must re-run the task, getting a new batchSize, and so on. See for instance org.elasticsearch.cluster.service.MasterService#totalQueueSize.
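A minimal sketch of that pattern (names such as NotionalQueue, enqueue, and drain are illustrative; they are not taken from MasterService or from this PR):

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the notional-queue pattern described above; not production code.
class NotionalQueue {
    private final AtomicInteger queueLength = new AtomicInteger(); // the "ql" counter
    private final Executor executor;
    private final Runnable task;

    NotionalQueue(Executor executor, Runnable task) {
        this.executor = executor;
        this.task = task;
    }

    void enqueue() {
        // 0 -> 1 means the queue went from empty to nonempty, so we must start execution;
        // otherwise a batch is already running and will pick this request up.
        if (queueLength.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        int batchSize;
        do {
            batchSize = queueLength.get(); // size of the (notional) batch we are about to run
            task.run();                    // a single execution covers the whole batch
            // If the counter is nonzero after subtracting the batch, new work arrived while we
            // were running, so loop and run again.
        } while (queueLength.addAndGet(-batchSize) != 0);
    }
}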

Contributor Author:
Thanks for chiming in, David. I originally thought of a similar approach to indicate the number of queued tasks, but then I realized we don't actually care in this class how many times we requested the downloader to run on demand; we only care whether it was requested or not, hence my use of a boolean. Therefore, I'm not immediately seeing what added value an integer would have in this case. Am I missing something, or did you just want to link a place where we do something similar?

Contributor:
With a boolean you have to do extra work to know whether a new task got added after the current task started executing (in which case you need to run the task again) or not.

Contributor:
... or rather, that's the problem if you set it back to false at the end of the task execution. If you set it back to false at the start of the task execution then you need some other mechanism to ensure that only one task executes at once. In the latter case, the AtomicInteger gives you the mutual exclusion automatically.

Contributor Author:
Ah, right. I thought you were suggesting replacing only the AtomicBoolean with AtomicInteger, but you were suggesting removing the Semaphore as well. I implemented that locally just now, and I think I indeed prefer this approach, as it's simpler than what I had before. The only small downside is that we need to check the result of the decrement call in two places instead of one compared to the MasterService, because we also have the periodic run here, but I think that's worth the reduced complexity. Thanks a lot for the suggestion!

private final Supplier<TimeValue> pollIntervalSupplier;
private final Function<String, char[]> tokenProvider;

@@ -390,23 +403,108 @@ static byte[] getChunk(InputStream is) throws IOException {
}

/**
* Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval.
* Cancels the currently scheduled run (if any) and schedules a new (periodic) run to happen immediately, which will then schedule
* the next periodic run using the poll interval.
*/
public void restartPeriodicRun() {
logger.trace("Restarting periodic run");
if (scheduled != null) {
final boolean cancelSuccessful = scheduled.cancel();
logger.trace("Cancelled scheduled run: [{}]", cancelSuccessful);
}
if (threadPool.scheduler().isShutdown() == false) {
threadPool.generic().submit(this::runPeriodic);
}
}

/**
* Tries to run the downloader now, if it isn't already currently running, and schedules the next periodic run using the poll interval.
*/
private void runPeriodic() {
if (isCancelled() || isCompleted()) {
logger.debug("Not running periodic downloader because task is cancelled or completed");
return;
}

// If we are not able to acquire the semaphore immediately, it means that a run is already in progress. Periodic runs do not run
// concurrently, but a cluster state run could be in progress. Since the default poll interval is quite large (3d), there is no
// need to wait for the current run to finish and then run again, so we just skip this run and schedule the next one.
if (running.tryAcquire()) {
try {
logger.trace("Running periodic downloader");
runDownloader();
} finally {
running.release();
}
}
if (threadPool.scheduler().isShutdown() == false) {
logger.trace("Scheduling next periodic run, current scheduled run is [{}]", scheduled);
scheduled = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
logger.trace("Next periodic run scheduled: [{}]", scheduled);
}
}

/**
* This method requests that the downloader runs on the latest cluster state, which likely contains a change in the GeoIP metadata.
* If a run is already queued ({@code runOnDemandQueued} is already {@code true}), then that run will process the latest cluster state
* when it executes, so we don't need to do anything. Otherwise, we set the flag to {@code true} and schedule a run.
* This method does nothing if this task is cancelled or completed.
*/
public void requestRunOnDemand() {
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
logger.debug("Not requesting downloader run on demand because task is cancelled, completed or shutting down");
return;
}
logger.trace("Requesting downloader run on demand");
if (runOnDemandQueued.compareAndSet(false, true)) {
logger.trace("Scheduling downloader run on demand");
threadPool.generic().submit(this::runOnDemand);
}
}

/**
* Waits for any current run to finish, then runs the downloader on the last seen cluster state.
*/
private void runOnDemand() {
if (isCancelled() || isCompleted()) {
logger.debug("Not running downloader on demand because task is cancelled or completed");
return;
}
// Here we do want to wait for the current run (if any) to finish. Since a new cluster state might have arrived while the current
// run was running, we want to ensure that the new cluster state update isn't lost, so we wait and run afterwards.
logger.trace("Waiting to run downloader on demand");
boolean acquired = false;
try {
running.acquire();
acquired = true;
// We have the semaphore, so no other run is in progress. We set queued to false, so that future calls to requestRunOnDemand
// will schedule a new run. We do this before running the downloader, so that if a new cluster state arrives while we're
// running, that will schedule a new run.
// Technically speaking, there's a chance that a new cluster state arrives in between the following line and
// clusterService.state() in updateDatabases() (or deleteDatabases()), which will cause another run to be scheduled,
// but the only consequence of that is that we'll process that cluster state twice, which is harmless.
runOnDemandQueued.set(false);
logger.debug("Running downloader on demand");
runDownloader();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting to run downloader on demand", e);
} finally {
// If a new cluster state arrived in between runOnDemandQueued.set(false) and now, then that will have scheduled a new run,
// so we don't need to care about that here.
if (acquired) {
running.release();
}
logger.trace("Finished running downloader on demand");
}
}

/**
* Downloads the geoip databases now based on the supplied cluster state.
*/
synchronized void runDownloader() {
// by the time we reach here, the state will never be null
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";

// there's a race condition between here and requestReschedule. originally this scheduleNextRun call was at the end of this
// block, but remember that updateDatabases can take seconds to run (it's downloading bytes from the internet), and so during the
// very first run there would be no future run scheduled to reschedule in requestReschedule. which meant that if you went from zero
// to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the
// requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to
// reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather
// than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced.
@jbaiera (Member) commented on Sep 5, 2025:
I wonder if this race condition is a result of chasing our tail instead of relying on the scheduling code. It looks like we schedule the next run immediately every execution because the first time we run the downloader (in nodeOperation) we don't have a scheduled task to cancel. But if we scheduled that first run (even at zero seconds) then we'd have the scheduled task field initialized for when we want to call it adhoc. And then when we run it adhoc, we aren't racing to set the scheduled field. We could even use a barrier or gate on adhoc runs to enforce that the scheduling thread sets the field before the scheduled function can in the case that the scheduler is suspended strangely. Maybe I'm missing something, thoughts?
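A rough, untested sketch of that idea (hypothetical; it assumes scheduleNextRun were made visible to the task executor, which is not the case in this PR):

// Hypothetical sketch of the suggestion above, not code from this PR: in nodeOperation,
// schedule the very first run at zero delay instead of calling runDownloader() directly,
// so the `scheduled` field is populated before any ad-hoc reschedule tries to cancel it.
if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
    downloader.scheduleNextRun(TimeValue.ZERO);
}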

Contributor Author:
> But if we scheduled that first run (even at zero seconds) then we'd have the scheduled task field initialized for when we want to call it adhoc. And then when we run it adhoc, we aren't racing to set the scheduled field.

Do you mean that this would allow us to move the scheduleNextRun(pollIntervalSupplier.get()); call to the end of the runDownloader() method, which would reduce the chance of this race condition occurring? As in, because the body of runDownloader() takes time, we'd significantly reduce the chance that scheduleNextRun(pollIntervalSupplier.get()) is run before the scheduled field is set to the ad-hoc run?

I considered that too, but that doesn't address the race condition we have with clusterService.state(). If we have a significantly slow cluster state applier in the cluster, our clusterService.state() could run before ClusterApplierService's state field is set to the new cluster state.
While typing this, I realized that EnterpriseGeoIpDownloaderTaskExecutor implements ClusterStateListener and not ClusterStateApplier, and listeners are only invoked after the state field is updated:

callClusterStateAppliers(clusterChangedEvent, stopWatch);
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
logger.debug("set locally applied cluster state to version {}", newClusterState.version());
state.set(newClusterState);
callClusterStateListeners(clusterChangedEvent, stopWatch);

So, I don't think we need to worry about that cluster state race condition anymore. That does simplify things a lot. Then I think we could indeed revert to scheduling the next run at the end of runDownloader() and ensure that works properly.

Contributor Author:
Ah, looking at this again, I realized I forgot to mention another part of my reasoning for the current changes in this PR. I am personally not a big fan of the "cancel scheduled run" approach, because it creates some uncertainty about what happens if a scheduled run is already in progress. We cancel with mayInterruptIfRunning=false here:

return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads

But to me it's not clear what exactly happens with a scheduled run that is already in progress - does the scheduled.cancel() call block/wait for the scheduled run to finish? This was part of the reason why I went with the approach of splitting the periodic and ad-hoc runs up, and making them explicitly wait for the run to finish, instead of cancelling one. Let me know what your thoughts are on that!
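For reference, a small standalone check of the underlying JDK behaviour (assuming the ScheduledCancellable ultimately delegates to a plain ScheduledFuture, as the quoted line suggests): Future.cancel(false) returns immediately rather than waiting for an in-flight task, and a task that is already running simply continues to completion.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancelSemanticsDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        ScheduledFuture<?> future = pool.schedule(() -> {
            started.countDown();
            try {
                Thread.sleep(2_000); // simulate a run that is already in progress
            } catch (InterruptedException ignored) {
            }
            System.out.println("task still ran to completion after cancel(false)");
        }, 0, TimeUnit.MILLISECONDS);
        started.await();
        long startNanos = System.nanoTime();
        boolean cancelled = future.cancel(false); // returns immediately; never blocks on the running task
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        System.out.println("cancel(false) returned " + cancelled + " after " + elapsedMillis + " ms");
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}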

scheduleNextRun(pollIntervalSupplier.get());
// TODO regardless of the above comment, i like the idea of checking the lowest last-checked time and then running the math to get
// to the next interval from then -- maybe that's a neat future enhancement to add

if (isCancelled() || isCompleted()) {
return;
}
@@ -422,21 +520,6 @@ synchronized void runDownloader() {
}
}

/**
* This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by
* pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does
* nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing
* scheduled run.
*/
public void requestReschedule() {
if (isCancelled() || isCompleted()) {
return;
}
if (scheduled != null && scheduled.cancel()) {
scheduleNextRun(TimeValue.ZERO);
}
}

private void cleanDatabases() {
List<Tuple<String, Metadata>> expiredDatabases = state.getDatabases()
.entrySet()
@@ -461,12 +544,6 @@ protected void onCancelled() {
markAsCompleted();
}

private void scheduleNextRun(TimeValue time) {
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
}
}

private ProviderDownload downloaderFor(DatabaseConfiguration database) {
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
return new MaxmindDownload(database.name(), maxmind);
@@ -99,7 +99,7 @@ private void setPollInterval(TimeValue pollInterval) {
this.pollInterval = pollInterval;
EnterpriseGeoIpDownloader currentDownloader = getCurrentTask();
if (currentDownloader != null) {
currentDownloader.requestReschedule();
currentDownloader.restartPeriodicRun();
}
}
}
@@ -150,7 +150,7 @@ protected void nodeOperation(AllocatedPersistentTask task, EnterpriseGeoIpTaskPa
downloader.setState(geoIpTaskState);
currentTask.set(downloader);
if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
downloader.runDownloader();
downloader.restartPeriodicRun();
}
}

@@ -165,7 +165,8 @@ public void clusterChanged(ClusterChangedEvent event) {
boolean hasGeoIpMetadataChanges = event.metadataChanged()
&& event.changedCustomProjectMetadataSet().contains(IngestGeoIpMetadata.TYPE);
if (hasGeoIpMetadataChanges) {
currentDownloader.requestReschedule(); // watching the cluster changed events to kick the thing off if it's not running
// watching the cluster changed events to kick the thing off if it's not running
currentDownloader.requestRunOnDemand();
}
}
}
@@ -532,6 +532,80 @@ public void testIpinfoUrls() {
}
}

/**
* Tests that if an exception is thrown while {@code runOnDemand} is running, the lock is released so that subsequent calls to
* {@code runOnDemand} can proceed.
*/
public void testRequestRunOnDemandReleasesLock() throws Exception {
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
AtomicInteger calls = new AtomicInteger();
geoIpDownloader = new EnterpriseGeoIpDownloader(
client,
httpClient,
clusterService,
threadPool,
1,
"",
"",
"",
EMPTY_TASK_ID,
Map.of(),
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
(type) -> "password".toCharArray()
) {
@Override
synchronized void runDownloader() {
if (calls.incrementAndGet() == 1) {
throw new RuntimeException("test exception");
}
super.runDownloader();
}
};
when(clusterService.state()).thenReturn(state);
geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
geoIpDownloader.requestRunOnDemand();
assertBusy(() -> assertEquals(1, calls.get()));
geoIpDownloader.requestRunOnDemand();
assertBusy(() -> { assertEquals(2, calls.get()); });
}

/**
* Tests that if an exception is thrown while {@code restartPeriodicRun} is running, the lock is released so that subsequent calls to
* {@code restartPeriodicRun} can proceed.
*/
public void testRestartPeriodicRunReleasesLock() throws Exception {
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
AtomicInteger calls = new AtomicInteger();
geoIpDownloader = new EnterpriseGeoIpDownloader(
client,
httpClient,
clusterService,
threadPool,
1,
"",
"",
"",
EMPTY_TASK_ID,
Map.of(),
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
(type) -> "password".toCharArray()
) {
@Override
synchronized void runDownloader() {
if (calls.incrementAndGet() == 1) {
throw new RuntimeException("test exception");
}
super.runDownloader();
}
};
when(clusterService.state()).thenReturn(state);
geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
geoIpDownloader.restartPeriodicRun();
assertBusy(() -> assertEquals(1, calls.get()));
Contributor Author:
I'm not a big fan of using assertBusy here; I'd prefer to use something like a future to avoid waiting unnecessarily long in assertBusy, but I don't immediately have a good example of that at hand and wanted to push my changes already.

Contributor:
From reading assertBusy, it seems quite efficient with exponential backoff, so I'd be happy with this; it's more readable too.

But I couldn't turn down an opportunity to play with the code (I haven't tested this):

diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java
index 4fd2b7b751c..06913cd9cb6 100644
--- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java
+++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java
@@ -56,6 +56,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
@@ -576,6 +577,8 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
     public void testRestartPeriodicRunReleasesLock() throws Exception {
         ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
         AtomicInteger calls = new AtomicInteger();
+        final CompletableFuture<Void> firstRunInvoked = new CompletableFuture<>();
+        final CompletableFuture<Void> secondRunInvoked = new CompletableFuture<>();
         geoIpDownloader = new EnterpriseGeoIpDownloader(
             client,
             httpClient,
@@ -592,8 +595,13 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
         ) {
             @Override
             synchronized void runDownloader() {
-                if (calls.incrementAndGet() == 1) {
-                    throw new RuntimeException("test exception");
+                switch (calls.incrementAndGet()) {
+                    case 1 -> {
+                        firstRunInvoked.complete(null);
+                        throw new RuntimeException("test exception");
+                    }
+                    case 2 -> secondRunInvoked.complete(null);
+                    default -> throw new IllegalStateException("unexpected to be called > 2 times");
                 }
                 super.runDownloader();
             }
@@ -601,9 +609,9 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
         when(clusterService.state()).thenReturn(state);
         geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
         geoIpDownloader.restartPeriodicRun();
-        assertBusy(() -> assertEquals(1, calls.get()));
+        firstRunInvoked.join();
         geoIpDownloader.restartPeriodicRun();
-        assertBusy(() -> { assertEquals(2, calls.get()); });
+        secondRunInvoked.join();
     }
 
     private static class MockClient extends NoOpClient {

geoIpDownloader.restartPeriodicRun();
assertBusy(() -> { assertEquals(2, calls.get()); });
}

private static class MockClient extends NoOpClient {

private final Map<ActionType<?>, BiConsumer<? extends ActionRequest, ? extends ActionListener<?>>> handlers = new HashMap<>();
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -218,9 +218,6 @@ tests:
- class: org.elasticsearch.smoketest.MlWithSecurityIT
method: test {yaml=ml/trained_model_cat_apis/Test cat trained models}
issue: https://github.com/elastic/elasticsearch/issues/125750
- class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT
method: testEnterpriseDownloaderTask
issue: https://github.com/elastic/elasticsearch/issues/126124
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Test start/stop only starts/stops specified transform}
issue: https://github.com/elastic/elasticsearch/issues/126466