Skip to content

Commit fda20c1

Browse files
committed
Extract AbstractGeoIpDownloader for shared logic
1 parent c751961 commit fda20c1

File tree

4 files changed

+180
-174
lines changed

4 files changed

+180
-174
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.geoip;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.core.TimeValue;
15+
import org.elasticsearch.persistent.AllocatedPersistentTask;
16+
import org.elasticsearch.tasks.TaskId;
17+
import org.elasticsearch.threadpool.Scheduler;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
20+
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import java.util.function.Supplier;
23+
24+
/**
25+
* Abstract base class for GeoIP downloaders that download GeoIP databases.
26+
*/
27+
public abstract class AbstractGeoIpDownloader extends AllocatedPersistentTask {
28+
29+
private final Logger logger;
30+
private final ThreadPool threadPool;
31+
/**
32+
* The currently scheduled periodic run. Only null before first periodic run.
33+
*/
34+
private volatile Scheduler.ScheduledCancellable scheduledPeriodicRun;
35+
/**
36+
* The number of requested runs. If this is greater than 0, then a run is either in progress or scheduled to run as soon as possible.
37+
*/
38+
private final AtomicInteger queuedRuns = new AtomicInteger(0);
39+
private final Supplier<TimeValue> pollIntervalSupplier;
40+
41+
public AbstractGeoIpDownloader(
42+
long id,
43+
String type,
44+
String action,
45+
String description,
46+
TaskId parentTask,
47+
Map<String, String> headers,
48+
ThreadPool threadPool,
49+
Supplier<TimeValue> pollIntervalSupplier
50+
) {
51+
super(id, type, action, description, parentTask, headers);
52+
this.logger = LogManager.getLogger(getClass());
53+
this.threadPool = threadPool;
54+
this.pollIntervalSupplier = pollIntervalSupplier;
55+
}
56+
57+
/**
58+
* Cancels the currently scheduled run (if any) and schedules a new periodic run using the current poll interval, then requests
59+
* that the downloader runs on demand now. The main reason we need that last step is that if this persistent task
60+
* gets reassigned to a different node, we want to run the downloader immediately on that new node, not wait for the next periodic run.
61+
*/
62+
public void restartPeriodicRun() {
63+
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
64+
logger.debug("Not restarting periodic run because task is cancelled, completed, or shutting down");
65+
return;
66+
}
67+
logger.debug("Restarting periodic run");
68+
// We synchronize to ensure we only have one scheduledPeriodicRun at a time.
69+
synchronized (this) {
70+
if (scheduledPeriodicRun != null) {
71+
// Technically speaking, there's a chance that the scheduled run is already running, in which case cancelling it here does
72+
// nothing. That means that we might end up with two periodic runs scheduled close together. However, that's unlikely to
73+
// happen and relatively harmless if it does, as we only end up running the downloader more often than strictly necessary.
74+
final boolean cancelSuccessful = scheduledPeriodicRun.cancel();
75+
logger.debug("Cancelled scheduled run: [{}]", cancelSuccessful);
76+
}
77+
// This is based on the premise that the poll interval is sufficiently large that we don't need to worry about
78+
// the scheduled `runPeriodic` running before this method completes.
79+
scheduledPeriodicRun = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
80+
}
81+
// Technically, with multiple rapid calls to restartPeriodicRun, we could end up with multiple calls to requestRunOnDemand, but
82+
// that's unlikely to happen and harmless if it does, as we only end up running the downloader more often than strictly necessary.
83+
requestRunOnDemand();
84+
}
85+
86+
/**
87+
* Runs the downloader now and schedules the next periodic run using the poll interval.
88+
*/
89+
private void runPeriodic() {
90+
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
91+
logger.debug("Not running periodic downloader because task is cancelled, completed, or shutting down");
92+
return;
93+
}
94+
95+
logger.trace("Running periodic downloader");
96+
// There's a chance that an on-demand run is already in progress, in which case this periodic run is redundant.
97+
// However, we don't try to avoid that case here, as it's harmless to run the downloader more than strictly necessary (due to
98+
// the high default poll interval of 3d), and it simplifies the logic considerably.
99+
requestRunOnDemand();
100+
101+
synchronized (this) {
102+
scheduledPeriodicRun = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
103+
}
104+
}
105+
106+
/**
107+
* This method requests that the downloader runs on the latest cluster state, which likely contains a change in the GeoIP metadata.
108+
* This method does nothing if this task is cancelled or completed.
109+
*/
110+
public void requestRunOnDemand() {
111+
if (isCancelled() || isCompleted()) {
112+
logger.debug("Not requesting downloader to run on demand because task is cancelled or completed");
113+
return;
114+
}
115+
logger.trace("Requesting downloader run on demand");
116+
// If queuedRuns was greater than 0, then either a run is in progress and it will fire off another run when it finishes,
117+
// or a run is scheduled to run as soon as possible and it will include the latest cluster state.
118+
// If it was 0, we set it to 1 to indicate that a run is scheduled to run as soon as possible and schedule it now.
119+
if (queuedRuns.getAndIncrement() == 0) {
120+
logger.trace("Scheduling downloader run on demand");
121+
threadPool.generic().submit(this::runOnDemand);
122+
}
123+
}
124+
125+
/**
126+
* Runs the downloader on the latest cluster state. {@link #queuedRuns} protects against multiple concurrent runs and ensures that
127+
* if a run is requested while this method is running, then another run will be scheduled to run as soon as this method finishes.
128+
*/
129+
private void runOnDemand() {
130+
if (isCancelled() || isCompleted()) {
131+
logger.debug("Not running downloader on demand because task is cancelled or completed");
132+
return;
133+
}
134+
// Capture the current queue size, so that if another run is requested while we're running, we'll know at the end of this method
135+
// whether we need to run again.
136+
final int currentQueueSize = queuedRuns.get();
137+
logger.trace("Running downloader on demand");
138+
try {
139+
runDownloader();
140+
logger.trace("Downloader completed successfully");
141+
} finally {
142+
// If any exception was thrown during runDownloader, we still want to check queuedRuns.
143+
// Subtract this "batch" of runs from queuedRuns.
144+
// If queuedRuns is still > 0, then a run was requested while we were running, so we need to run again.
145+
if (queuedRuns.addAndGet(-currentQueueSize) > 0) {
146+
logger.debug("Downloader on demand requested again while running, scheduling another run");
147+
threadPool.generic().submit(this::runOnDemand);
148+
}
149+
}
150+
}
151+
152+
/**
153+
* Download, update, and clean up GeoIP databases as required by the GeoIP processors in the cluster.
154+
*/
155+
abstract void runDownloader();
156+
157+
@Override
158+
protected void onCancelled() {
159+
synchronized (this) {
160+
if (scheduledPeriodicRun != null) {
161+
scheduledPeriodicRun.cancel();
162+
}
163+
}
164+
markAsCompleted();
165+
}
166+
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloader.java

