Skip to content
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
49adbff
Make GeoIp multi-project aware
samxbr May 22, 2025
6ce72b7
Add project resolver in plugin
samxbr May 22, 2025
22cc67c
Update downloader
samxbr May 26, 2025
48ef756
[CI] Auto commit changes from spotless
May 26, 2025
e68c0ad
Merge branch 'main' into feature/multi-project/geoip
samxbr May 26, 2025
6bacb71
Add projectId context
samxbr May 27, 2025
bf65981
Merge branch 'main' into feature/multi-project/geoip
samxbr May 27, 2025
42c863f
[CI] Auto commit changes from spotless
May 27, 2025
b82df2f
remove yaml tests
samxbr May 27, 2025
6d425a4
Fix bug for atLeastOneGeoipProcessorByProject
samxbr May 27, 2025
54e7975
Temporarily unmute geoip yaml tests
samxbr May 27, 2025
f5ece95
Add unit tests for changedCustomProjectMetadataSet
samxbr May 27, 2025
b561598
[CI] Auto commit changes from spotless
May 27, 2025
2ddbdea
WIP: make GeoIp search project aware
samxbr Jun 2, 2025
64b7145
[CI] Auto commit changes from spotless
Jun 2, 2025
c244087
WIP changes
samxbr Jun 4, 2025
45d982e
[CI] Auto commit changes from spotless
Jun 4, 2025
efb99d3
Revert GeoIp search to reduce size of change
samxbr Jun 4, 2025
2382456
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 4, 2025
bcf458b
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 7, 2025
612651a
Add java rest test
samxbr Jun 8, 2025
2789e08
remove internal cluster test
samxbr Jun 8, 2025
789399e
Update settings.gradle
samxbr Jun 8, 2025
2c384aa
remove MP rest test change
samxbr Jun 8, 2025
0552403
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 8, 2025
77ab9f0
Update modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/ge…
samxbr Jun 8, 2025
a54d56b
[CI] Auto commit changes from spotless
Jun 8, 2025
5092830
Remove atomic and test change
samxbr Jun 10, 2025
f3dc2f8
[CI] Auto commit changes from spotless
Jun 10, 2025
8fb68bd
Remove AtomicBoolean and update customMetadataChanged
samxbr Jun 11, 2025
d82724e
revert settings.gradle
samxbr Jun 11, 2025
d0721e6
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 11, 2025
4e2eefa
[CI] Auto commit changes from spotless
Jun 11, 2025
43d5490
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 17, 2025
29936f5
Replace executeOnProject with explicit project methods
samxbr Jun 17, 2025
0974d41
Add assert
samxbr Jun 17, 2025
3690cd9
remove constructor param
samxbr Jun 17, 2025
303f4f6
remove downloader project resolver
samxbr Jun 17, 2025
22db3b8
Fix unit tests
samxbr Jun 17, 2025
1140e18
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 17, 2025
ba8e9a1
Add update project
samxbr Jun 17, 2025
59c0286
Mock updateProjectPersistentTaskState
samxbr Jun 18, 2025
69240ed
Add project id to mock client
samxbr Jun 18, 2025
f5a5b86
Address comments
samxbr Jun 19, 2025
735c88c
[CI] Auto commit changes from spotless
Jun 19, 2025
6d5be04
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 19, 2025
a6d155b
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 20, 2025
734a6fa
Address comments
samxbr Jun 20, 2025
96654dd
comment
samxbr Jun 20, 2025
195b613
Merge branch 'main' into feature/multi-project/geoip
samxbr Jun 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions modules/ingest-geoip/qa/multi-project/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.internal-java-rest-test'

dependencies {
javaRestTestImplementation project(':modules:ingest-geoip')
javaRestTestImplementation project(':test:external-modules:test-multi-project')
javaRestTestImplementation project(':test:fixtures:geoip-fixture')

clusterModules project(':modules:ingest-geoip')
clusterModules project(':modules:reindex') // needed for database cleanup
clusterModules project(':test:external-modules:test-multi-project')
}

tasks.withType(Test).configureEach {
it.systemProperty "tests.multi_project.enabled", true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package geoip;

import fixture.geoip.GeoIpHttpFixture;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.ingest.geoip.GeoIpDownloader;
import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

public class GeoIpMultiProjectIT extends ESRestTestCase {
// default true
private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false;

public static final GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture);

public static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.module("ingest-geoip")
.module("reindex") // for database cleanup
.module("test-multi-project")
.setting("test.multi_project.enabled", "true")
.setting(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), "true")
.setting(GeoIpDownloader.ENDPOINT_SETTING.getKey(), fixture::getAddress, (k) -> useFixture)
.build();

@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster);

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
protected boolean shouldConfigureProjects() {
return false;
}

public void testGeoIpDownloader() throws Exception {
String project1 = randomUniqueProjectId().id();
String project2 = randomUniqueProjectId().id();
createProject(project1);
createProject(project2);

// download databases for project1
putGeoIpPipeline(project1);
assertBusy(() -> assertDatabases(project1, true), 30, TimeUnit.SECONDS);
assertBusy(() -> assertDatabases(project2, false), 30, TimeUnit.SECONDS);

// download databases for project2
putGeoIpPipeline(project2);
assertBusy(() -> assertDatabases(project2, true), 30, TimeUnit.SECONDS);
}

private void putGeoIpPipeline(String projectId) throws IOException {
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/geoip-pipeline");
putPipelineRequest.setJsonEntity("""
{
"processors" : [
{
"geoip" : {
"field" : "ip",
"target_field" : "geo",
"database_file" : "GeoLite2-Country.mmdb"
}
}
]
}
""");
setRequestProjectId(projectId, putPipelineRequest);
assertOK(client().performRequest(putPipelineRequest));
}

