Skip to content

Commit 22cc67c

Browse files
committed
Update downloader
1 parent 6ce72b7 commit 22cc67c

File tree

4 files changed

+52
-55
lines changed

4 files changed

+52
-55
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.elasticsearch.action.support.PlainActionFuture;
1919
import org.elasticsearch.client.internal.Client;
2020
import org.elasticsearch.cluster.block.ClusterBlockLevel;
21-
import org.elasticsearch.cluster.project.ProjectResolver;
21+
import org.elasticsearch.cluster.metadata.ProjectId;
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.hash.MessageDigests;
2424
import org.elasticsearch.common.settings.Setting;
@@ -96,37 +96,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
9696
*/
9797
private final Supplier<Boolean> atLeastOneGeoipProcessorSupplier;
9898

99-
private final ProjectResolver projectResolver;
99+
private final ProjectId projectId;
100100

101-
GeoIpDownloader(
102-
Client client,
103-
HttpClient httpClient,
104-
ClusterService clusterService,
105-
ThreadPool threadPool,
106-
Settings settings,
107-
long id,
108-
String type,
109-
String action,
110-
String description,
111-
TaskId parentTask,
112-
Map<String, String> headers,
113-
Supplier<TimeValue> pollIntervalSupplier,
114-
Supplier<Boolean> eagerDownloadSupplier,
115-
Supplier<Boolean> atLeastOneGeoipProcessorSupplier
116-
) {
117-
super(id, type, action, description, parentTask, headers);
118-
this.client = client;
119-
this.httpClient = httpClient;
120-
this.clusterService = clusterService;
121-
this.threadPool = threadPool;
122-
this.endpoint = ENDPOINT_SETTING.get(settings);
123-
this.pollIntervalSupplier = pollIntervalSupplier;
124-
this.eagerDownloadSupplier = eagerDownloadSupplier;
125-
this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier;
126-
this.projectResolver = null;
127-
}
128-
129-
// TODO: consolidate this constructor with the one above
130101
GeoIpDownloader(
131102
Client client,
132103
HttpClient httpClient,
@@ -142,7 +113,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
142113
Supplier<TimeValue> pollIntervalSupplier,
143114
Supplier<Boolean> eagerDownloadSupplier,
144115
Supplier<Boolean> atLeastOneGeoipProcessorSupplier,
145-
ProjectResolver projectResolver
116+
ProjectId projectId
146117
) {
147118
super(id, type, action, description, parentTask, headers);
148119
this.client = client;
@@ -153,7 +124,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
153124
this.pollIntervalSupplier = pollIntervalSupplier;
154125
this.eagerDownloadSupplier = eagerDownloadSupplier;
155126
this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier;
156-
this.projectResolver = projectResolver;
127+
this.projectId = projectId;
157128
}
158129