Lines changed: 3 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,8 @@
3535
import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata;
3636
import org.elasticsearch.ingest.geoip.direct.DatabaseConfiguration;
3737
import org.elasticsearch.ingest.geoip.direct.DatabaseConfigurationMetadata;
38-
import org.elasticsearch.persistent.AllocatedPersistentTask;
3938
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
4039
import org.elasticsearch.tasks.TaskId;
41-
import org.elasticsearch.threadpool.Scheduler;
4240
import org.elasticsearch.threadpool.ThreadPool;
4341
import org.elasticsearch.xcontent.XContentParser;
4442
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -55,7 +53,6 @@
5553
import java.util.Map;
5654
import java.util.Objects;
5755
import java.util.Set;
58-
import java.util.concurrent.atomic.AtomicInteger;
5956
import java.util.function.Function;
6057
import java.util.function.Supplier;
6158
import java.util.regex.Pattern;
@@ -73,7 +70,7 @@
7370
@NotMultiProjectCapable(
7471
description = "Enterprise GeoIP not available in serverless, we should review this class for MP again after serverless is enabled"
7572
)
76-
public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
73+
public class EnterpriseGeoIpDownloader extends AbstractGeoIpDownloader {
7774

7875
private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloader.class);
7976

@@ -105,19 +102,9 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
105102
private final Client client;
106103
private final HttpClient httpClient;
107104
private final ClusterService clusterService;
108-
private final ThreadPool threadPool;
109105