private static Request setRequestProjectId(String projectId, Request request) {
RequestOptions.Builder options = request.getOptions().toBuilder();
options.removeHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER);
options.addHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId);
request.setOptions(options);
return request;
}

@SuppressWarnings("unchecked")
private void assertDatabases(String projectId, boolean shouldDownload) throws IOException {
Request getTaskState = new Request("GET", "/_cluster/state");
setRequestProjectId(projectId, getTaskState);

ObjectPath state = ObjectPath.createFromResponse(assertOK(client().performRequest(getTaskState)));

List<Map<String, ?>> tasks = state.evaluate("metadata.persistent_tasks.tasks");
// Short-circuit to avoid using steams if the list is empty
if (tasks.isEmpty()) {
fail("persistent tasks list is empty, expected at least one task for geoip-downloader");
}

// verify project task id
Set<Map<String, ?>> id = tasks.stream()
.filter(task -> String.format("%s/geoip-downloader", projectId).equals(task.get("id")))
.collect(Collectors.toSet());
assertThat(id.size(), equalTo(1));

// verify database download
Map<String, Object> databases = (Map<String, Object>) tasks.stream().map(task -> {
try {
return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases");
} catch (IOException e) {
return null;
}
}).filter(Objects::nonNull).findFirst().orElse(null);

if (shouldDownload) {
// verify database downloaded
assertNotNull(databases);
for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
Object database = databases.get(name);
assertNotNull(database);
assertNotNull(ObjectPath.evaluate(database, "md5"));
}
} else {
// verify database not downloaded
assertNull(databases);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -95,6 +96,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
*/
private final Supplier<Boolean> atLeastOneGeoipProcessorSupplier;

private final ProjectId projectId;

GeoIpDownloader(
Client client,
HttpClient httpClient,
Expand All @@ -109,17 +112,19 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
Map<String, String> headers,
Supplier<TimeValue> pollIntervalSupplier,
Supplier<Boolean> eagerDownloadSupplier,
Supplier<Boolean> atLeastOneGeoipProcessorSupplier
Supplier<Boolean> atLeastOneGeoipProcessorSupplier,
ProjectId projectId
) {
super(id, type, action, description, parentTask, headers);
this.client = client;
this.client = client.projectClient(projectId);
this.httpClient = httpClient;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.endpoint = ENDPOINT_SETTING.get(settings);
this.pollIntervalSupplier = pollIntervalSupplier;
this.eagerDownloadSupplier = eagerDownloadSupplier;
this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier;
this.projectId = projectId;
}

void setState(GeoIpTaskState state) {
Expand All @@ -134,16 +139,17 @@ void setState(GeoIpTaskState state) {
// visible for testing
void updateDatabases() throws IOException {
var clusterState = clusterService.state();
var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (geoipIndex != null) {
logger.trace("The {} index is not null", GeoIpDownloader.DATABASES_INDEX);
if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
logger.debug(
"Not updating geoip database because not all primary shards of the [" + DATABASES_INDEX + "] index are active."
);
return;
}
var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
var blockException = clusterState.blocks()
.indexBlockedException(projectId, ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
if (blockException != null) {
logger.debug(
"Not updating geoip database because there is a write block on the " + geoipIndex.getWriteIndex().getName() + " index",
Expand Down Expand Up @@ -196,7 +202,7 @@ private void processDatabase(final String name, final String md5, final String u
updateTimestamp(name, metadata);
return;
}
logger.debug("downloading geoip database [{}]", name);
logger.debug("downloading geoip database [{}] for project [{}]", name, projectId);
long start = System.currentTimeMillis();
try (InputStream is = httpClient.get(url)) {
int firstChunk = metadata.lastChunk() + 1; // if there is no metadata, then Metadata.EMPTY.lastChunk() + 1 = 0
Expand All @@ -205,12 +211,12 @@ private void processDatabase(final String name, final String md5, final String u
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
updateTaskState();
stats = stats.successfulDownload(System.currentTimeMillis() - start).databasesCount(state.getDatabases().size());
logger.info("successfully downloaded geoip database [{}]", name);
logger.info("successfully downloaded geoip database [{}] for project [{}]", name, projectId);
deleteOldChunks(name, firstChunk);
}
} catch (Exception e) {
stats = stats.failedDownload();
logger.error(() -> "error downloading geoip database [" + name + "]", e);
logger.error(() -> "error downloading geoip database [" + name + "] for project [" + projectId + "]", e);
}
}

Expand All @@ -230,15 +236,15 @@ void deleteOldChunks(String name, int firstChunk) {

// visible for testing
protected void updateTimestamp(String name, Metadata old) {
logger.debug("geoip database [{}] is up to date, updated timestamp", name);
logger.debug("geoip database [{}] is up to date for project [{}], updated timestamp", name, projectId);
state = state.put(name, new Metadata(old.lastUpdate(), old.firstChunk(), old.lastChunk(), old.md5(), System.currentTimeMillis()));
stats = stats.skippedDownload();
updateTaskState();
}

void updateTaskState() {
PlainActionFuture<PersistentTask<?>> future = new PlainActionFuture<>();
updatePersistentTaskState(state, future);
updateProjectPersistentTaskState(projectId, state, future);
state = ((GeoIpTaskState) future.actionGet().getState());
}

Expand Down Expand Up @@ -360,5 +366,4 @@ private void scheduleNextRun(TimeValue time) {
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
}
}

}
Loading