Skip to content

Commit a1cd7fe

Browse files
committed
[Reporting] roll over the reporting data stream when the template has changed (elastic#234119)
resolves: elastic#231200 This PR adds code run at startup to check the version in the reporting index template against the `_meta.template_version` value stamped into each backing index's mappings via elastic/elasticsearch#133846 If it determines there are existing reporting indices that have not been created with the current index template version, it will roll over the reporting data stream, to ensure future indexing will use the latest mappings. This is done with the "lazy" option, which will perform the roll over on the next write to the datastream. This will prevent situations where multiple Kibanas restarting at the same time would roll over multiple times. --------- Co-authored-by: Elastic Machine <[email protected]> (cherry picked from commit c89ba82) # Conflicts: # .github/CODEOWNERS # x-pack/test/reporting_api_integration/reporting_without_security.config.ts # x-pack/test/reporting_api_integration/reporting_without_security/index.ts # x-pack/test/reporting_api_integration/reporting_without_security/roll_datastream.ts # x-pack/test/tsconfig.json
1 parent 9ee3ed8 commit a1cd7fe

File tree

18 files changed

+876
-5
lines changed

18 files changed

+876
-5
lines changed

.github/CODEOWNERS

Lines changed: 196 additions & 0 deletions
Large diffs are not rendered by default.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,7 @@
765765
"@kbn/reporting-plugin": "link:x-pack/platform/plugins/private/reporting",
766766
"@kbn/reporting-public": "link:src/platform/packages/private/kbn-reporting/public",
767767
"@kbn/reporting-server": "link:src/platform/packages/private/kbn-reporting/server",
768+
"@kbn/reporting-test-routes": "link:x-pack/platform/test/reporting_api_integration/plugins/reporting_test_routes",
768769
"@kbn/resizable-layout": "link:src/platform/packages/shared/kbn-resizable-layout",
769770
"@kbn/resizable-layout-examples-plugin": "link:examples/resizable_layout_examples",
770771
"@kbn/resolver-test-plugin": "link:x-pack/test/plugin_functional/plugins/resolver_test",

src/platform/packages/private/kbn-reporting/server/constants.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ export const REPORTING_LEGACY_INDICES = '.reporting-*';
2323
export const REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY = '.reporting-*,.kibana-reporting*';
2424
// Name of component template which Kibana overrides for lifecycle settings
2525
export const REPORTING_DATA_STREAM_COMPONENT_TEMPLATE = 'kibana-reporting@custom';
26+
// Name of index template
27+
export const REPORTING_DATA_STREAM_INDEX_TEMPLATE = '.kibana-reporting';
28+
// Name of mapping meta field which contains the version of the index template
29+
// see: https://github.com/elastic/elasticsearch/pull/133846
30+
export const REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD = 'template_version';
2631

2732
/*
2833
* Telemetry

tsconfig.base.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,6 +1510,8 @@
15101510
"@kbn/reporting-public/*": ["src/platform/packages/private/kbn-reporting/public/*"],
15111511
"@kbn/reporting-server": ["src/platform/packages/private/kbn-reporting/server"],
15121512
"@kbn/reporting-server/*": ["src/platform/packages/private/kbn-reporting/server/*"],
1513+
"@kbn/reporting-test-routes": ["x-pack/platform/test/reporting_api_integration/plugins/reporting_test_routes"],
1514+
"@kbn/reporting-test-routes/*": ["x-pack/platform/test/reporting_api_integration/plugins/reporting_test_routes/*"],
15131515
"@kbn/resizable-layout": ["src/platform/packages/shared/kbn-resizable-layout"],
15141516
"@kbn/resizable-layout/*": ["src/platform/packages/shared/kbn-resizable-layout/*"],
15151517
"@kbn/resizable-layout-examples-plugin": ["examples/resizable_layout_examples"],
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
9+
import { loggingSystemMock } from '@kbn/core/server/mocks';
10+
import {
11+
REPORTING_DATA_STREAM_ALIAS,
12+
REPORTING_DATA_STREAM_INDEX_TEMPLATE,
13+
REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD,
14+
} from '@kbn/reporting-server';
15+
import type {
16+
IndicesGetIndexTemplateIndexTemplateItem,
17+
IndicesGetMappingResponse,
18+
} from '@elastic/elasticsearch/lib/api/types';
19+
20+
import { rollDataStreamIfRequired } from './rollover';
21+
22+
describe('rollDataStreamIfRequired', () => {
23+
const mockLogger = loggingSystemMock.createLogger();
24+
let mockEsClient: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
25+
26+
beforeEach(async () => {
27+
mockEsClient = elasticsearchServiceMock.createElasticsearchClient();
28+
});
29+
30+
const msgPrefix = `Data stream ${REPORTING_DATA_STREAM_ALIAS}`;
31+
const skipMessage = 'does not need to be rolled over';
32+
const rollMessage = 'rolling over the data stream';
33+
34+
beforeEach(async () => {
35+
jest.clearAllMocks();
36+
});
37+
38+
it('does nothing if there is no data stream', async () => {
39+
mockEsClient.indices.exists.mockResponse(false);
40+
await rollDataStreamIfRequired(mockLogger, mockEsClient);
41+
42+
expect(mockEsClient.indices.exists).toHaveBeenCalledWith({
43+
index: REPORTING_DATA_STREAM_ALIAS,
44+
expand_wildcards: 'all',
45+
});
46+
expect(mockLogger.debug).toHaveBeenCalledWith(`${msgPrefix} does not exist so ${skipMessage}`);
47+
expect(mockEsClient.indices.getIndexTemplate).not.toHaveBeenCalled();
48+
expect(mockEsClient.indices.getMapping).not.toHaveBeenCalled();
49+
expect(mockEsClient.indices.rollover).not.toHaveBeenCalled();
50+
});
51+
52+
it('throws an error if no index template is returned', async () => {
53+
mockEsClient.indices.exists.mockResponse(true);
54+
mockEsClient.indices.getIndexTemplate.mockResponse({ index_templates: [] });
55+
const err = `${msgPrefix} index template ${REPORTING_DATA_STREAM_INDEX_TEMPLATE} not found`;
56+
await expect(rollDataStreamIfRequired(mockLogger, mockEsClient)).rejects.toThrow(err);
57+
58+
expect(mockEsClient.indices.getIndexTemplate).toHaveBeenCalledWith({
59+
name: REPORTING_DATA_STREAM_INDEX_TEMPLATE,
60+
});
61+
expect(mockEsClient.indices.getMapping).not.toHaveBeenCalled();
62+
expect(mockEsClient.indices.rollover).not.toHaveBeenCalled();
63+
});
64+
65+
it('throws an error if there is no index template with a version', async () => {
66+
mockEsClient.indices.exists.mockResponse(true);
67+
const templateWithoutVersion = getBasicIndexTemplate();
68+
delete templateWithoutVersion.index_template.version;
69+
mockEsClient.indices.getIndexTemplate.mockResponse({
70+
index_templates: [templateWithoutVersion],
71+
});
72+
73+
const err = `${msgPrefix} index template ${REPORTING_DATA_STREAM_INDEX_TEMPLATE} does not have a version field`;
74+
await expect(rollDataStreamIfRequired(mockLogger, mockEsClient)).rejects.toThrow(err);
75+
76+
expect(mockEsClient.indices.getMapping).not.toHaveBeenCalled();
77+
expect(mockEsClient.indices.rollover).not.toHaveBeenCalled();
78+
});
79+
80+
it('does nothing if there are no mappings on the backing indices', async () => {
81+
mockEsClient.indices.exists.mockResponse(true);
82+
mockEsClient.indices.getIndexTemplate.mockResponse({
83+
index_templates: [getBasicIndexTemplate()],
84+
});
85+
mockEsClient.indices.getMapping.mockResponse({});
86+
await rollDataStreamIfRequired(mockLogger, mockEsClient);
87+
88+
const msg = `${msgPrefix} has no backing indices so ${skipMessage}`;
89+
expect(mockLogger.debug).toHaveBeenCalledWith(msg);
90+
expect(mockEsClient.indices.rollover).not.toHaveBeenCalled();
91+
});
92+
93+
it('rolls over the data stream if there are no versions in the backing index mappings', async () => {
94+
mockEsClient.indices.exists.mockResponse(true);
95+
mockEsClient.indices.getIndexTemplate.mockResponse({
96+
index_templates: [getBasicIndexTemplate()],
97+
});
98+
const mappings: IndicesGetMappingResponse = {
99+
indexName: {
100+
mappings: { _meta: {} },
101+
},
102+
};
103+
mockEsClient.indices.getMapping.mockResponse(mappings);
104+
await rollDataStreamIfRequired(mockLogger, mockEsClient);
105+
106+
const msg = `${msgPrefix} has no mapping versions so ${rollMessage}`;
107+
expect(mockLogger.info).toHaveBeenCalledWith(msg);
108+
expect(mockEsClient.indices.rollover).toHaveBeenCalled();
109+
});
110+
111+
it('rolls over the data stream if the index template version is newer than the backing index mappings versions', async () => {
112+
mockEsClient.indices.exists.mockResponse(true);
113+
mockEsClient.indices.getIndexTemplate.mockResponse({
114+
index_templates: [getBasicIndexTemplate()],
115+
});
116+
const mappings: IndicesGetMappingResponse = {
117+
indexName: {
118+
mappings: { _meta: { [REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD]: 41 } },
119+
},
120+
};
121+
mockEsClient.indices.getMapping.mockResponse(mappings);
122+
await rollDataStreamIfRequired(mockLogger, mockEsClient);
123+
124+
const msg = `${msgPrefix} has older mappings than the template so ${rollMessage}`;
125+
expect(mockLogger.info).toHaveBeenCalledWith(msg);
126+
expect(mockEsClient.indices.rollover).toHaveBeenCalled();
127+
});
128+
129+
it('throws an error if the index template version is older than the backing index mappings versions', async () => {
130+
mockEsClient.indices.exists.mockResponse(true);
131+
mockEsClient.indices.getIndexTemplate.mockResponse({
132+
index_templates: [getBasicIndexTemplate()],
133+
});
134+
const mappings: IndicesGetMappingResponse = {
135+
indexName: {
136+
mappings: { _meta: { [REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD]: 43 } },
137+
},
138+
};
139+
mockEsClient.indices.getMapping.mockResponse(mappings);
140+
const err = `${msgPrefix} has newer mappings than the template`;
141+
await expect(rollDataStreamIfRequired(mockLogger, mockEsClient)).rejects.toThrow(err);
142+
143+
expect(mockEsClient.indices.rollover).not.toHaveBeenCalled();
144+
});
145+
146+
it('does nothing if the index template version is not newer than the backing index mapping versions', async () => {
147+
mockEsClient.indices.exists.mockResponse(true);
148+
mockEsClient.indices.getIndexTemplate.mockResponse({
149+
index_templates: [getBasicIndexTemplate()],
150+
});
151+
const mappings: IndicesGetMappingResponse = {
152+
indexName: {
153+
mappings: { _meta: { [REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD]: 42 } },
154+
},
155+
};
156+
mockEsClient.indices.getMapping.mockResponse(mappings);
157+
await rollDataStreamIfRequired(mockLogger, mockEsClient);
158+
159+
const msg = `${msgPrefix} has latest mappings applied so ${skipMessage}`;
160+
expect(mockLogger.debug).toHaveBeenCalledWith(msg);
161+
expect(mockEsClient.indices.rollover).not.toHaveBeenCalled();
162+
});
163+
});
164+
165+
function getBasicIndexTemplate(): IndicesGetIndexTemplateIndexTemplateItem {
166+
return {
167+
name: REPORTING_DATA_STREAM_INDEX_TEMPLATE,
168+
index_template: {
169+
index_patterns: ['ignored'],
170+
composed_of: ['ignored'],
171+
version: 42,
172+
},
173+
};
174+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import {
9+
REPORTING_DATA_STREAM_ALIAS,
10+
REPORTING_DATA_STREAM_INDEX_TEMPLATE,
11+
REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD,
12+
} from '@kbn/reporting-server';
13+
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
14+
15+
export async function rollDataStreamIfRequired(
16+
logger: Logger,
17+
esClient: ElasticsearchClient
18+
): Promise<boolean> {
19+
const msgPrefix = `Data stream ${REPORTING_DATA_STREAM_ALIAS}`;
20+
const skipMessage = 'does not need to be rolled over';
21+
const rollMessage = 'rolling over the data stream';
22+
// easy way to change debug log level when debugging
23+
const debug = (msg: string) => logger.debug(msg);
24+
25+
const exists = await esClient.indices.exists({
26+
index: REPORTING_DATA_STREAM_ALIAS,
27+
expand_wildcards: 'all',
28+
});
29+
30+
if (!exists) {
31+
debug(`${msgPrefix} does not exist so ${skipMessage}`);
32+
return false;
33+
}
34+
35+
const gotTemplate = await esClient.indices.getIndexTemplate({
36+
name: REPORTING_DATA_STREAM_INDEX_TEMPLATE,
37+
});
38+
if (gotTemplate.index_templates.length === 0) {
39+
throw new Error(
40+
`${msgPrefix} index template ${REPORTING_DATA_STREAM_INDEX_TEMPLATE} not found`
41+
);
42+
}
43+
44+
const templateVersions: number[] = [];
45+
for (const template of gotTemplate.index_templates) {
46+
const templateVersion = template.index_template.version;
47+
if (templateVersion) templateVersions.push(templateVersion);
48+
}
49+
50+
if (templateVersions.length === 0) {
51+
throw new Error(
52+
`${msgPrefix} index template ${REPORTING_DATA_STREAM_INDEX_TEMPLATE} does not have a version field`
53+
);
54+
}
55+
56+
// assume the highest version is the one in use
57+
const templateVersion = Math.max(...templateVersions);
58+
debug(`${msgPrefix} template version: ${templateVersion}`);
59+
60+
const mappings = await esClient.indices.getMapping({
61+
index: REPORTING_DATA_STREAM_ALIAS,
62+
allow_no_indices: true,
63+
expand_wildcards: 'all',
64+
});
65+
66+
const mappingsArray = Object.values(mappings);
67+
if (mappingsArray.length === 0) {
68+
debug(`${msgPrefix} has no backing indices so ${skipMessage}`);
69+
return false;
70+
}
71+
72+
// get the value of _meta.template_version from each index's mappings
73+
const mappingsVersions = mappingsArray
74+
.map((m) => m.mappings._meta?.[REPORTING_INDEX_TEMPLATE_MAPPING_META_FIELD])
75+
.filter((a: any): a is number => typeof a === 'number');
76+
77+
const mappingsVersion = mappingsVersions.length === 0 ? undefined : Math.max(...mappingsVersions);
78+
debug(`${msgPrefix} mappings version: ${mappingsVersion ?? '<none>'}`);
79+
80+
if (mappingsVersion === undefined) {
81+
// no mapping version found on any indices
82+
logger.info(`${msgPrefix} has no mapping versions so ${rollMessage}`);
83+
} else if (mappingsVersion < templateVersion) {
84+
// all mappings are old
85+
logger.info(`${msgPrefix} has older mappings than the template so ${rollMessage}`);
86+
} else if (mappingsVersion > templateVersion) {
87+
// newer mappings than the template shouldn't happen
88+
throw new Error(`${msgPrefix} has newer mappings than the template`);
89+
} else {
90+
// latest mappings already applied
91+
debug(`${msgPrefix} has latest mappings applied so ${skipMessage}`);
92+
return false;
93+
}
94+
95+
// Roll over the data stream to pick up the new mappings.
96+
// The `lazy` option will cause the rollover to run on the next write.
97+
// This limits potential race conditions of multiple Kibana's rolling over at once.
98+
await esClient.indices.rollover({
99+
alias: REPORTING_DATA_STREAM_ALIAS,
100+
lazy: true,
101+
});
102+
103+
logger.info(`${msgPrefix} rolled over to pick up index template version ${templateVersion}`);
104+
return true;
105+
}

x-pack/platform/plugins/private/reporting/server/lib/store/store.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type { ReportingCore } from '../..';
2525
import type { ReportTaskParams } from '../tasks';
2626
import { IlmPolicyManager } from './ilm_policy_manager';
2727
import { MIGRATION_VERSION } from './report';
28+
import { rollDataStreamIfRequired } from './rollover';
2829

2930
type UpdateResponse<T> = estypes.UpdateResponse<T>;
3031
type IndexResponse = estypes.IndexResponse;
@@ -165,10 +166,20 @@ export class ReportingStore {
165166
await this.createIlmPolicy();
166167
}
167168
} catch (e) {
168-
this.logger.error('Error in start phase');
169-
this.logger.error(e);
169+
this.logger.error(`Error creating ILM policy: ${e.message}`, {
170+
error: { stack_trace: e.stack },
171+
});
170172
throw e;
171173
}
174+
175+
try {
176+
await rollDataStreamIfRequired(this.logger, await this.getClient());
177+
} catch (e) {
178+
this.logger.error(`Error rolling over data stream: ${e.message}`, {
179+
error: { stack_trace: e.stack },
180+
});
181+
// not rethrowing, as this is not a fatal error
182+
}
172183
}
173184

174185
public async addReport(report: Report): Promise<SavedReport> {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"type": "plugin",
3+
"id": "@kbn/reporting-test-routes",
4+
"owner": "@elastic/response-ops",
5+
"visibility": "private",
6+
"plugin": {
7+
"id": "reportingTestRoutes",
8+
"server": true,
9+
"browser": false,
10+
"requiredPlugins": [
11+
"reporting",
12+
],
13+
"optionalPlugins": [
14+
]
15+
}
16+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"name": "@kbn/reporting-test-routes",
3+
"version": "1.0.0",
4+
"kibana": {
5+
"version": "kibana",
6+
"templateVersion": "1.0.0"
7+
},
8+
"main": "target/test/reporting_api_integration/plugins/reporting_api_integration",
9+
"scripts": {
10+
"kbn": "node ../../../../../../scripts/kbn.js",
11+
"build": "rm -rf './target' && ../../../../../../node_modules/.bin/tsc"
12+
},
13+
"license": "Elastic License 2.0"
14+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import type { PluginInitializerContext } from '@kbn/core/server';
9+
import { TestPlugin } from './plugin';
10+
11+
export const plugin = async (initContext: PluginInitializerContext) => new TestPlugin(initContext);

0 commit comments

Comments
 (0)