Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -44,8 +45,15 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction<

private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(TransportDeleteRollupJobAction.class);

private final ProjectResolver projectResolver;

@Inject
public TransportDeleteRollupJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
public TransportDeleteRollupJobAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
ProjectResolver projectResolver
) {
super(
DeleteRollupJobAction.NAME,
clusterService,
Expand All @@ -55,6 +63,7 @@ public TransportDeleteRollupJobAction(TransportService transportService, ActionF
DeleteRollupJobAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.projectResolver = projectResolver;
}

@Override
Expand All @@ -64,7 +73,7 @@ protected void doExecute(Task task, DeleteRollupJobAction.Request request, Actio
final DiscoveryNodes nodes = state.nodes();

if (nodes.isLocalNodeElectedMaster()) {
PersistentTasksCustomMetadata pTasksMeta = state.getMetadata().getProject().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata pTasksMeta = projectResolver.getProjectMetadata(state).custom(PersistentTasksCustomMetadata.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
super.doExecute(task, request, listener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -43,9 +44,15 @@ public class TransportGetRollupCapsAction extends HandledTransportAction<GetRoll

private final ClusterService clusterService;
private final Executor managementExecutor;
private final ProjectResolver projectResolver;

@Inject
public TransportGetRollupCapsAction(TransportService transportService, ClusterService clusterService, ActionFilters actionFilters) {
public TransportGetRollupCapsAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(
GetRollupCapsAction.NAME,
Expand All @@ -56,6 +63,7 @@ public TransportGetRollupCapsAction(TransportService transportService, ClusterSe
);
this.clusterService = clusterService;
this.managementExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
this.projectResolver = projectResolver;
}

@Override
Expand All @@ -67,7 +75,8 @@ protected void doExecute(Task task, GetRollupCapsAction.Request request, ActionL

private void doExecuteForked(String indexPattern, ActionListener<GetRollupCapsAction.Response> listener) {
Transports.assertNotTransportThread("retrieving rollup job caps may be expensive");
Map<String, RollableIndexCaps> allCaps = getCaps(indexPattern, clusterService.state().getMetadata().getProject().indices());
final var project = projectResolver.getProjectMetadata(clusterService.state());
Map<String, RollableIndexCaps> allCaps = getCaps(indexPattern, project.indices());
listener.onResponse(new GetRollupCapsAction.Response(allCaps));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -46,13 +47,15 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction<
private final ClusterService clusterService;
private final IndexNameExpressionResolver resolver;
private final Executor managementExecutor;
private final ProjectResolver projectResolver;

@Inject
public TransportGetRollupIndexCapsAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
ProjectResolver projectResolver
) {
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(
Expand All @@ -65,6 +68,7 @@ public TransportGetRollupIndexCapsAction(
this.clusterService = clusterService;
this.managementExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
this.resolver = indexNameExpressionResolver;
this.projectResolver = projectResolver;
}

@Override
Expand All @@ -80,11 +84,9 @@ protected void doExecute(

private void doExecuteForked(IndicesRequest request, ActionListener<GetRollupIndexCapsAction.Response> listener) {
Transports.assertNotTransportThread("retrieving rollup job index caps may be expensive");
String[] indices = resolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), request);
Map<String, RollableIndexCaps> allCaps = getCapsByRollupIndex(
Arrays.asList(indices),
clusterService.state().getMetadata().getProject().indices()
);
final var project = projectResolver.getProjectMetadata(clusterService.state());
String[] indices = resolver.concreteIndexNames(project, request.indicesOptions(), request);
Map<String, RollableIndexCaps> allCaps = getCapsByRollupIndex(Arrays.asList(indices), project.indices());
listener.onResponse(new GetRollupIndexCapsAction.Response(allCaps));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -78,6 +80,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode

private final PersistentTasksService persistentTasksService;
private final Client client;
private final ProjectResolver projectResolver;

@Inject
public TransportPutRollupJobAction(
Expand All @@ -86,7 +89,8 @@ public TransportPutRollupJobAction(
ActionFilters actionFilters,
ClusterService clusterService,
PersistentTasksService persistentTasksService,
Client client
Client client,
ProjectResolver projectResolver
) {
super(
PutRollupJobAction.NAME,
Expand All @@ -99,7 +103,7 @@ public TransportPutRollupJobAction(
);
this.persistentTasksService = persistentTasksService;
this.client = client;

this.projectResolver = projectResolver;
}

@Override
Expand All @@ -113,10 +117,11 @@ protected void masterOperation(
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
checkForDeprecatedTZ(request);

int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(clusterState.metadata().getProject());
final var project = projectResolver.getProjectMetadata(clusterState);
int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(project);
if (numberOfCurrentRollupJobs == 0) {
try {
boolean hasRollupIndices = hasRollupIndices(clusterState.getMetadata());
boolean hasRollupIndices = hasRollupIndices(project);
if (hasRollupIndices == false) {
listener.onFailure(
new IllegalArgumentException(
Expand All @@ -135,6 +140,7 @@ protected void masterOperation(
.fields(request.getConfig().getAllFields().toArray(new String[0]));
fieldCapsRequest.setParentTask(clusterService.localNode().getId(), task.getId());

final var projectId = project.id();
client.fieldCaps(fieldCapsRequest, listener.delegateFailure((l, fieldCapabilitiesResponse) -> {
ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get());
if (validationException != null) {
Expand All @@ -143,7 +149,7 @@ protected void masterOperation(
}

RollupJob job = createRollupJob(request.getConfig(), threadPool);
createIndex(job, l, persistentTasksService, client, LOGGER);
createIndex(projectId, job, l, persistentTasksService, client, LOGGER);
}));
}

Expand Down Expand Up @@ -177,6 +183,7 @@ private RollupJob createRollupJob(RollupJobConfig config, ThreadPool threadPool)
}

static void createIndex(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService,
Expand All @@ -196,10 +203,10 @@ static void createIndex(
client.execute(
TransportCreateIndexAction.TYPE,
request,
ActionListener.wrap(createIndexResponse -> startPersistentTask(job, listener, persistentTasksService), e -> {
ActionListener.wrap(createIndexResponse -> startPersistentTask(projectId, job, listener, persistentTasksService), e -> {
if (e instanceof ResourceAlreadyExistsException) {
logger.debug("Rolled index already exists for rollup job [" + job.getConfig().getId() + "], updating metadata.");
updateMapping(job, listener, persistentTasksService, client, logger, request.masterNodeTimeout());
updateMapping(projectId, job, listener, persistentTasksService, client, logger, request.masterNodeTimeout());
} else {
String msg = "Could not create index for rollup job [" + job.getConfig().getId() + "]";
logger.error(msg);
Expand Down Expand Up @@ -245,6 +252,7 @@ static XContentBuilder createMappings(RollupJobConfig config) throws IOException

@SuppressWarnings("unchecked")
static void updateMapping(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService,
Expand Down Expand Up @@ -301,7 +309,10 @@ static void updateMapping(
client.execute(
TransportPutMappingAction.TYPE,
request,
ActionListener.wrap(putMappingResponse -> startPersistentTask(job, listener, persistentTasksService), listener::onFailure)
ActionListener.wrap(
putMappingResponse -> startPersistentTask(projectId, job, listener, persistentTasksService),
listener::onFailure
)
);
};

Expand All @@ -314,17 +325,19 @@ static void updateMapping(
}

static void startPersistentTask(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService
) {
assertNoAuthorizationHeader(job.getHeaders());
persistentTasksService.sendStartRequest(
persistentTasksService.sendProjectStartRequest(
projectId,
job.getConfig().getId(),
RollupField.TASK_NAME,
job,
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService), e -> {
ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(projectId, job, listener, persistentTasksService), e -> {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException(
"Cannot create job [" + job.getConfig().getId() + "] because it has already been created (task exists)",
Expand All @@ -338,11 +351,13 @@ static void startPersistentTask(
}

private static void waitForRollupStarted(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService
) {
persistentTasksService.waitForPersistentTaskCondition(
projectId,
job.getConfig().getId(),
Objects::nonNull,
job.getConfig().getTimeout(),
Expand All @@ -369,9 +384,9 @@ public void onTimeout(TimeValue timeout) {
);
}

static boolean hasRollupIndices(Metadata metadata) throws IOException {
static boolean hasRollupIndices(ProjectMetadata project) throws IOException {
// Sniffing logic instead of invoking sourceAsMap(), which would materialize the entire mapping as map of maps.
for (var imd : metadata.getProject()) {
for (var imd : project) {
if (imd.mapping() == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
private final ScriptService scriptService;
private final ClusterService clusterService;
private final IndexNameExpressionResolver resolver;
private final ProjectResolver projectResolver;
private static final Logger logger = LogManager.getLogger(RollupSearchAction.class);

@Inject
Expand All @@ -99,7 +101,8 @@ public TransportRollupSearchAction(
BigArrays bigArrays,
ScriptService scriptService,
ClusterService clusterService,
IndexNameExpressionResolver resolver
IndexNameExpressionResolver resolver,
ProjectResolver projectResolver
) {
super(RollupSearchAction.NAME, actionFilters, transportService.getTaskManager(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.client = client;
Expand All @@ -108,6 +111,7 @@ public TransportRollupSearchAction(
this.scriptService = scriptService;
this.clusterService = clusterService;
this.resolver = resolver;
this.projectResolver = projectResolver;

transportService.registerRequestHandler(
actionName,
Expand All @@ -123,7 +127,8 @@ public TransportRollupSearchAction(
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
DEPRECATION_LOGGER.warn(DeprecationCategory.API, DEPRECATION_KEY, DEPRECATION_MESSAGE);
String[] indices = resolver.concreteIndexNames(clusterService.state(), request);
RollupSearchContext rollupSearchContext = separateIndices(indices, clusterService.state().getMetadata().getProject().indices());
final var project = projectResolver.getProjectMetadata(clusterService.state());
RollupSearchContext rollupSearchContext = separateIndices(indices, project.indices());

MultiSearchRequest msearch = createMSearchRequest(request, registry, rollupSearchContext);

Expand Down
Loading