|
16 | 16 | import org.elasticsearch.client.internal.Client;
|
17 | 17 | import org.elasticsearch.client.internal.OriginSettingClient;
|
18 | 18 | import org.elasticsearch.cluster.ClusterState;
|
| 19 | +import org.elasticsearch.cluster.ProjectState; |
19 | 20 | import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
20 | 21 | import org.elasticsearch.cluster.metadata.ProjectId;
|
21 | 22 | import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
@@ -288,104 +289,102 @@ void checkDatabases(ClusterState state) {
|
288 | 289 | }
|
289 | 290 |
|
290 | 291 | // Optimization: only load the .geoip_databases for projects that are allocated to this node
|
291 |
| - for (ProjectMetadata projectMetadata : state.metadata().projects().values()) { |
292 |
| - ProjectId projectId = projectMetadata.id(); |
| 292 | + state.forEachProject(this::checkDatabases); |
| 293 | + } |
293 | 294 |
|
294 |
| - PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); |
295 |
| - if (persistentTasks == null) { |
296 |
| - logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); |
297 |
| - continue; |
298 |
| - } |
| 295 | + void checkDatabases(ProjectState projectState) { |
| 296 | + ProjectId projectId = projectState.projectId(); |
| 297 | + ProjectMetadata projectMetadata = projectState.metadata(); |
| 298 | + PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); |
| 299 | + if (persistentTasks == null) { |
| 300 | + logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); |
| 301 | + return; |
| 302 | + } |
299 | 303 |
|
300 |
| - IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); |
301 |
| - if (databasesAbstraction == null) { |
302 |
| - logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); |
| 304 | + IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); |
| 305 | + if (databasesAbstraction == null) { |
| 306 | + logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); |
| 307 | + return; |
| 308 | + } else { |
| 309 | + // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index |
| 310 | + Index databasesIndex = databasesAbstraction.getWriteIndex(); |
| 311 | + IndexRoutingTable databasesIndexRT = projectState.routingTable().index(databasesIndex); |
| 312 | + if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { |
| 313 | + logger.trace( |
| 314 | + "Not checking databases because geoip databases index does not have all active primary shards for project [{}]", |
| 315 | + projectId |
| 316 | + ); |
303 | 317 | return;
|
304 |
| - } else { |
305 |
| - // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index |
306 |
| - Index databasesIndex = databasesAbstraction.getWriteIndex(); |
307 |
| - IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex); |
308 |
| - if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { |
309 |
| - logger.trace( |
310 |
| - "Not checking databases because geoip databases index does not have all active primary shards for" |
311 |
| - + " project [{}]", |
312 |
| - projectId |
313 |
| - ); |
314 |
| - return; |
315 |
| - } |
316 | 318 | }
|
| 319 | + } |
317 | 320 |
|
318 |
| - // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with |
319 |
| - List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>(); |
| 321 | + // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with |
| 322 | + List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>(); |
320 | 323 |
|
321 |
| - // process the geoip task state for the (ordinary) geoip downloader |
322 |
| - { |
323 |
| - GeoIpTaskState taskState = getGeoIpTaskState( |
324 |
| - projectMetadata, |
325 |
| - getTaskId(projectId, projectResolver.supportsMultipleProjects()) |
326 |
| - ); |
327 |
| - if (taskState == null) { |
328 |
| - // Note: an empty state will purge stale entries in databases map |
329 |
| - taskState = GeoIpTaskState.EMPTY; |
330 |
| - } |
331 |
| - validMetadatas.addAll( |
332 |
| - taskState.getDatabases() |
333 |
| - .entrySet() |
334 |
| - .stream() |
335 |
| - .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) |
336 |
| - .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) |
337 |
| - .toList() |
338 |
| - ); |
| 324 | + // process the geoip task state for the (ordinary) geoip downloader |
| 325 | + { |
| 326 | + GeoIpTaskState taskState = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects())); |
| 327 | + if (taskState == null) { |
| 328 | + // Note: an empty state will purge stale entries in databases map |
| 329 | + taskState = GeoIpTaskState.EMPTY; |
339 | 330 | }
|
| 331 | + validMetadatas.addAll( |
| 332 | + taskState.getDatabases() |
| 333 | + .entrySet() |
| 334 | + .stream() |
| 335 | + .filter(e -> e.getValue().isNewEnough(projectState.cluster().metadata().settings())) |
| 336 | + .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) |
| 337 | + .toList() |
| 338 | + ); |
| 339 | + } |
340 | 340 |
|
341 |
| - // process the geoip task state for the enterprise geoip downloader |
342 |
| - { |
343 |
| - EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state); |
344 |
| - if (taskState == null) { |
345 |
| - // Note: an empty state will purge stale entries in databases map |
346 |
| - taskState = EnterpriseGeoIpTaskState.EMPTY; |
347 |
| - } |
348 |
| - validMetadatas.addAll( |
349 |
| - taskState.getDatabases() |
350 |
| - .entrySet() |
351 |
| - .stream() |
352 |
| - .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) |
353 |
| - .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) |
354 |
| - .toList() |
355 |
| - ); |
| 341 | + // process the geoip task state for the enterprise geoip downloader |
| 342 | + { |
| 343 | + EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(projectState.metadata()); |
| 344 | + if (taskState == null) { |
| 345 | + // Note: an empty state will purge stale entries in databases map |
| 346 | + taskState = EnterpriseGeoIpTaskState.EMPTY; |
356 | 347 | }
|
| 348 | + validMetadatas.addAll( |
| 349 | + taskState.getDatabases() |
| 350 | + .entrySet() |
| 351 | + .stream() |
| 352 | + .filter(e -> e.getValue().isNewEnough(projectState.cluster().metadata().settings())) |
| 353 | + .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) |
| 354 | + .toList() |
| 355 | + ); |
| 356 | + } |
357 | 357 |
|
358 |
| - // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task |
359 |
| - // has downloaded a new version of the databases |
360 |
| - validMetadatas.forEach(e -> { |
361 |
| - String name = e.v1(); |
362 |
| - GeoIpTaskState.Metadata metadata = e.v2(); |
363 |
| - DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); |
364 |
| - String remoteMd5 = metadata.md5(); |
365 |
| - String localMd5 = reference != null ? reference.getMd5() : null; |
366 |
| - if (Objects.equals(localMd5, remoteMd5)) { |
367 |
| - logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); |
368 |
| - return; |
369 |
| - } |
| 358 | + // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task |
| 359 | + // has downloaded a new version of the databases |
| 360 | + validMetadatas.forEach(e -> { |
| 361 | + String name = e.v1(); |
| 362 | + GeoIpTaskState.Metadata metadata = e.v2(); |
| 363 | + DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); |
| 364 | + String remoteMd5 = metadata.md5(); |
| 365 | + String localMd5 = reference != null ? reference.getMd5() : null; |
| 366 | + if (Objects.equals(localMd5, remoteMd5)) { |
| 367 | + logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); |
| 368 | + return; |
| 369 | + } |
370 | 370 |
|
371 |
| - try { |
372 |
| - retrieveAndUpdateDatabase(projectId, name, metadata); |
373 |
| - } catch (Exception ex) { |
374 |
| - logger.error(() -> "failed to retrieve database [" + name + "]", ex); |
375 |
| - } |
376 |
| - }); |
377 |
| - |
378 |
| - // TODO perhaps we need to handle the license flap persistent task state better than we do |
379 |
| - // i think the ideal end state is that we *do not* drop the files that the enterprise downloader |
380 |
| - // handled if they fall out -- which means we need to track that in the databases map itself |
381 |
| - |
382 |
| - // start with the list of all databases we currently know about in this service, |
383 |
| - // then drop the ones that didn't check out as valid from the task states |
384 |
| - if (databases.containsKey(projectId)) { |
385 |
| - Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet()); |
386 |
| - staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); |
387 |
| - removeStaleEntries(projectId, staleDatabases); |
| 371 | + try { |
| 372 | + retrieveAndUpdateDatabase(projectId, name, metadata); |
| 373 | + } catch (Exception ex) { |
| 374 | + logger.error(() -> "failed to retrieve database [" + name + "]", ex); |
388 | 375 | }
|
| 376 | + }); |
| 377 | + |
| 378 | + // TODO perhaps we need to handle the license flap persistent task state better than we do |
| 379 | + // i think the ideal end state is that we *do not* drop the files that the enterprise downloader |
| 380 | + // handled if they fall out -- which means we need to track that in the databases map itself |
| 381 | + |
| 382 | + // start with the list of all databases we currently know about in this service, |
| 383 | + // then drop the ones that didn't check out as valid from the task states |
| 384 | + if (databases.containsKey(projectId)) { |
| 385 | + Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet()); |
| 386 | + staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); |
| 387 | + removeStaleEntries(projectId, staleDatabases); |
389 | 388 | }
|
390 | 389 | }
|
391 | 390 |
|
|
0 commit comments