159130
void setState(GeoIpTaskState state) {
@@ -168,16 +139,17 @@ void setState(GeoIpTaskState state) {
168139
// visible for testing
169140
void updateDatabases() throws IOException {
170141
var clusterState = clusterService.state();
171-
var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
142+
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
172143
if (geoipIndex != null) {
173144
logger.trace("The {} index is not null", GeoIpDownloader.DATABASES_INDEX);
174-
if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
145+
if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
175146
logger.debug(
176147
"Not updating geoip database because not all primary shards of the [" + DATABASES_INDEX + "] index are active."
177148
);
178149
return;
179150
}
180-
var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
151+
var blockException = clusterState.blocks().indexBlockedException(projectId, ClusterBlockLevel.WRITE,
152+
geoipIndex.getWriteIndex().getName());
181153
if (blockException != null) {
182154
logger.debug(
183155
"Not updating geoip database because there is a write block on the " + geoipIndex.getWriteIndex().getName() + " index",

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ protected GeoIpDownloader createTask(
196196
PersistentTasksCustomMetadata.PersistentTask<GeoIpTaskParams> taskInProgress,
197197
Map<String, String> headers
198198
) {
199-
AtomicBoolean atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.get(projectResolver.getProjectId());
199+
ProjectId projectId = projectResolver.getProjectId();
200+
AtomicBoolean atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.get(projectId);
200201
return new GeoIpDownloader(
201202
client,
202203
httpClient,
@@ -212,7 +213,7 @@ protected GeoIpDownloader createTask(
212213
() -> pollInterval,
213214
() -> eagerDownload,
214215
() -> atLeastOneGeoipProcessor != null && atLeastOneGeoipProcessor.get(),
215-
projectResolver
216+
projectId
216217
);
217218
}
218219

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceTests.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.elasticsearch.cluster.metadata.AliasMetadata;
2323
import org.elasticsearch.cluster.metadata.IndexMetadata;
2424
import org.elasticsearch.cluster.metadata.Metadata;
25+
import org.elasticsearch.cluster.metadata.ProjectId;
26+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2527
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2628
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2729
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -365,10 +367,19 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk)
365367
}
366368

367369
static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata) {
368-
return createClusterState(tasksCustomMetadata, false);
370+
return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, false);
371+
}
372+
373+
static ClusterState createClusterState(ProjectId projectId, PersistentTasksCustomMetadata tasksCustomMetadata) {
374+
return createClusterState(projectId, tasksCustomMetadata, false);
369375
}
370376

371377
static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata, boolean noStartedShards) {
378+
return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, noStartedShards);
379+
}
380+
381+
static ClusterState createClusterState(ProjectId projectId, PersistentTasksCustomMetadata tasksCustomMetadata,
382+
boolean noStartedShards) {
372383
boolean aliasGeoipDatabase = randomBoolean();
373384
String indexName = aliasGeoipDatabase
374385
? GeoIpDownloader.DATABASES_INDEX + "-" + randomAlphaOfLength(5)
@@ -392,14 +403,15 @@ static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustom
392403
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
393404
}
394405
return ClusterState.builder(new ClusterName("name"))
395-
.metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).put(idxMeta))
406+
.putProjectMetadata(ProjectMetadata.builder(projectId).put(idxMeta).putCustom(TYPE, tasksCustomMetadata))
396407
.nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("_id1")).localNodeId("_id1"))
397-
.routingTable(
408+
.putRoutingTable(
409+
projectId,
398410
RoutingTable.builder()
399411
.add(
400412
IndexRoutingTable.builder(index)
401413
.addIndexShard(IndexShardRoutingTable.builder(new ShardId(index, 0)).addShard(shardRouting))
402-
)
414+
).build()
403415
)
404416
.build();
405417
}

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.ClusterState;
2727
import org.elasticsearch.cluster.block.ClusterBlocks;
2828
import org.elasticsearch.cluster.metadata.IndexMetadata;
29+
import org.elasticsearch.cluster.metadata.ProjectId;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.settings.ClusterSettings;
3132
import org.elasticsearch.common.settings.Settings;
@@ -85,9 +86,11 @@ public class GeoIpDownloaderTests extends ESTestCase {
8586
private ThreadPool threadPool;
8687
private MockClient client;
8788
private GeoIpDownloader geoIpDownloader;
89+
private ProjectId projectId;
8890

8991
@Before
9092
public void setup() throws IOException {
93+
projectId = randomProjectIdOrDefault();
9194
httpClient = mock(HttpClient.class);
9295
when(httpClient.getBytes(anyString())).thenReturn("[]".getBytes(StandardCharsets.UTF_8));
9396
clusterService = mock(ClusterService.class);
@@ -107,7 +110,7 @@ public void setup() throws IOException {
107110
)
108111
)
109112
);
110-
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
113+
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
111114
when(clusterService.state()).thenReturn(state);
112115
client = new MockClient(threadPool);
113116
geoIpDownloader = new GeoIpDownloader(
@@ -124,7 +127,8 @@ public void setup() throws IOException {
124127
Map.of(),
125128
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
126129
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
127-
() -> true
130+
() -> true,
131+
projectId
128132
) {
129133
{
130134
GeoIpTaskParams geoIpTaskParams = mock(GeoIpTaskParams.class);
@@ -296,7 +300,8 @@ public void testProcessDatabaseNew() throws IOException {
296300
Map.of(),
297301
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
298302
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
299-
() -> true
303+
() -> true,
304+
randomProjectIdOrDefault()
300305
) {
301306
@Override
302307
protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) {
@@ -347,7 +352,8 @@ public void testProcessDatabaseUpdate() throws IOException {
347352
Map.of(),
348353
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
349354
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
350-
() -> true
355+
() -> true,
356+
randomProjectIdOrDefault()
351357
) {
352358
@Override
353359
protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) {
@@ -400,7 +406,8 @@ public void testProcessDatabaseSame() throws IOException {
400406
Map.of(),
401407
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
402408
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
403-
() -> true
409+
() -> true,
410+
randomProjectIdOrDefault()
404411
) {
405412
@Override
406413
protected void updateTimestamp(String name, GeoIpTaskState.Metadata newMetadata) {
@@ -450,7 +457,8 @@ public void testCleanDatabases() throws IOException {
450457
Map.of(),
451458
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
452459
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
453-
() -> true
460+
() -> true,
461+
randomProjectIdOrDefault()
454462
) {
455463
@Override
456464
void updateDatabases() throws IOException {
@@ -495,7 +503,8 @@ public void testUpdateTaskState() {
495503
Map.of(),
496504
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
497505
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
498-
() -> true
506+
() -> true,
507+
randomProjectIdOrDefault()
499508
) {
500509
@Override
501510
public void updatePersistentTaskState(PersistentTaskState state, ActionListener<PersistentTask<?>> listener) {
@@ -525,7 +534,8 @@ public void testUpdateTaskStateError() {
525534
Map.of(),
526535
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
527536
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
528-
() -> true
537+
() -> true,
538+
randomProjectIdOrDefault()
529539
) {
530540
@Override
531541
public void updatePersistentTaskState(PersistentTaskState state, ActionListener<PersistentTask<?>> listener) {
@@ -566,7 +576,8 @@ public void testUpdateDatabases() throws IOException {
566576
Map.of(),
567577
() -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
568578
() -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
569-
atLeastOneGeoipProcessor::get
579+
atLeastOneGeoipProcessor::get,
580+
projectId
570581
) {
571582
@Override
572583
void processDatabase(Map<String, Object> databaseInfo) {
@@ -584,10 +595,11 @@ public void testUpdateDatabasesWriteBlock() throws IOException {
584595
/*
585596
* Here we make sure that we bail out before making an httpClient request if there is write block on the .geoip_databases index
586597
*/
587-
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
588-
var geoIpIndex = state.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX).getWriteIndex().getName();
598+
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
599+
var geoIpIndex = state.getMetadata().getProject(projectId).getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX).getWriteIndex()
600+
.getName();
589601
state = ClusterState.builder(state)
590-
.blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
602+
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
591603
.build();
592604
when(clusterService.state()).thenReturn(state);
593605
geoIpDownloader.updateDatabases();
@@ -599,7 +611,7 @@ public void testUpdateDatabasesIndexNotReady() throws IOException {
599611
* Here we make sure that we bail out before making an httpClient request if there are unallocated shards on the .geoip_databases
600612
* index
601613
*/
602-
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()), true);
614+
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()), true);
603615
when(clusterService.state()).thenReturn(state);
604616
geoIpDownloader.updateDatabases();
605617
verifyNoInteractions(httpClient);

0 commit comments

Comments
 (0)