110106
// visible for testing
111107
protected volatile EnterpriseGeoIpTaskState state;
112-
/**
113-
* The currently scheduled periodic run. Only null before first periodic run.
114-
*/
115-
private volatile Scheduler.ScheduledCancellable scheduledPeriodicRun;
116-
/**
117-
* The number of requested runs. If this is greater than 0, then a run is either in progress or scheduled to run as soon as possible.
118-
*/
119-
private final AtomicInteger queuedRuns = new AtomicInteger(0);
120-
private final Supplier<TimeValue> pollIntervalSupplier;
121108
private final Function<String, char[]> tokenProvider;
122109

123110
EnterpriseGeoIpDownloader(
@@ -134,12 +121,10 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
134121
Supplier<TimeValue> pollIntervalSupplier,
135122
Function<String, char[]> tokenProvider
136123
) {
137-
super(id, type, action, description, parentTask, headers);
124+
super(id, type, action, description, parentTask, headers, threadPool, pollIntervalSupplier);
138125
this.client = client;
139126
this.httpClient = httpClient;
140127
this.clusterService = clusterService;
141-
this.threadPool = threadPool;
142-
this.pollIntervalSupplier = pollIntervalSupplier;
143128
this.tokenProvider = tokenProvider;
144129
}
145130

@@ -397,104 +382,7 @@ static byte[] getChunk(InputStream is) throws IOException {
397382
return buf;
398383
}
399384

