Skip to content

Commit d62e9e5

Browse files
mattkimeakowalska622
authored andcommitted
[upgrade assistant] Stop rollup jobs before reindexing - forwardport to 9.1 (elastic#218049)
## Summary forward port of elastic#212815 --- This PR improve support for rollup indices. Rollup indices can be handled like normal indices but jobs should be stopped before reindexing begins or index is marked read only. Also handles case where the rollup job is already stopped. To review: Mark the following read only and make sure rollup jobs are handled as appropriate: Rollup index with and without job running, normal index. Follow up to elastic#212592 and elastic#214656 Closes: elastic#211850
1 parent 0479696 commit d62e9e5

File tree

12 files changed

+118
-15
lines changed

12 files changed

+118
-15
lines changed

x-pack/platform/plugins/private/upgrade_assistant/common/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ export interface ReindexOperation {
115115
errorMessage: string | null;
116116
// This field is only used for the singleton IndexConsumerType documents.
117117
runningReindexCount: number | null;
118+
rollupJob?: string;
118119

119120
/**
120121
* The original index settings to set after reindex is completed.

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.test.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*/
77

88
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
9-
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
9+
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
1010
import type { ScopedClusterClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
1111
import moment from 'moment';
1212

@@ -22,10 +22,15 @@ import { getMockVersionInfo } from '../__fixtures__/version';
2222

2323
const { currentMajor, prevMajor } = getMockVersionInfo();
2424

25+
jest.mock('../rollup_job', () => ({
26+
getRollupJobByIndexName: jest.fn(),
27+
}));
28+
2529
describe('ReindexActions', () => {
2630
let client: jest.Mocked<any>;
2731
let clusterClient: ScopedClusterClientMock;
2832
let actions: ReindexActions;
33+
const log = loggingSystemMock.createLogger();
2934

3035
const unimplemented = (name: string) => () =>
3136
Promise.reject(`Mock function ${name} was not implemented!`);
@@ -45,7 +50,7 @@ describe('ReindexActions', () => {
4550
) as any,
4651
};
4752
clusterClient = elasticsearchServiceMock.createScopedClusterClient();
48-
actions = reindexActionsFactory(client, clusterClient.asCurrentUser);
53+
actions = reindexActionsFactory(client, clusterClient.asCurrentUser, log);
4954
});
5055

5156
describe('createReindexOp', () => {

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
SavedObjectsFindResponse,
1212
SavedObjectsClientContract,
1313
ElasticsearchClient,
14+
Logger,
1415
} from '@kbn/core/server';
1516
import {
1617
REINDEX_OP_TYPE,
@@ -22,6 +23,7 @@ import {
2223
} from '../../../common/types';
2324
import { generateNewIndexName } from './index_settings';
2425
import { FlatSettings } from './types';
26+
import { getRollupJobByIndexName } from '../rollup_job';
2527

2628
// TODO: base on elasticsearch.requestTimeout?
2729
export const LOCK_WINDOW = moment.duration(90, 'seconds');
@@ -84,7 +86,8 @@ export interface ReindexActions {
8486

8587
export const reindexActionsFactory = (
8688
client: SavedObjectsClientContract,
87-
esClient: ElasticsearchClient
89+
esClient: ElasticsearchClient,
90+
log: Logger
8891
): ReindexActions => {
8992
// ----- Internal functions
9093
const isLocked = (reindexOp: ReindexSavedObject) => {
@@ -125,6 +128,9 @@ export const reindexActionsFactory = (
125128
// ----- Public interface
126129
return {
127130
async createReindexOp(indexName: string, opts?: ReindexOptions) {
131+
// gets rollup job if it exists and needs stopping, otherwise returns undefined
132+
const rollupJob = await getRollupJobByIndexName(esClient, log, indexName);
133+
128134
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
129135
indexName,
130136
newIndexName: generateNewIndexName(indexName),
@@ -136,6 +142,7 @@ export const reindexActionsFactory = (
136142
errorMessage: null,
137143
runningReindexCount: null,
138144
reindexOptions: opts,
145+
rollupJob,
139146
});
140147
},
141148

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_service.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,12 @@ export const reindexServiceFactory = (
169169
* @param reindexOp
170170
*/
171171
const setReadonly = async (reindexOp: ReindexSavedObject) => {
172-
const { indexName } = reindexOp.attributes;
172+
const { indexName, rollupJob } = reindexOp.attributes;
173+
174+
if (rollupJob) {
175+
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
176+
}
177+
173178
const putReadonly = await esClient.indices.putSettings({
174179
index: indexName,
175180
body: { blocks: { write: true } },
@@ -428,6 +433,11 @@ export const reindexServiceFactory = (
428433
await esClient.indices.close({ index: indexName });
429434
}
430435

436+
if (reindexOp.attributes.rollupJob) {
437+
// start the rollup job. rollupJob is undefined if the rollup job is stopped
438+
await esClient.rollup.startJob({ id: reindexOp.attributes.rollupJob });
439+
}
440+
431441
return actions.updateReindexOp(reindexOp, {
432442
lastCompletedStep: ReindexStep.aliasCreated,
433443
});

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/worker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ export class ReindexWorker {
8888

8989
this.reindexService = reindexServiceFactory(
9090
callAsInternalUser,
91-
reindexActionsFactory(this.client, callAsInternalUser),
91+
reindexActionsFactory(this.client, callAsInternalUser, this.log),
9292
log,
9393
this.licensing
9494
);
@@ -173,7 +173,7 @@ export class ReindexWorker {
173173
const fakeRequest: FakeRequest = { headers: credential };
174174
const scopedClusterClient = this.clusterClient.asScoped(fakeRequest);
175175
const callAsCurrentUser = scopedClusterClient.asCurrentUser;
176-
const actions = reindexActionsFactory(this.client, callAsCurrentUser);
176+
const actions = reindexActionsFactory(this.client, callAsCurrentUser, this.log);
177177
return reindexServiceFactory(callAsCurrentUser, actions, this.log, this.licensing);
178178
};
179179

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
9+
import {
10+
RollupGetRollupIndexCapsResponse,
11+
RollupGetJobsResponse,
12+
} from '@elastic/elasticsearch/lib/api/types';
13+
14+
export async function getRollupJobByIndexName(
15+
esClient: ElasticsearchClient,
16+
log: Logger,
17+
index: string
18+
) {
19+
let rollupCaps: RollupGetRollupIndexCapsResponse;
20+
21+
try {
22+
rollupCaps = await esClient.rollup.getRollupIndexCaps({ index }, { ignore: [404] });
23+
// may catch if not found in some circumstances, such as a closed index, etc
24+
// would be nice to handle the error better but little info is provided
25+
} catch (e) {
26+
log.warn(`Get rollup index capabilities failed: ${e}`);
27+
return;
28+
}
29+
30+
const rollupIndices = Object.keys(rollupCaps);
31+
let rollupJob: string | undefined;
32+
33+
// there should only be one job
34+
if (rollupIndices.length === 1) {
35+
rollupJob = rollupCaps[rollupIndices[0]].rollup_jobs[0].job_id;
36+
let jobs: RollupGetJobsResponse;
37+
38+
try {
39+
jobs = await esClient.rollup.getJobs({ id: rollupJob }, { ignore: [404] });
40+
// may catch if not found in some circumstances, such as a closed index, etc
41+
// would be nice to handle the error better but little info is provided
42+
} catch (e) {
43+
log.warn(`Get rollup job failed: ${e}`);
44+
return;
45+
}
46+
47+
// there can only be one job. If its stopped then we don't need rollup handling
48+
if (
49+
// zero jobs shouldn't happen but we can handle it gracefully
50+
jobs.jobs.length === 0 ||
51+
// rollup job is stopped so we can treat it like a regular index
52+
(jobs.jobs.length === 1 && jobs.jobs[0].status.job_state === 'stopped')
53+
) {
54+
rollupJob = undefined;
55+
// this shouldn't be possible but just in case
56+
} else if (jobs.jobs.length > 1) {
57+
throw new Error(`Multiple jobs returned for a single rollup job id: + ${rollupJob}`);
58+
}
59+
// this shouldn't be possible but just in case
60+
} else if (rollupIndices.length > 1) {
61+
throw new Error(`Multiple indices returned for a single index name: + ${index}`);
62+
}
63+
64+
return rollupJob;
65+
}

x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
* 2.0.
66
*/
77

8-
import type { ElasticsearchClient } from '@kbn/core/server';
8+
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
99
import type { UpdateIndexOperation } from '../../../common/update_index';
10+
import { getRollupJobByIndexName } from '../rollup_job';
1011

1112
export interface UpdateIndexParams {
1213
esClient: ElasticsearchClient;
1314
index: string;
1415
operations: UpdateIndexOperation[];
16+
log: Logger;
1517
}
1618

1719
/**
@@ -20,12 +22,18 @@ export interface UpdateIndexParams {
2022
* @param index The index to update
2123
* @param operations The operations to perform on the specified index
2224
*/
23-
export async function updateIndex({ esClient, index, operations }: UpdateIndexParams) {
25+
export async function updateIndex({ esClient, index, operations, log }: UpdateIndexParams) {
2426
for (const operation of operations) {
2527
let res;
2628

2729
switch (operation) {
2830
case 'blockWrite': {
31+
// stop related rollup job if it exists
32+
const rollupJob = await getRollupJobByIndexName(esClient, log, index);
33+
if (rollupJob) {
34+
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
35+
}
36+
2937
res = await esClient.indices.addBlock({ index, block: 'write' });
3038
break;
3139
}

x-pack/platform/plugins/private/upgrade_assistant/server/routes/es_deprecations.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export function registerESDeprecationRoutes({
4141
dataSourceExclusions,
4242
});
4343
const asCurrentUser = client.asCurrentUser;
44-
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser);
44+
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser, log);
4545
const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing);
4646
const indexNames = [...status.migrationsDeprecations, ...status.enrichedHealthIndicators]
4747
.filter(({ index }) => typeof index !== 'undefined')

x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/batch_reindex_indices.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ export function registerBatchReindexIndicesRoutes(
5757
const callAsCurrentUser = esClient.asCurrentUser;
5858
const reindexActions = reindexActionsFactory(
5959
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
60-
callAsCurrentUser
60+
callAsCurrentUser,
61+
log
6162
);
6263
try {
6364
const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress);

x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export const reindexHandler = async ({
4949
security,
5050
}: ReindexHandlerArgs): Promise<ReindexOperation> => {
5151
const callAsCurrentUser = dataClient.asCurrentUser;
52-
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser);
52+
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser, log);
5353
const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing);
5454

5555
if (!(await reindexService.hasRequiredPrivileges(indexName))) {

0 commit comments

Comments
 (0)