29
29
import org .elasticsearch .cluster .block .ClusterBlockException ;
30
30
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
31
31
import org .elasticsearch .cluster .metadata .MappingMetadata ;
32
- import org .elasticsearch .cluster .metadata .Metadata ;
32
+ import org .elasticsearch .cluster .metadata .ProjectId ;
33
+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
34
+ import org .elasticsearch .cluster .project .ProjectResolver ;
33
35
import org .elasticsearch .cluster .service .ClusterService ;
34
36
import org .elasticsearch .common .logging .DeprecationCategory ;
35
37
import org .elasticsearch .common .logging .DeprecationLogger ;
@@ -78,6 +80,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
78
80
79
81
private final PersistentTasksService persistentTasksService ;
80
82
private final Client client ;
83
+ private final ProjectResolver projectResolver ;
81
84
82
85
@ Inject
83
86
public TransportPutRollupJobAction (
@@ -86,7 +89,8 @@ public TransportPutRollupJobAction(
86
89
ActionFilters actionFilters ,
87
90
ClusterService clusterService ,
88
91
PersistentTasksService persistentTasksService ,
89
- Client client
92
+ Client client ,
93
+ ProjectResolver projectResolver
90
94
) {
91
95
super (
92
96
PutRollupJobAction .NAME ,
@@ -99,7 +103,7 @@ public TransportPutRollupJobAction(
99
103
);
100
104
this .persistentTasksService = persistentTasksService ;
101
105
this .client = client ;
102
-
106
+ this . projectResolver = projectResolver ;
103
107
}
104
108
105
109
@ Override
@@ -113,10 +117,11 @@ protected void masterOperation(
113
117
XPackPlugin .checkReadyForXPackCustomMetadata (clusterState );
114
118
checkForDeprecatedTZ (request );
115
119
116
- int numberOfCurrentRollupJobs = RollupUsageTransportAction .findNumberOfRollupJobs (clusterState .metadata ().getProject ());
120
+ final var project = projectResolver .getProjectMetadata (clusterState );
121
+ int numberOfCurrentRollupJobs = RollupUsageTransportAction .findNumberOfRollupJobs (project );
117
122
if (numberOfCurrentRollupJobs == 0 ) {
118
123
try {
119
- boolean hasRollupIndices = hasRollupIndices (clusterState . getMetadata () );
124
+ boolean hasRollupIndices = hasRollupIndices (project );
120
125
if (hasRollupIndices == false ) {
121
126
listener .onFailure (
122
127
new IllegalArgumentException (
@@ -135,6 +140,7 @@ protected void masterOperation(
135
140
.fields (request .getConfig ().getAllFields ().toArray (new String [0 ]));
136
141
fieldCapsRequest .setParentTask (clusterService .localNode ().getId (), task .getId ());
137
142
143
+ final var projectId = project .id ();
138
144
client .fieldCaps (fieldCapsRequest , listener .delegateFailure ((l , fieldCapabilitiesResponse ) -> {
139
145
ActionRequestValidationException validationException = request .validateMappings (fieldCapabilitiesResponse .get ());
140
146
if (validationException != null ) {
@@ -143,7 +149,7 @@ protected void masterOperation(
143
149
}
144
150
145
151
RollupJob job = createRollupJob (request .getConfig (), threadPool );
146
- createIndex (job , l , persistentTasksService , client , LOGGER );
152
+ createIndex (projectId , job , l , persistentTasksService , client , LOGGER );
147
153
}));
148
154
}
149
155
@@ -177,6 +183,7 @@ private RollupJob createRollupJob(RollupJobConfig config, ThreadPool threadPool)
177
183
}
178
184
179
185
static void createIndex (
186
+ ProjectId projectId ,
180
187
RollupJob job ,
181
188
ActionListener <AcknowledgedResponse > listener ,
182
189
PersistentTasksService persistentTasksService ,
@@ -196,10 +203,10 @@ static void createIndex(
196
203
client .execute (
197
204
TransportCreateIndexAction .TYPE ,
198
205
request ,
199
- ActionListener .wrap (createIndexResponse -> startPersistentTask (job , listener , persistentTasksService ), e -> {
206
+ ActionListener .wrap (createIndexResponse -> startPersistentTask (projectId , job , listener , persistentTasksService ), e -> {
200
207
if (e instanceof ResourceAlreadyExistsException ) {
201
208
logger .debug ("Rolled index already exists for rollup job [" + job .getConfig ().getId () + "], updating metadata." );
202
- updateMapping (job , listener , persistentTasksService , client , logger , request .masterNodeTimeout ());
209
+ updateMapping (projectId , job , listener , persistentTasksService , client , logger , request .masterNodeTimeout ());
203
210
} else {
204
211
String msg = "Could not create index for rollup job [" + job .getConfig ().getId () + "]" ;
205
212
logger .error (msg );
@@ -245,6 +252,7 @@ static XContentBuilder createMappings(RollupJobConfig config) throws IOException
245
252
246
253
@ SuppressWarnings ("unchecked" )
247
254
static void updateMapping (
255
+ ProjectId projectId ,
248
256
RollupJob job ,
249
257
ActionListener <AcknowledgedResponse > listener ,
250
258
PersistentTasksService persistentTasksService ,
@@ -301,7 +309,10 @@ static void updateMapping(
301
309
client .execute (
302
310
TransportPutMappingAction .TYPE ,
303
311
request ,
304
- ActionListener .wrap (putMappingResponse -> startPersistentTask (job , listener , persistentTasksService ), listener ::onFailure )
312
+ ActionListener .wrap (
313
+ putMappingResponse -> startPersistentTask (projectId , job , listener , persistentTasksService ),
314
+ listener ::onFailure
315
+ )
305
316
);
306
317
};
307
318
@@ -314,17 +325,19 @@ static void updateMapping(
314
325
}
315
326
316
327
static void startPersistentTask (
328
+ ProjectId projectId ,
317
329
RollupJob job ,
318
330
ActionListener <AcknowledgedResponse > listener ,
319
331
PersistentTasksService persistentTasksService
320
332
) {
321
333
assertNoAuthorizationHeader (job .getHeaders ());
322
- persistentTasksService .sendStartRequest (
334
+ persistentTasksService .sendProjectStartRequest (
335
+ projectId ,
323
336
job .getConfig ().getId (),
324
337
RollupField .TASK_NAME ,
325
338
job ,
326
339
TimeValue .THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */ ,
327
- ActionListener .wrap (rollupConfigPersistentTask -> waitForRollupStarted (job , listener , persistentTasksService ), e -> {
340
+ ActionListener .wrap (rollupConfigPersistentTask -> waitForRollupStarted (projectId , job , listener , persistentTasksService ), e -> {
328
341
if (e instanceof ResourceAlreadyExistsException ) {
329
342
e = new ElasticsearchStatusException (
330
343
"Cannot create job [" + job .getConfig ().getId () + "] because it has already been created (task exists)" ,
@@ -338,11 +351,13 @@ static void startPersistentTask(
338
351
}
339
352
340
353
private static void waitForRollupStarted (
354
+ ProjectId projectId ,
341
355
RollupJob job ,
342
356
ActionListener <AcknowledgedResponse > listener ,
343
357
PersistentTasksService persistentTasksService
344
358
) {
345
359
persistentTasksService .waitForPersistentTaskCondition (
360
+ projectId ,
346
361
job .getConfig ().getId (),
347
362
Objects ::nonNull ,
348
363
job .getConfig ().getTimeout (),
@@ -369,9 +384,9 @@ public void onTimeout(TimeValue timeout) {
369
384
);
370
385
}
371
386
372
- static boolean hasRollupIndices (Metadata metadata ) throws IOException {
387
+ static boolean hasRollupIndices (ProjectMetadata project ) throws IOException {
373
388
// Sniffing logic instead of invoking sourceAsMap(), which would materialize the entire mapping as map of maps.
374
- for (var imd : metadata . getProject () ) {
389
+ for (var imd : project ) {
375
390
if (imd .mapping () == null ) {
376
391
continue ;
377
392
}
0 commit comments