-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Improve concurrency design of EnterpriseGeoIpDownloader
#134223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ef12fe2
7ec9166
7a8ded8
e88aa58
7e2001a
b189e57
8470dbf
512a69e
e7bca06
d4ef021
40a13ab
4bd03ce
c4d55d7
21da0c8
0bd609f
627152d
877f133
041e4ff
dfcb64f
cfcecae
ca8db01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
nielsbauman marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |||||||||||||||||||
| import org.elasticsearch.action.index.IndexRequest; | ||||||||||||||||||||
szybia marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
| import org.elasticsearch.action.support.PlainActionFuture; | ||||||||||||||||||||
| import org.elasticsearch.client.internal.Client; | ||||||||||||||||||||
| import org.elasticsearch.cluster.ClusterState; | ||||||||||||||||||||
| import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||||||||||||||||||||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||||||||||||||||||||
| import org.elasticsearch.cluster.service.ClusterService; | ||||||||||||||||||||
|
|
@@ -55,6 +56,8 @@ | |||||||||||||||||||
| import java.util.Map; | ||||||||||||||||||||
| import java.util.Objects; | ||||||||||||||||||||
| import java.util.Set; | ||||||||||||||||||||
| import java.util.concurrent.Semaphore; | ||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicReference; | ||||||||||||||||||||
| import java.util.function.Function; | ||||||||||||||||||||
| import java.util.function.Supplier; | ||||||||||||||||||||
| import java.util.regex.Pattern; | ||||||||||||||||||||
|
|
@@ -108,7 +111,23 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask { | |||||||||||||||||||
|
|
||||||||||||||||||||
| // visible for testing | ||||||||||||||||||||
| protected volatile EnterpriseGeoIpTaskState state; | ||||||||||||||||||||
| /** | ||||||||||||||||||||
| * The currently scheduled periodic run, or null if no periodic run is currently scheduled. Note: _not_ the currently running thread! | ||||||||||||||||||||
szybia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| */ | ||||||||||||||||||||
| private volatile Scheduler.ScheduledCancellable scheduled; | ||||||||||||||||||||
szybia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Semaphore with 1 permit, used to ensure that only one run (periodic or cluster state) is running at a time. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private final Semaphore running = new Semaphore(1); | ||||||||||||||||||||
szybia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
szybia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Contains a reference to the next state to run on, or null if no run is currently requested. | ||||||||||||||||||||
| * May be overridden by a newer state before the downloader has had a chance to run on it. | ||||||||||||||||||||
| * We store the cluster state like this instead of using `ClusterService#state()`, as we invoke {@link #requestRunOnState(ClusterState)} | ||||||||||||||||||||
| * from a cluster state listener, and then use the cluster state asynchronously, meaning there's a race condition between | ||||||||||||||||||||
| * {@link #runOnState()} and the rest of the cluster state listeners completing and `ClusterStateApplierService` updating its internal | ||||||||||||||||||||
| * `state` field. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private final AtomicReference<ClusterState> queue = new AtomicReference<>(); | ||||||||||||||||||||
nielsbauman marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| private final Supplier<TimeValue> pollIntervalSupplier; | ||||||||||||||||||||
| private final Function<String, char[]> tokenProvider; | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -146,10 +165,9 @@ void setState(EnterpriseGeoIpTaskState state) { | |||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // visible for testing | ||||||||||||||||||||
| void updateDatabases() throws IOException { | ||||||||||||||||||||
| void updateDatabases(ClusterState clusterState) throws IOException { | ||||||||||||||||||||
| @NotMultiProjectCapable(description = "Enterprise GeoIP not available in serverless") | ||||||||||||||||||||
| ProjectId projectId = ProjectId.DEFAULT; | ||||||||||||||||||||
| var clusterState = clusterService.state(); | ||||||||||||||||||||
| var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX); | ||||||||||||||||||||
| if (geoipIndex != null) { | ||||||||||||||||||||
| logger.trace("the geoip index [{}] exists", EnterpriseGeoIpDownloader.DATABASES_INDEX); | ||||||||||||||||||||
|
|
@@ -390,58 +408,123 @@ static byte[] getChunk(InputStream is) throws IOException { | |||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval. | ||||||||||||||||||||
| * Cancels the currently scheduled run (if any) and schedules a new (periodic) run to happen immediately, which will then schedule | ||||||||||||||||||||
| * the next periodic run using the poll interval. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| synchronized void runDownloader() { | ||||||||||||||||||||
| // by the time we reach here, the state will never be null | ||||||||||||||||||||
| assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()"; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // there's a race condition between here and requestReschedule. originally this scheduleNextRun call was at the end of this | ||||||||||||||||||||
| // block, but remember that updateDatabases can take seconds to run (it's downloading bytes from the internet), and so during the | ||||||||||||||||||||
| // very first run there would be no future run scheduled to reschedule in requestReschedule. which meant that if you went from zero | ||||||||||||||||||||
| // to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the | ||||||||||||||||||||
| // requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to | ||||||||||||||||||||
| // reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather | ||||||||||||||||||||
| // than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced. | ||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder if this race condition is a result of chasing our tail instead of relying on the scheduling code. It looks like we schedule the next run immediately every execution because the first time we run the downloader (in nodeOperation) we don't have a scheduled task to cancel. But if we scheduled that first run (even at zero seconds), then we'd have the scheduled-task field initialized for when we want to call it ad hoc. And then when we run it ad hoc, we aren't racing to set the scheduled field. We could even use a barrier or gate on ad-hoc runs to enforce that the scheduling thread sets the field before the scheduled function can, in the case that the scheduler is suspended strangely. Maybe I'm missing something — thoughts?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Do you mean that this would allow us to move the following? (See elasticsearch/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java, lines 531–538 at commit 4eeaae3.)
So, I don't think we need to worry about that cluster state race condition anymore. That does simplify things a lot. Then I think we could indeed revert to scheduling the next run at the end of runDownloader() and ensure that works properly.
So, I don't think we need to worry about that cluster state race condition anymore. That does simplify things a lot. Then I think we could indeed revert to scheduling the next run at the end of runDownloader() and ensure that works properly.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, looking at this again, I realized I forgot to mention another part of my reasoning for the current changes in this PR. I am personally not a big fan of the "cancel scheduled run" approach, because it creates some uncertainty about what happens if a scheduled run is already in progress. We cancel with FutureUtils (elasticsearch/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java, line 32 at commit 4eeaae3).
But to me it's not clear what exactly happens with a scheduled run that is already in progress - does the scheduled.cancel() call block/wait for the scheduled run to finish? This was part of the reason why I went the approach of splitting the periodic and ad-hoc runs up, and making them explicitly wait for the run to finish, instead of cancelling one. Let me know what your thoughts are on that!
|
||||||||||||||||||||
| scheduleNextRun(pollIntervalSupplier.get()); | ||||||||||||||||||||
| // TODO regardless of the above comment, i like the idea of checking the lowest last-checked time and then running the math to get | ||||||||||||||||||||
| // to the next interval from then -- maybe that's a neat future enhancement to add | ||||||||||||||||||||
| public void restartPeriodicRun() { | ||||||||||||||||||||
| logger.trace("Restarting periodic run"); | ||||||||||||||||||||
| if (scheduled != null) { | ||||||||||||||||||||
| final boolean cancelSuccessful = scheduled.cancel(); | ||||||||||||||||||||
| logger.trace("Cancelled scheduled run: [{}]", cancelSuccessful); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if (threadPool.scheduler().isShutdown() == false) { | ||||||||||||||||||||
| threadPool.schedule(this::runPeriodic, TimeValue.ZERO, threadPool.generic()); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Tries to run the downloader now, if it isn't already currently running, and schedules the next periodic run using the poll interval. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private void runPeriodic() { | ||||||||||||||||||||
| if (isCancelled() || isCompleted()) { | ||||||||||||||||||||
| logger.debug("Not running periodic downloader because task is cancelled or completed"); | ||||||||||||||||||||
| return; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| updateDatabases(); // n.b. this downloads bytes from the internet, it can take a while | ||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||
| logger.error("exception during databases update", e); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // If we are not able to acquire the semaphore immediately, it means that a run is already in progress. Periodic runs do not run | ||||||||||||||||||||
| // concurrently, but a cluster state run could be in progress. Since the default poll interval is quite large (3d), there is no | ||||||||||||||||||||
| // need to wait for the current run to finish and then run again, so we just skip this run and schedule the next one. | ||||||||||||||||||||
| if (running.tryAcquire()) { | ||||||||||||||||||||
| final var clusterState = clusterService.state(); | ||||||||||||||||||||
| logger.trace("Running periodic downloader on cluster state [{}]", clusterState.version()); | ||||||||||||||||||||
| runDownloader(clusterState); | ||||||||||||||||||||
| running.release(); | ||||||||||||||||||||
szybia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| } | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| cleanDatabases(); | ||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||
| logger.error("exception during databases cleanup", e); | ||||||||||||||||||||
| if (threadPool.scheduler().isShutdown() == false) { | ||||||||||||||||||||
| logger.trace("Scheduling next periodic run, current scheduled run is [{}]", scheduled); | ||||||||||||||||||||
| scheduled = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic()); | ||||||||||||||||||||
szybia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| logger.trace("Next periodic run scheduled: [{}]", scheduled); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by | ||||||||||||||||||||
| * pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does | ||||||||||||||||||||
| * nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing | ||||||||||||||||||||
| * scheduled run. | ||||||||||||||||||||
| * This method requests that the downloader runs on the supplied cluster state, which likely contains a change in the GeoIP metadata. | ||||||||||||||||||||
| * If the queue was non-empty before we set it, then a run is already scheduled or in progress, so it will either be processed in the | ||||||||||||||||||||
| * next/current run, or the current run will automatically start a new run when it finishes because the cluster state queue changed | ||||||||||||||||||||
| * while it was running. This method does nothing if this task is cancelled or completed. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| public void requestReschedule() { | ||||||||||||||||||||
| public void requestRunOnState(ClusterState clusterState) { | ||||||||||||||||||||
| if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) { | ||||||||||||||||||||
| logger.debug("Not requesting downloader run on cluster state because task is cancelled, completed or shutting down"); | ||||||||||||||||||||
| return; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| logger.trace("Requesting downloader run on cluster state [{}]", clusterState.version()); | ||||||||||||||||||||
| if (queue.getAndSet(clusterState) == null) { | ||||||||||||||||||||
|
||||||||||||||||||||
| logger.trace("Scheduling downloader run on cluster state"); | ||||||||||||||||||||
| threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic()); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Waits for any current run to finish, then runs the downloader on the last seen cluster state. If a new cluster state came in while | ||||||||||||||||||||
| * waiting or running, then schedules another run to happen immediately after this one. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private void runOnState() { | ||||||||||||||||||||
| if (isCancelled() || isCompleted()) { | ||||||||||||||||||||
| logger.debug("Not running downloader on cluster state because task is cancelled or completed"); | ||||||||||||||||||||
| return; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| // Here we do want to wait for the current run (if any) to finish. Since a new cluster state might have arrived while the current | ||||||||||||||||||||
| // run was running, we want to ensure that new cluster state update isn't lost, so we wait and run afterwards. | ||||||||||||||||||||
| logger.trace("Waiting to run downloader on cluster state"); | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| running.acquire(); | ||||||||||||||||||||
| } catch (InterruptedException e) { | ||||||||||||||||||||
| logger.warn("Interrupted while waiting to run downloader on cluster state", e); | ||||||||||||||||||||
nielsbauman marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| } | ||||||||||||||||||||
| // Get the last seen cluster state and process it. | ||||||||||||||||||||
| final ClusterState clusterState = queue.get(); | ||||||||||||||||||||
| assert clusterState != null : "queue was null, but we should only be called if queue was non-null"; | ||||||||||||||||||||
| logger.debug("Running downloader on cluster state [{}]", clusterState.version()); | ||||||||||||||||||||
| runDownloader(clusterState); | ||||||||||||||||||||
| // Try to clear the queue by setting the reference to null. If another cluster state came in since we fetched it above (i.e. the | ||||||||||||||||||||
| // reference differs from `clusterState`), then we schedule another run to happen immediately after this one. | ||||||||||||||||||||
| if (queue.compareAndSet(clusterState, null) == false) { | ||||||||||||||||||||
nielsbauman marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| logger.debug("A new cluster state came in while running, scheduling another run"); | ||||||||||||||||||||
| threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic()); | ||||||||||||||||||||
|
||||||||||||||||||||
| } | ||||||||||||||||||||
| // We release the semaphore last, to ensure that no duplicate runs/threads are started. | ||||||||||||||||||||
| running.release(); | ||||||||||||||||||||
| logger.trace("Finished running downloader on cluster state [{}]", clusterState.version()); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Downloads the geoip databases now based on the supplied cluster state. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| synchronized void runDownloader(ClusterState clusterState) { | ||||||||||||||||||||
| // by the time we reach here, the state will never be null | ||||||||||||||||||||
| assert this.state != null : "this.setState() is null. You need to call setState() before calling runDownloader()"; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (isCancelled() || isCompleted()) { | ||||||||||||||||||||
| return; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if (scheduled != null && scheduled.cancel()) { | ||||||||||||||||||||
| scheduleNextRun(TimeValue.ZERO); | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| updateDatabases(clusterState); // n.b. this downloads bytes from the internet, it can take a while | ||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||
| logger.error("exception during databases update", e); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| cleanDatabases(clusterState); | ||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||
| logger.error("exception during databases cleanup", e); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private void cleanDatabases() { | ||||||||||||||||||||
| private void cleanDatabases(ClusterState clusterState) { | ||||||||||||||||||||
| List<Tuple<String, Metadata>> expiredDatabases = state.getDatabases() | ||||||||||||||||||||
| .entrySet() | ||||||||||||||||||||
| .stream() | ||||||||||||||||||||
| .filter(e -> e.getValue().isNewEnough(clusterService.state().metadata().settings()) == false) | ||||||||||||||||||||
| .filter(e -> e.getValue().isNewEnough(clusterState.metadata().settings()) == false) | ||||||||||||||||||||
| .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) | ||||||||||||||||||||
| .toList(); | ||||||||||||||||||||
| expiredDatabases.forEach(e -> { | ||||||||||||||||||||
|
|
@@ -461,12 +544,6 @@ protected void onCancelled() { | |||||||||||||||||||
| markAsCompleted(); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private void scheduleNextRun(TimeValue time) { | ||||||||||||||||||||
| if (threadPool.scheduler().isShutdown() == false) { | ||||||||||||||||||||
| scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic()); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private ProviderDownload downloaderFor(DatabaseConfiguration database) { | ||||||||||||||||||||
| if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) { | ||||||||||||||||||||
| return new MaxmindDownload(database.name(), maxmind); | ||||||||||||||||||||
|
|
||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.