Skip to content

Commit c776bc4

Browse files
mattkimesabarasaba
andauthored
[upgrade assistant] Stop rollup jobs before reindexing (elastic#212815)
This PR improve support for rollup indices. Rollup indices can be handled like normal indices but jobs should be stopped before reindexing begins or index is marked read only. Also handles case where the rollup job is already stopped. To review: Mark the following read only and make sure rollup jobs are handled as appropriate: Rollup index with and without job running, normal index. Follow up to elastic#212592 and elastic#214656 Closes: elastic#211850 --------- Co-authored-by: Ignacio Rivas <[email protected]>
1 parent a776287 commit c776bc4

File tree

13 files changed

+113
-15
lines changed

13 files changed

+113
-15
lines changed

x-pack/platform/plugins/private/upgrade_assistant/common/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ export interface ReindexOperation {
115115
errorMessage: string | null;
116116
// This field is only used for the singleton IndexConsumerType documents.
117117
runningReindexCount: number | null;
118+
rollupJob?: string;
118119

119120
/**
120121
* The original index settings to set after reindex is completed.

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.test.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*/
77

88
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
9-
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
9+
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
1010
import type { ScopedClusterClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
1111
import moment from 'moment';
1212

@@ -22,10 +22,15 @@ import { getMockVersionInfo } from '../__fixtures__/version';
2222

2323
const { currentMajor, prevMajor } = getMockVersionInfo();
2424

25+
jest.mock('../rollup_job', () => ({
26+
getRollupJobByIndexName: jest.fn(),
27+
}));
28+
2529
describe('ReindexActions', () => {
2630
let client: jest.Mocked<any>;
2731
let clusterClient: ScopedClusterClientMock;
2832
let actions: ReindexActions;
33+
const log = loggingSystemMock.createLogger();
2934

3035
const unimplemented = (name: string) => () =>
3136
Promise.reject(`Mock function ${name} was not implemented!`);
@@ -45,7 +50,7 @@ describe('ReindexActions', () => {
4550
) as any,
4651
};
4752
clusterClient = elasticsearchServiceMock.createScopedClusterClient();
48-
actions = reindexActionsFactory(client, clusterClient.asCurrentUser);
53+
actions = reindexActionsFactory(client, clusterClient.asCurrentUser, log);
4954
});
5055

5156
describe('createReindexOp', () => {

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
SavedObjectsFindResponse,
1212
SavedObjectsClientContract,
1313
ElasticsearchClient,
14+
Logger,
1415
} from '@kbn/core/server';
1516
import {
1617
REINDEX_OP_TYPE,
@@ -22,6 +23,7 @@ import {
2223
} from '../../../common/types';
2324
import { generateNewIndexName } from './index_settings';
2425
import { FlatSettings } from './types';
26+
import { getRollupJobByIndexName } from '../rollup_job';
2527

2628
// TODO: base on elasticsearch.requestTimeout?
2729
export const LOCK_WINDOW = moment.duration(90, 'seconds');
@@ -84,7 +86,8 @@ export interface ReindexActions {
8486

8587
export const reindexActionsFactory = (
8688
client: SavedObjectsClientContract,
87-
esClient: ElasticsearchClient
89+
esClient: ElasticsearchClient,
90+
log: Logger
8891
): ReindexActions => {
8992
// ----- Internal functions
9093
const isLocked = (reindexOp: ReindexSavedObject) => {
@@ -125,6 +128,9 @@ export const reindexActionsFactory = (
125128
// ----- Public interface
126129
return {
127130
async createReindexOp(indexName: string, opts?: ReindexOptions) {
131+
// gets rollup job if it exists and needs stopping, otherwise returns undefined
132+
const rollupJob = await getRollupJobByIndexName(esClient, log, indexName);
133+
128134
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
129135
indexName,
130136
newIndexName: generateNewIndexName(indexName),
@@ -136,6 +142,7 @@ export const reindexActionsFactory = (
136142
errorMessage: null,
137143
runningReindexCount: null,
138144
reindexOptions: opts,
145+
rollupJob,
139146
});
140147
},
141148

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_service.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,12 @@ export const reindexServiceFactory = (
169169
* @param reindexOp
170170
*/
171171
const setReadonly = async (reindexOp: ReindexSavedObject) => {
172-
const { indexName } = reindexOp.attributes;
172+
const { indexName, rollupJob } = reindexOp.attributes;
173+
174+
if (rollupJob) {
175+
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
176+
}
177+
173178
const putReadonly = await esClient.indices.putSettings({
174179
index: indexName,
175180
body: { blocks: { write: true } },
@@ -458,6 +463,11 @@ export const reindexServiceFactory = (
458463
await esClient.indices.close({ index: indexName });
459464
}
460465

466+
if (reindexOp.attributes.rollupJob) {
467+
// start the rollup job. rollupJob is undefined if the rollup job is stopped
468+
await esClient.rollup.startJob({ id: reindexOp.attributes.rollupJob });
469+
}
470+
461471
return actions.updateReindexOp(reindexOp, {
462472
lastCompletedStep: ReindexStep.aliasCreated,
463473
});

x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/worker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ export class ReindexWorker {
9999

100100
this.reindexService = reindexServiceFactory(
101101
callAsInternalUser,
102-
reindexActionsFactory(this.client, callAsInternalUser),
102+
reindexActionsFactory(this.client, callAsInternalUser, this.log),
103103
log,
104104
this.licensing
105105
);
@@ -195,7 +195,7 @@ export class ReindexWorker {
195195
const fakeRequest: FakeRequest = { headers: credential };
196196
const scopedClusterClient = this.clusterClient.asScoped(fakeRequest);
197197
const callAsCurrentUser = scopedClusterClient.asCurrentUser;
198-
const actions = reindexActionsFactory(this.client, callAsCurrentUser);
198+
const actions = reindexActionsFactory(this.client, callAsCurrentUser, this.log);
199199
return reindexServiceFactory(callAsCurrentUser, actions, this.log, this.licensing);
200200
};
201201

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
9+
import {
10+
RollupGetRollupIndexCapsResponse,
11+
RollupGetJobsResponse,
12+
} from '@elastic/elasticsearch/lib/api/types';
13+
14+
export async function getRollupJobByIndexName(
15+
esClient: ElasticsearchClient,
16+
log: Logger,
17+
index: string
18+
) {
19+
let rollupCaps: RollupGetRollupIndexCapsResponse;
20+
21+
try {
22+
rollupCaps = await esClient.rollup.getRollupIndexCaps({ index }, { ignore: [404] });
23+
// may catch if not found in some circumstances, such as a closed index, etc
24+
// would be nice to handle the error better but little info is provided
25+
} catch (e) {
26+
log.warn(`Get rollup index capabilities failed: ${e}`);
27+
return;
28+
}
29+
30+
const rollupIndices = Object.keys(rollupCaps);
31+
let rollupJob: string | undefined;
32+
33+
// there should only be one job
34+
if (rollupIndices.length === 1) {
35+
rollupJob = rollupCaps[rollupIndices[0]].rollup_jobs[0].job_id;
36+
let jobs: RollupGetJobsResponse;
37+
38+
try {
39+
jobs = await esClient.rollup.getJobs({ id: rollupJob }, { ignore: [404] });
40+
// may catch if not found in some circumstances, such as a closed index, etc
41+
// would be nice to handle the error better but little info is provided
42+
} catch (e) {
43+
log.warn(`Get rollup job failed: ${e}`);
44+
return;
45+
}
46+
47+
// there can only be one job. If its stopped then we don't need rollup handling
48+
if (
49+
// zero jobs shouldn't happen but we can handle it gracefully
50+
jobs.jobs.length === 0 ||
51+
// rollup job is stopped so we can treat it like a regular index
52+
(jobs.jobs.length === 1 && jobs.jobs[0].status.job_state === 'stopped')
53+
) {
54+
rollupJob = undefined;
55+
// this shouldn't be possible but just in case
56+
} else if (jobs.jobs.length > 1) {
57+
throw new Error(`Multiple jobs returned for a single rollup job id: + ${rollupJob}`);
58+
}
59+
// this shouldn't be possible but just in case
60+
} else if (rollupIndices.length > 1) {
61+
throw new Error(`Multiple indices returned for a single index name: + ${index}`);
62+
}
63+
64+
return rollupJob;
65+
}

x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ describe('updateIndex', () => {
2525
const mockGetReindexWarnings = getReindexWarnings as jest.Mock;
2626
const mockLogger = loggingSystemMock.create().get();
2727
const mockClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser;
28+
mockClient.rollup.getRollupIndexCaps.mockResponse({});
2829

2930
beforeEach(() => {
3031
jest.clearAllMocks();

x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55
* 2.0.
66
*/
77

8-
import type { ElasticsearchClient } from '@kbn/core/server';
9-
import type { Logger } from '@kbn/core/server';
8+
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
109
import type { UpdateIndexOperation } from '../../../common/update_index';
10+
import { getRollupJobByIndexName } from '../rollup_job';
1111
import { getReindexWarnings } from '../reindexing/index_settings';
1212

1313
export interface UpdateIndexParams {
1414
esClient: ElasticsearchClient;
1515
index: string;
1616
operations: UpdateIndexOperation[];
17-
log?: Logger;
17+
log: Logger;
1818
}
1919

2020
/**
@@ -30,6 +30,12 @@ export async function updateIndex({ esClient, index, operations, log }: UpdateIn
3030

3131
switch (operation) {
3232
case 'blockWrite': {
33+
// stop related rollup job if it exists
34+
const rollupJob = await getRollupJobByIndexName(esClient, log, index);
35+
if (rollupJob) {
36+
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
37+
}
38+
3339
res = await esClient.indices.addBlock({ index, block: 'write' });
3440

3541
await removeDeprecatedSettings(esClient, index, log);

x-pack/platform/plugins/private/upgrade_assistant/server/routes/es_deprecations.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export function registerESDeprecationRoutes({
4141
dataSourceExclusions,
4242
});
4343
const asCurrentUser = client.asCurrentUser;
44-
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser);
44+
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser, log);
4545
const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing);
4646
const indexNames = [...status.migrationsDeprecations, ...status.enrichedHealthIndicators]
4747
.filter(({ index }) => typeof index !== 'undefined')

x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/batch_reindex_indices.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ export function registerBatchReindexIndicesRoutes(
5757
const callAsCurrentUser = esClient.asCurrentUser;
5858
const reindexActions = reindexActionsFactory(
5959
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
60-
callAsCurrentUser
60+
callAsCurrentUser,
61+
log
6162
);
6263
try {
6364
const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress);

0 commit comments

Comments
 (0)