400-
/**
401-
* Cancels the currently scheduled run (if any) and schedules a new periodic run using the current poll interval, then requests
402-
* that the downloader runs on demand now. The main reason we need that last step is that if this persistent task
403-
* gets reassigned to a different node, we want to run the downloader immediately on that new node, not wait for the next periodic run.
404-
*/
405-
public void restartPeriodicRun() {
406-
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
407-
logger.debug("Not restarting periodic run because task is cancelled, completed, or shutting down");
408-
return;
409-
}
410-
logger.debug("Restarting periodic run");
411-
// We synchronize to ensure we only have one scheduledPeriodicRun at a time.
412-
synchronized (this) {
413-
if (scheduledPeriodicRun != null) {
414-
// Technically speaking, there's a chance that the scheduled run is already running, in which case cancelling it here does
415-
// nothing. That means that we might end up with two periodic runs scheduled close together. However, that's unlikely to
416-
// happen and relatively harmless if it does, as we only end up running the downloader more often than strictly necessary.
417-
final boolean cancelSuccessful = scheduledPeriodicRun.cancel();
418-
logger.debug("Cancelled scheduled run: [{}]", cancelSuccessful);
419-
}
420-
// This is based on the premise that the poll interval is sufficiently large that we don't need to worry about
421-
// the scheduled `runPeriodic` running before this method completes.
422-
scheduledPeriodicRun = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
423-
}
424-
// Technically, with multiple rapid calls to restartPeriodicRun, we could end up with multiple calls to requestRunOnDemand, but
425-
// that's unlikely to happen and harmless if it does, as we only end up running the downloader more often than strictly necessary.
426-
requestRunOnDemand();
427-
}
428-
429-
/**
430-
* Runs the downloader now and schedules the next periodic run using the poll interval.
431-
*/
432-
private void runPeriodic() {
433-
if (isCancelled() || isCompleted() || threadPool.scheduler().isShutdown()) {
434-
logger.debug("Not running periodic downloader because task is cancelled, completed, or shutting down");
435-
return;
436-
}
437-
438-
logger.trace("Running periodic downloader");
439-
// There's a chance that an on-demand run is already in progress, in which case this periodic run is redundant.
440-
// However, we don't try to avoid that case here, as it's harmless to run the downloader more than strictly necessary (due to
441-
// the high default poll interval of 3d), and it simplifies the logic considerably.
442-
requestRunOnDemand();
443-
444-
synchronized (this) {
445-
scheduledPeriodicRun = threadPool.schedule(this::runPeriodic, pollIntervalSupplier.get(), threadPool.generic());
446-
}
447-
}
448-
449-
/**
450-
* This method requests that the downloader runs on the latest cluster state, which likely contains a change in the GeoIP metadata.
451-
* This method does nothing if this task is cancelled or completed.
452-
*/
453-
public void requestRunOnDemand() {
454-
if (isCancelled() || isCompleted()) {
455-
logger.debug("Not requesting downloader to run on demand because task is cancelled or completed");
456-
return;
457-
}
458-
logger.trace("Requesting downloader run on demand");
459-
// If queuedRuns was greater than 0, then either a run is in progress and it will fire off another run when it finishes,
460-
// or a run is scheduled to run as soon as possible and it will include the latest cluster state.
461-
// If it was 0, we set it to 1 to indicate that a run is scheduled to run as soon as possible and schedule it now.
462-
if (queuedRuns.getAndIncrement() == 0) {
463-
logger.trace("Scheduling downloader run on demand");
464-
threadPool.generic().submit(this::runOnDemand);
465-
}
466-
}
467-
468-
/**
469-
* Runs the downloader on the latest cluster state. {@link #queuedRuns} protects against multiple concurrent runs and ensures that
470-
* if a run is requested while this method is running, then another run will be scheduled to run as soon as this method finishes.
471-
*/
472-
private void runOnDemand() {
473-
if (isCancelled() || isCompleted()) {
474-
logger.debug("Not running downloader on demand because task is cancelled or completed");
475-
return;
476-
}
477-
// Capture the current queue size, so that if another run is requested while we're running, we'll know at the end of this method
478-
// whether we need to run again.
479-
final int currentQueueSize = queuedRuns.get();
480-
logger.trace("Running downloader on demand");
481-
try {
482-
runDownloader();
483-
logger.trace("Downloader completed successfully");
484-
} finally {
485-
// If any exception was thrown during runDownloader, we still want to check queuedRuns.
486-
// Subtract this "batch" of runs from queuedRuns.
487-
// If queuedRuns is still > 0, then a run was requested while we were running, so we need to run again.
488-
if (queuedRuns.addAndGet(-currentQueueSize) > 0) {
489-
logger.debug("Downloader on demand requested again while running, scheduling another run");
490-
threadPool.generic().submit(this::runOnDemand);
491-
}
492-
}
493-
}
494-
495-
/**
496-
* Downloads the geoip databases now based on the supplied cluster state.
497-
*/
385+
@Override
498386
void runDownloader() {
499387
if (isCancelled() || isCompleted()) {
500388
logger.debug("Not running downloader because task is cancelled or completed");
@@ -531,16 +419,6 @@ private void cleanDatabases() {
531419
});
532420
}
533421

534-
@Override
535-
protected void onCancelled() {
536-
synchronized (this) {
537-
if (scheduledPeriodicRun != null) {
538-
scheduledPeriodicRun.cancel();
539-
}
540-
}
541-
markAsCompleted();
542-
}
543-
544422
private ProviderDownload downloaderFor(DatabaseConfiguration database) {
545423
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
546424
return new MaxmindDownload(database.name(), maxmind);

0 commit comments

Comments
 (0)