2929import org .elasticsearch .cluster .block .ClusterBlockException ;
3030import org .elasticsearch .cluster .block .ClusterBlockLevel ;
3131import org .elasticsearch .cluster .metadata .MappingMetadata ;
32- import org .elasticsearch .cluster .metadata .Metadata ;
32+ import org .elasticsearch .cluster .metadata .ProjectId ;
33+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
34+ import org .elasticsearch .cluster .project .ProjectResolver ;
3335import org .elasticsearch .cluster .service .ClusterService ;
3436import org .elasticsearch .common .logging .DeprecationCategory ;
3537import org .elasticsearch .common .logging .DeprecationLogger ;
@@ -78,6 +80,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
7880
7981 private final PersistentTasksService persistentTasksService ;
8082 private final Client client ;
83+ private final ProjectResolver projectResolver ;
8184
8285 @ Inject
8386 public TransportPutRollupJobAction (
@@ -86,7 +89,8 @@ public TransportPutRollupJobAction(
8689 ActionFilters actionFilters ,
8790 ClusterService clusterService ,
8891 PersistentTasksService persistentTasksService ,
89- Client client
92+ Client client ,
93+ ProjectResolver projectResolver
9094 ) {
9195 super (
9296 PutRollupJobAction .NAME ,
@@ -99,7 +103,7 @@ public TransportPutRollupJobAction(
99103 );
100104 this .persistentTasksService = persistentTasksService ;
101105 this .client = client ;
102-
106+ this . projectResolver = projectResolver ;
103107 }
104108
105109 @ Override
@@ -113,10 +117,11 @@ protected void masterOperation(
113117 XPackPlugin .checkReadyForXPackCustomMetadata (clusterState );
114118 checkForDeprecatedTZ (request );
115119
116- int numberOfCurrentRollupJobs = RollupUsageTransportAction .findNumberOfRollupJobs (clusterState .metadata ().getProject ());
120+ final var project = projectResolver .getProjectMetadata (clusterState );
121+ int numberOfCurrentRollupJobs = RollupUsageTransportAction .findNumberOfRollupJobs (project );
117122 if (numberOfCurrentRollupJobs == 0 ) {
118123 try {
119- boolean hasRollupIndices = hasRollupIndices (clusterState . getMetadata () );
124+ boolean hasRollupIndices = hasRollupIndices (project );
120125 if (hasRollupIndices == false ) {
121126 listener .onFailure (
122127 new IllegalArgumentException (
@@ -135,6 +140,7 @@ protected void masterOperation(
135140 .fields (request .getConfig ().getAllFields ().toArray (new String [0 ]));
136141 fieldCapsRequest .setParentTask (clusterService .localNode ().getId (), task .getId ());
137142
143+ final var projectId = project .id ();
138144 client .fieldCaps (fieldCapsRequest , listener .delegateFailure ((l , fieldCapabilitiesResponse ) -> {
139145 ActionRequestValidationException validationException = request .validateMappings (fieldCapabilitiesResponse .get ());
140146 if (validationException != null ) {
@@ -143,7 +149,7 @@ protected void masterOperation(
143149 }
144150
145151 RollupJob job = createRollupJob (request .getConfig (), threadPool );
146- createIndex (job , l , persistentTasksService , client , LOGGER );
152+ createIndex (projectId , job , l , persistentTasksService , client , LOGGER );
147153 }));
148154 }
149155
@@ -177,6 +183,7 @@ private RollupJob createRollupJob(RollupJobConfig config, ThreadPool threadPool)
177183 }
178184
179185 static void createIndex (
186+ ProjectId projectId ,
180187 RollupJob job ,
181188 ActionListener <AcknowledgedResponse > listener ,
182189 PersistentTasksService persistentTasksService ,
@@ -196,10 +203,10 @@ static void createIndex(
196203 client .execute (
197204 TransportCreateIndexAction .TYPE ,
198205 request ,
199- ActionListener .wrap (createIndexResponse -> startPersistentTask (job , listener , persistentTasksService ), e -> {
206+ ActionListener .wrap (createIndexResponse -> startPersistentTask (projectId , job , listener , persistentTasksService ), e -> {
200207 if (e instanceof ResourceAlreadyExistsException ) {
201208 logger .debug ("Rolled index already exists for rollup job [" + job .getConfig ().getId () + "], updating metadata." );
202- updateMapping (job , listener , persistentTasksService , client , logger , request .masterNodeTimeout ());
209+ updateMapping (projectId , job , listener , persistentTasksService , client , logger , request .masterNodeTimeout ());
203210 } else {
204211 String msg = "Could not create index for rollup job [" + job .getConfig ().getId () + "]" ;
205212 logger .error (msg );
@@ -245,6 +252,7 @@ static XContentBuilder createMappings(RollupJobConfig config) throws IOException
245252
246253 @ SuppressWarnings ("unchecked" )
247254 static void updateMapping (
255+ ProjectId projectId ,
248256 RollupJob job ,
249257 ActionListener <AcknowledgedResponse > listener ,
250258 PersistentTasksService persistentTasksService ,
@@ -301,7 +309,10 @@ static void updateMapping(
301309 client .execute (
302310 TransportPutMappingAction .TYPE ,
303311 request ,
304- ActionListener .wrap (putMappingResponse -> startPersistentTask (job , listener , persistentTasksService ), listener ::onFailure )
312+ ActionListener .wrap (
313+ putMappingResponse -> startPersistentTask (projectId , job , listener , persistentTasksService ),
314+ listener ::onFailure
315+ )
305316 );
306317 };
307318
@@ -314,17 +325,19 @@ static void updateMapping(
314325 }
315326
316327 static void startPersistentTask (
328+ ProjectId projectId ,
317329 RollupJob job ,
318330 ActionListener <AcknowledgedResponse > listener ,
319331 PersistentTasksService persistentTasksService
320332 ) {
321333 assertNoAuthorizationHeader (job .getHeaders ());
322- persistentTasksService .sendStartRequest (
334+ persistentTasksService .sendProjectStartRequest (
335+ projectId ,
323336 job .getConfig ().getId (),
324337 RollupField .TASK_NAME ,
325338 job ,
326339 TimeValue .THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */ ,
327- ActionListener .wrap (rollupConfigPersistentTask -> waitForRollupStarted (job , listener , persistentTasksService ), e -> {
340+ ActionListener .wrap (rollupConfigPersistentTask -> waitForRollupStarted (projectId , job , listener , persistentTasksService ), e -> {
328341 if (e instanceof ResourceAlreadyExistsException ) {
329342 e = new ElasticsearchStatusException (
330343 "Cannot create job [" + job .getConfig ().getId () + "] because it has already been created (task exists)" ,
@@ -338,11 +351,13 @@ static void startPersistentTask(
338351 }
339352
340353 private static void waitForRollupStarted (
354+ ProjectId projectId ,
341355 RollupJob job ,
342356 ActionListener <AcknowledgedResponse > listener ,
343357 PersistentTasksService persistentTasksService
344358 ) {
345359 persistentTasksService .waitForPersistentTaskCondition (
360+ projectId ,
346361 job .getConfig ().getId (),
347362 Objects ::nonNull ,
348363 job .getConfig ().getTimeout (),
@@ -369,9 +384,9 @@ public void onTimeout(TimeValue timeout) {
369384 );
370385 }
371386
372- static boolean hasRollupIndices (Metadata metadata ) throws IOException {
387+ static boolean hasRollupIndices (ProjectMetadata project ) throws IOException {
373388 // Sniffing logic instead of invoking sourceAsMap(), which would materialize the entire mapping as map of maps.
374- for (var imd : metadata . getProject () ) {
389+ for (var imd : project ) {
375390 if (imd .mapping () == null ) {
376391 continue ;
377392 }
0 commit comments