Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testEnterpriseDownloaderTask() throws Exception {
final String sourceField = "ip";
final String targetField = "ip-result";

startEnterpriseGeoIpDownloaderTask();
startEnterpriseGeoIpDownloaderTask(ProjectId.DEFAULT);
configureMaxmindDatabase(MAXMIND_DATABASE_TYPE);
configureIpinfoDatabase(IPINFO_DATABASE_TYPE);
waitAround();
Expand Down Expand Up @@ -171,9 +172,10 @@ private void deleteDatabaseConfiguration(String configurationName, ActionListene
);
}

private void startEnterpriseGeoIpDownloaderTask() {
private void startEnterpriseGeoIpDownloaderTask(ProjectId projectId) {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
persistentTasksService.sendStartRequest(
persistentTasksService.sendProjectStartRequest(
projectId,
ENTERPRISE_GEOIP_DOWNLOADER,
ENTERPRISE_GEOIP_DOWNLOADER,
new EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -67,6 +69,9 @@
* Downloads are verified against MD5 checksum provided by the server
* Current state of all stored databases is stored in cluster state in persistent task state
*/
@NotMultiProjectCapable(
description = "Enterprise GeoIP not available in serverless, we should review this class for MP again after serverless is enabled"
)
public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {

private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloader.class);
Expand Down Expand Up @@ -142,22 +147,27 @@ void setState(EnterpriseGeoIpTaskState state) {

// visible for testing
void updateDatabases() throws IOException {
@NotMultiProjectCapable(description = "Enterprise GeoIP not available in serverless")
ProjectId projectId = ProjectId.DEFAULT;
var clusterState = clusterService.state();
var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX);
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX);
if (geoipIndex != null) {
logger.trace("the geoip index [{}] exists", EnterpriseGeoIpDownloader.DATABASES_INDEX);
if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
logger.debug("not updating databases because not all primary shards of [{}] index are active yet", DATABASES_INDEX);
return;
}
var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
var blockException = clusterState.blocks()
.indexBlockedException(projectId, ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
if (blockException != null) {
throw blockException;
}
}

logger.trace("Updating databases");
IngestGeoIpMetadata geoIpMeta = clusterState.metadata().getProject().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
IngestGeoIpMetadata geoIpMeta = clusterState.metadata()
.getProject(projectId)
.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);

// if there are entries in the cs that aren't in the persistent task state,
// then download those (only)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams;
import org.elasticsearch.ingest.IngestService;
Expand All @@ -47,6 +48,9 @@
import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.ENABLED_SETTING;
import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING;

@NotMultiProjectCapable(
description = "Enterprise GeoIP not available in serverless, we should review this class for MP again after serverless is enabled"
)
public class EnterpriseGeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<EnterpriseGeoIpTaskParams>
implements
ClusterStateListener {
Expand Down