import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.hash.MessageDigests;
+ import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.IOUtils;

import java.util.zip.GZIPInputStream;

import static org.elasticsearch.core.Strings.format;
+ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId;

/**
 * A component that is responsible for making the databases maintained by {@link GeoIpDownloader}
 * 2) For each database check whether the databases have changed
 * by comparing the local and remote md5 hash or are locally missing.
 * 3) For each database identified in step 2 start downloading the database
- * chunks. Each chunks is appended to a tmp file (inside geoip tmp dir) and
+ * chunks. Each chunk is appended to a tmp file (inside geoip tmp dir) and
 * after all chunks have been downloaded, the database is uncompressed and
 * renamed to the final filename.After this the database is loaded and
 * if there is an old instance of this database then that is closed.
 * 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.
 */
- public final class DatabaseNodeService implements Closeable {
+ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeable {

    private static final Logger LOGGER = LogManager.getLogger(DatabaseNodeService.class);

@@ -93,34 +95,45 @@ public final class DatabaseNodeService implements Closeable {
    private Path geoipTmpDirectory;
    private final ConfigDatabases configDatabases;
    private final Consumer<Runnable> genericExecutor;
+     private final ClusterService clusterService;
    private IngestService ingestService;

    private final ConcurrentMap<String, DatabaseReaderLazyLoader> databases = new ConcurrentHashMap<>();

-     DatabaseNodeService(Environment environment, Client client, GeoIpCache cache, Consumer<Runnable> genericExecutor) {
+     DatabaseNodeService(
+         Environment environment,
+         Client client,
+         GeoIpCache cache,
+         Consumer<Runnable> genericExecutor,
+         ClusterService clusterService
+     ) {
        this(
            environment.tmpFile(),
            new OriginSettingClient(client, IngestService.INGEST_ORIGIN),
            cache,
            new ConfigDatabases(environment, cache),
-             genericExecutor
+             genericExecutor,
+             clusterService
        );
    }

-     DatabaseNodeService(Path tmpDir, Client client, GeoIpCache cache, ConfigDatabases configDatabases, Consumer<Runnable> genericExecutor) {
+     DatabaseNodeService(
+         Path tmpDir,
+         Client client,
+         GeoIpCache cache,
+         ConfigDatabases configDatabases,
+         Consumer<Runnable> genericExecutor,
+         ClusterService clusterService
+     ) {
        this.client = client;
        this.cache = cache;
        this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
        this.configDatabases = configDatabases;
        this.genericExecutor = genericExecutor;
+         this.clusterService = clusterService;
    }

-     public void initialize(
-         String nodeId,
-         ResourceWatcherService resourceWatcher,
-         IngestService ingestServiceArg,
-         ClusterService clusterService
-     ) throws IOException {
+     public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestServiceArg) throws IOException {
        configDatabases.initialize(resourceWatcher);
        geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
        Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() {
@@ -161,7 +174,35 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
        clusterService.addListener(event -> checkDatabases(event.state()));
    }

-     public DatabaseReaderLazyLoader getDatabase(String name) {
+     @Override
+     public Boolean isValid(String databaseFile) {
+         ClusterState currentState = clusterService.state();
+         assert currentState != null;
+
+         PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(currentState, GeoIpDownloader.GEOIP_DOWNLOADER);
+         if (task == null || task.getState() == null) {
+             return true;
+         }
+         GeoIpTaskState state = (GeoIpTaskState) task.getState();
+         GeoIpTaskState.Metadata metadata = state.getDatabases().get(databaseFile);
+         // we never remove metadata from cluster state, if metadata is null we deal with built-in database, which is always valid
+         if (metadata == null) {
+             return true;
+         }
+
+         boolean valid = metadata.isValid(currentState.metadata().settings());
+         if (valid && metadata.isCloseToExpiration()) {
+             HeaderWarning.addWarning(
+                 "database [{}] was not updated for over 25 days, geoip processor" + " will stop working if there is no update for 30 days",
+                 databaseFile
+             );
+         }
+
+         return valid;
+     }
+
+     // for testing only:
+     DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(String name) {
        // There is a need for reference counting in order to avoid using an instance
        // that gets closed while using it. (this can happen during a database update)
        while (true) {
@@ -174,6 +215,11 @@ public DatabaseReaderLazyLoader getDatabase(String name) {
        }
    }

+     @Override
+     public GeoIpDatabase getDatabase(String name) {
+         return getDatabaseReaderLazyLoader(name);
+     }
+
    List<DatabaseReaderLazyLoader> getAllDatabases() {
        List<DatabaseReaderLazyLoader> all = new ArrayList<>(configDatabases.getConfigDatabases().values());
        this.databases.forEach((key, value) -> all.add(value));
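
For readers looking at this hunk in isolation: the GeoIpDatabaseProvider interface that DatabaseNodeService now implements is not part of this file. Judging from the two @Override methods added above, it presumably has roughly the following shape (a sketch under that assumption, not the actual interface source):

// Hypothetical sketch of GeoIpDatabaseProvider, inferred from the overridden
// methods in this diff rather than copied from the repository.
public interface GeoIpDatabaseProvider {

    // Whether the named database file may still be used by the geoip processor.
    // The implementation above treats databases without task metadata (built-in /
    // config databases) as always valid, and otherwise checks GeoIpTaskState.Metadata,
    // emitting a header warning once a downloaded database nears its 30-day expiration.
    Boolean isValid(String databaseFile);

    // A handle to the named database; DatabaseNodeService serves this from its
    // reference-counted DatabaseReaderLazyLoader instances.
    GeoIpDatabase getDatabase(String name);
}

A caller such as the geoip processor would then be expected to check isValid(databaseFile) before resolving getDatabase(databaseFile), so that lookups stop once a downloaded database has gone 30 days without an update.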