Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/134223.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134223
summary: Improve concurrency design of `EnterpriseGeoIpDownloader`
area: Ingest Node
type: bug
issues:
- 126124
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

@SuppressWarnings("unchecked")
@TestLogging(
reason = "understanding why ipinfo asn database sometimes is not loaded",
value = "org.elasticsearch.ingest.geoip.DatabaseNodeService:TRACE"
)
@TestLogging(reason = "understanding why ipinfo asn database sometimes is not loaded", value = "org.elasticsearch.ingest.geoip:TRACE")
public void testEnterpriseDownloaderTask() throws Exception {
/*
* This test starts the enterprise geoip downloader task, and creates a database configuration. Then it creates an ingest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -55,6 +56,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -108,7 +111,23 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {

// visible for testing
protected volatile EnterpriseGeoIpTaskState state;
/**
* The currently scheduled periodic run, or null if no periodic run is currently scheduled. Note: _not_ the currently running thread!
*/
private volatile Scheduler.ScheduledCancellable scheduled;
/**
* Semaphore with 1 permit, used to ensure that only one run (periodic or cluster state) is running at a time.
*/
private final Semaphore running = new Semaphore(1);
/**
* Contains a reference to the next state to run on, or null if no run is currently requested.
* May be overridden by a newer state before the downloader has had a chance to run on it.
* We store the cluster state like this instead of using `ClusterService#state()`, as we invoke {@link #requestRunOnState(ClusterState)}
* from a cluster state listener, and then use the cluster state asynchronously, meaning there's a race condition between
* {@link #runOnState()} and the rest of the cluster state listeners completing and `ClusterStateApplierService` updating its internal
* `state` field.
*/
private final AtomicReference<ClusterState> queue = new AtomicReference<>();
private final Supplier<TimeValue> pollIntervalSupplier;
private final Function<String, char[]> tokenProvider;

Expand Down Expand Up @@ -146,10 +165,9 @@ void setState(EnterpriseGeoIpTaskState state) {
}

// visible for testing
void updateDatabases() throws IOException {
void updateDatabases(ClusterState clusterState) throws IOException {
@NotMultiProjectCapable(description = "Enterprise GeoIP not available in serverless")
ProjectId projectId = ProjectId.DEFAULT;
var clusterState = clusterService.state();
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX);
if (geoipIndex != null) {
logger.trace("the geoip index [{}] exists", EnterpriseGeoIpDownloader.DATABASES_INDEX);
Expand Down Expand Up @@ -390,58 +408,124 @@ static byte[] getChunk(InputStream is) throws IOException {
}

/**
* Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval.
* Cancels the currently scheduled run (if any) and schedules a new (periodic) run to happen immediately, which will then schedule
* the next periodic run using the poll interval.
*/
synchronized void runDownloader() {
// by the time we reach here, the state will never be null
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";

// there's a race condition between here and requestReschedule. originally this scheduleNextRun call was at the end of this
// block, but remember that updateDatabases can take seconds to run (it's downloading bytes from the internet), and so during the
// very first run there would be no future run scheduled to reschedule in requestReschedule. which meant that if you went from zero
// to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the
// requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to
// reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather
// than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced.
Copy link
Member

@jbaiera jbaiera Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this race condition is a result of chasing our tail instead of relying on the scheduling code. It looks like we schedule the next run immediately every execution because the first time we run the downloader (in nodeOperation) we don't have a scheduled task to cancel. But if we scheduled that first run (even at zero seconds) then we'd have the scheduled task field initialized for when we want to call it adhoc. And then when we run it adhoc, we aren't racing to set the scheduled field. We could even use a barrier or gate on adhoc runs to enforce that the scheduling thread sets the field before the scheduled function can in the case that the scheduler is suspended strangely. Maybe I'm missing something, thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we scheduled that first run (even at zero seconds) then we'd have the scheduled task field initialized for when we want to call it adhoc. And then when we run it adhoc, we aren't racing to set the scheduled field.

Do you mean that this would allow us to move the scheduleNextRun(pollIntervalSupplier.get()); call to the end of the runDownloader() method, which would reduce the chance of this race condition occurring? As in, because the body of runDownloader() takes time, we'd significantly reduce the chance that scheduleNextRun(pollIntervalSupplier.get()) is run before the scheduled field is set to the ad-hoc run?

I considered that too, but that doesn't address the race condition we have with clusterService.state(). If we have a significantly slow cluster state applier in the cluster, our clusterService.state() could run before ClusterStateApplier's state field is set to the new cluster state.
While typing this, I realized that EnterpriseGeoIpDownloaderTaskExecutor implements ClusterStateListener and not ClusterStateApplier, and listeners are only invoked after the state field is updated:

callClusterStateAppliers(clusterChangedEvent, stopWatch);
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
logger.debug("set locally applied cluster state to version {}", newClusterState.version());
state.set(newClusterState);
callClusterStateListeners(clusterChangedEvent, stopWatch);

So, I don't think we need to worry about that cluster state race condition anymore. That does simplify things a lot. Then I think we could indeed revert to scheduling the next run at the end of runDownloader() and ensure that works properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looking at this again, I realized I forgot to mention another part of my reasoning for the current changes in this PR. I am personally not a big fan of the "cancel scheduled run" approach, because it creates some uncertainty about what happens if a scheduled run is already in progress. We cancel with mayInterruptIfRunning=false here:

return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads

But to me it's not clear what exactly happens with a scheduled run that is already in progress - does the scheduled.cancel() call block/wait for the scheduled run to finish? This was part of the reason why I went the approach of splitting the periodic and ad-hoc runs up, and making them explicitly wait for the run to finish, instead of cancelling one. Let me know what your thoughts are on that!

scheduleNextRun(pollIntervalSupplier.get());
// TODO regardless of the above comment, i like the idea of checking the lowest last-checked time and then running the math to get
// to the next interval from then -- maybe that's a neat future enhancement to add
public void restartPeriodicRun() {
logger.trace("Restarting periodic run");
if (scheduled != null) {
final boolean cancelSuccessful = scheduled.cancel();
logger.trace("Cancelled scheduled run: [{}]", cancelSuccessful);
}
if (threadPool.scheduler().isShutdown() == false) {
threadPool.schedule(this::runPeriodic, TimeValue.ZERO, threadPool.generic());
}
}

/**
* Tries to run the downloader now, if it isn't already currently running, and schedules the next periodic run using the poll interval.
*/
private void runPeriodic() {
if (isCancelled() || isCompleted()) {
logger.debug("Not running periodic downloader because task is cancelled or completed");
return;
}
try {
updateDatabases(); // n.b. this downloads bytes from the internet, it can take a while
} catch (Exception e) {
logger.error("exception during databases update", e);

// If we are not able to acquire the semaphore immediately, it means that a run is already in progress. Periodic runs do not run
// concurrently, but a cluster state run could be in progress. Since the default poll interval is quite large (3d), there is no
// need to wait for the current run to finish and then run again, so we just skip this run and schedule the next one.
if (running.tryAcquire()) {
final var clusterState = clusterService.state();
logger.trace("Running periodic downloader on cluster state [{}]", clusterState.version());
runDownloader(clusterState);
running.release();
}
if (threadPool.scheduler().isShutdown() == false) {
logger.trace("Scheduling next periodic run, current scheduled run is [{}]", scheduled);
scheduled = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
logger.trace("Next periodic run scheduled: [{}]", scheduled);
}
}

/**
* This method requests that the downloader runs on the supplied cluster state, which likely contains a change in the GeoIP metadata.
* If the queue was non-empty before we set it, then a run is already scheduled or in progress, so it will either be processed in the
* next/current run, or the current run will automatically start a new run when it finishes because the cluster state queue changed
* while it was running. This method does nothing if this task is cancelled or completed.
*/
public void requestRunOnState(ClusterState clusterState) {
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
logger.debug("Not requesting downloader run on cluster state because task is cancelled, completed or shutting down");
return;
}
logger.trace("Requesting downloader run on cluster state [{}]", clusterState.version());
if (queue.getAndSet(clusterState) == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that this hand off might cause other race conditions or leaks. If we set a state here and the runOnState function is interrupted while waiting for the running lock, there's nothing that can clear this reference which means this conditional can never schedule it to run again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm good point. I think a similar problem also existed in the current/previous code, as the thread could be interrupted before it scheduled the next run with scheduleNextRun(pollIntervalSupplier.get()); I think we could improve the periodic aspect by using ThreadPool#scheduleWithFixedDelay once (and restart when the poll interval changes) instead of calling ThreadPool#schedule every time. That would ensure that a thread interrupt doesn't prevent the periodic run from never being run again, but it doesn't fix the locking problem. Thinking out loud, we could wrap runOnState in a try/catch where we catch thread interrupts and release the lock in the finally block, but I'm not sure if that guarantees that the lock is released - could the thread be interrupted while the lock release is running?

logger.trace("Scheduling downloader run on cluster state");
threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic());
}
}

/**
* Waits for any current run to finish, then runs the downloader on the last seen cluster state. If a new cluster state came in while
* waiting or running, then schedules another run to happen immediately after this one.
*/
private void runOnState() {
if (isCancelled() || isCompleted()) {
logger.debug("Not running downloader on cluster state because task is cancelled or completed");
return;
}
// Here we do want to wait for the current run (if any) to finish. Since a new cluster state might have arrived while the current
// run was running, we want to ensure that new cluster state update isn't lost, so we wait and run afterwards.
logger.trace("Waiting to run downloader on cluster state");
try {
cleanDatabases();
} catch (Exception e) {
logger.error("exception during databases cleanup", e);
running.acquire();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting to run downloader on cluster state", e);
return;
}
// Get the last seen cluster state and process it.
final ClusterState clusterState = queue.get();
assert clusterState != null : "queue was null, but we should only be called if queue was non-null";
logger.debug("Running downloader on cluster state [{}]", clusterState.version());
runDownloader(clusterState);
// Try to clear the queue by setting the reference to null. If another cluster state came in since we fetched it above (i.e. the
// reference differs from `clusterState`), then we schedule another run to happen immediately after this one.
if (queue.compareAndSet(clusterState, null) == false) {
logger.debug("A new cluster state came in while running, scheduling another run");
threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: Does this need another schedule call? If we're aware of a new state could we loop to the start of the method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, looping to the start of the current method wouldn't work because we could risk stack overflows, and we'd get deadlocks (because we'd never reach the running.release() below). We could essentially change this method to loop while the "queue" (I agree with Szymon that that variable name should be rephrased) is changed. But I'm not sure what benefit that would have other than potentially reducing the amount of locking and releasing we do. An advantage of the current implementation is that we're a friendlier neighbour of the generic threadpool as we're hoarding threads for shorter periods of time.

Copy link
Member

@jbaiera jbaiera Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could essentially change this method to loop while the "queue"

That is what I was thinking yeah. I think I described it poorly.

An advantage of the current implementation is that we're a friendlier neighbour of the generic threadpool

The current logic waits for a lock on a thread which is consuming that generic thread during the wait period regardless. If multiple updates trickle in then we will be repeatedly rescheduling the operation with zero wait anyway, which means grabbing another thread to wait on immediately reacquiring the lock again.

I think if we move forward with this specific change we'll be consuming the time on generic threads no matter what. That said, if this keeps it simpler for folks to reason about then that's fine too.

}
// We release the semaphore last, to ensure that no duplicate runs/threads are started.
running.release();
logger.trace("Finished running downloader on cluster state [{}]", clusterState.version());
}

/**
* This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by
* pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does
* nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing
* scheduled run.
* Downloads the geoip databases now based on the supplied cluster state.
*/
public void requestReschedule() {
synchronized void runDownloader(ClusterState clusterState) {
// by the time we reach here, the state will never be null
assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()";

if (isCancelled() || isCompleted()) {
return;
}
if (scheduled != null && scheduled.cancel()) {
scheduleNextRun(TimeValue.ZERO);
try {
updateDatabases(clusterState); // n.b. this downloads bytes from the internet, it can take a while
} catch (Exception e) {
logger.error("exception during databases update", e);
}
try {
cleanDatabases(clusterState);
} catch (Exception e) {
logger.error("exception during databases cleanup", e);
}
}

private void cleanDatabases() {
private void cleanDatabases(ClusterState clusterState) {
List<Tuple<String, Metadata>> expiredDatabases = state.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(clusterService.state().metadata().settings()) == false)
.filter(e -> e.getValue().isNewEnough(clusterState.metadata().settings()) == false)
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList();
expiredDatabases.forEach(e -> {
Expand All @@ -461,12 +545,6 @@ protected void onCancelled() {
markAsCompleted();
}

private void scheduleNextRun(TimeValue time) {
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
}
}

private ProviderDownload downloaderFor(DatabaseConfiguration database) {
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
return new MaxmindDownload(database.name(), maxmind);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void setPollInterval(TimeValue pollInterval) {
this.pollInterval = pollInterval;
EnterpriseGeoIpDownloader currentDownloader = getCurrentTask();
if (currentDownloader != null) {
currentDownloader.requestReschedule();
currentDownloader.restartPeriodicRun();
}
}
}
Expand Down Expand Up @@ -154,7 +154,7 @@ protected void nodeOperation(AllocatedPersistentTask task, EnterpriseGeoIpTaskPa
downloader.setState(geoIpTaskState);
currentTask.set(downloader);
if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
downloader.runDownloader();
downloader.restartPeriodicRun();
}
}

Expand All @@ -169,7 +169,8 @@ public void clusterChanged(ClusterChangedEvent event) {
boolean hasGeoIpMetadataChanges = event.metadataChanged()
&& event.changedCustomProjectMetadataSet().contains(IngestGeoIpMetadata.TYPE);
if (hasGeoIpMetadataChanges) {
currentDownloader.requestReschedule(); // watching the cluster changed events to kick the thing off if it's not running
// watching the cluster changed events to kick the thing off if it's not running
currentDownloader.requestRunOnState(event.state());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,10 @@ public void testUpdateDatabasesWriteBlock() {
.get(EnterpriseGeoIpDownloader.DATABASES_INDEX)
.getWriteIndex()
.getName();
state = ClusterState.builder(state)
ClusterState finalState = ClusterState.builder(state)
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.build();
when(clusterService.state()).thenReturn(state);
var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases());
var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases(finalState));
assertThat(
e.getMessage(),
equalTo(
Expand All @@ -481,8 +480,7 @@ public void testUpdateDatabasesIndexNotReady() throws IOException {
state = ClusterState.builder(state)
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.build();
when(clusterService.state()).thenReturn(state);
geoIpDownloader.updateDatabases();
geoIpDownloader.updateDatabases(state);
verifyNoInteractions(httpClient);
}

Expand Down
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,6 @@ tests:
- class: org.elasticsearch.smoketest.MlWithSecurityIT
method: test {yaml=ml/trained_model_cat_apis/Test cat trained models}
issue: https://github.com/elastic/elasticsearch/issues/125750
- class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT
method: testEnterpriseDownloaderTask
issue: https://github.com/elastic/elasticsearch/issues/126124
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Test start/stop only starts/stops specified transform}
issue: https://github.com/elastic/elasticsearch/issues/126466
Expand Down