Skip to content

Commit 5c51c89

Browse files
kibanamachinejesuswrrudolf
authored
[9.1] Use optimistic concurrency in SO migrations (elastic#231406) (elastic#233673)
# Backport This will backport the following commits from `main` to `9.1`: - [Use optimistic concurrency in SO migrations (elastic#231406)](elastic#231406) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Jesus Wahrman","email":"[email protected]"},"sourceCommit":{"committedDate":"2025-09-01T16:42:54Z","message":"Use optimistic concurrency in SO migrations (elastic#231406)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/226518\n\nAdd `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic\nconcurrency logic. Also added some tests for this.\n\nWhile doing this, some tests broke, these tests were moving a SO from\nsingle namespace to multi namespace. We can't use the optimistic\nconcurrency logic in this case because we are using the `seq no` and\n`primary term` from the old SO, so when it tries to create a new SO it\nwill fail. That's why we are checking if `document._id !==\ndocument._source.originId;` before deciding if we'll use the optimistic\nconcurrency logic.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\n- [x] Review the [backport\nguidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing)\nand apply applicable `backport:*` labels.\n\n---------\n\nCo-authored-by: Rudolf Meijering <[email protected]>","sha":"a5805f23728374dc6ec6ee241d7c38aca19762f6","branchLabelMapping":{"^v9.2.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Core","Feature:Saved Objects","release_note:skip","backport:prev-major","backport:current-major","v9.2.0"],"title":"Use optimistic concurrency in SO migrations","number":231406,"url":"https://github.com/elastic/kibana/pull/231406","mergeCommit":{"message":"Use optimistic concurrency in SO migrations (elastic#231406)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/226518\n\nAdd `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic\nconcurrency logic. Also added some tests for this.\n\nWhile doing this, some tests broke, these tests were moving a SO from\nsingle namespace to multi namespace. We can't use the optimistic\nconcurrency logic in this case because we are using the `seq no` and\n`primary term` from the old SO, so when it tries to create a new SO it\nwill fail. That's why we are checking if `document._id !==\ndocument._source.originId;` before deciding if we'll use the optimistic\nconcurrency logic.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\n- [x] Review the [backport\nguidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing)\nand apply applicable `backport:*` labels.\n\n---------\n\nCo-authored-by: Rudolf Meijering <[email protected]>","sha":"a5805f23728374dc6ec6ee241d7c38aca19762f6"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.2.0","branchLabelMappingKey":"^v9.2.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/231406","number":231406,"mergeCommit":{"message":"Use optimistic concurrency in SO migrations (elastic#231406)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/226518\n\nAdd `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic\nconcurrency logic. Also added some tests for this.\n\nWhile doing this, some tests broke, these tests were moving a SO from\nsingle namespace to multi namespace. We can't use the optimistic\nconcurrency logic in this case because we are using the `seq no` and\n`primary term` from the old SO, so when it tries to create a new SO it\nwill fail. That's why we are checking if `document._id !==\ndocument._source.originId;` before deciding if we'll use the optimistic\nconcurrency logic.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\n- [x] Review the [backport\nguidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing)\nand apply applicable `backport:*` labels.\n\n---------\n\nCo-authored-by: Rudolf Meijering <[email protected]>","sha":"a5805f23728374dc6ec6ee241d7c38aca19762f6"}}]}] BACKPORT--> Co-authored-by: Jesus Wahrman <[email protected]> Co-authored-by: Rudolf Meijering <[email protected]>
1 parent 0db04d3 commit 5c51c89

File tree

5 files changed

+216
-2
lines changed

5 files changed

+216
-2
lines changed

src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,45 @@ describe('createBulkIndexOperationTuple', () => {
423423
]
424424
`);
425425
});
426+
427+
it('includes if_seq_no and if_primary_term when originId is not defined', () => {
428+
const document = {
429+
_id: 'doc1',
430+
_seq_no: 10,
431+
_primary_term: 20,
432+
_source: { type: 'cases', title: 'no originId' },
433+
};
434+
const [operation] = createBulkIndexOperationTuple(document);
435+
expect(operation.index).toBeDefined();
436+
expect((operation.index as any).if_seq_no).toBe(10);
437+
expect((operation.index as any).if_primary_term).toBe(20);
438+
});
439+
440+
it('includes if_seq_no and if_primary_term when originId === _id', () => {
441+
const document = {
442+
_id: 'doc2',
443+
_seq_no: 11,
444+
_primary_term: 21,
445+
_source: { type: 'cases', title: 'originId equals _id', originId: 'doc2' },
446+
};
447+
const [operation] = createBulkIndexOperationTuple(document);
448+
expect(operation.index).toBeDefined();
449+
expect((operation.index as any).if_seq_no).toBe(11);
450+
expect((operation.index as any).if_primary_term).toBe(21);
451+
});
452+
453+
it('does NOT include if_seq_no and if_primary_term when originId !== _id', () => {
454+
const document = {
455+
_id: 'doc3',
456+
_seq_no: 12,
457+
_primary_term: 22,
458+
_source: { type: 'cases', title: 'originId not equal _id', originId: 'other-id' },
459+
};
460+
const [operation] = createBulkIndexOperationTuple(document);
461+
expect(operation.index).toBeDefined();
462+
expect((operation.index as any).if_seq_no).toBeUndefined();
463+
expect((operation.index as any).if_primary_term).toBeUndefined();
464+
});
426465
});
427466

428467
describe('getMigrationType', () => {

src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ export const createBulkIndexOperationTuple = (
283283
doc: SavedObjectsRawDoc,
284284
typeIndexMap: Record<string, string> = {}
285285
): BulkIndexOperationTuple => {
286+
const idChanged = doc._source.originId && doc._source.originId !== doc._id;
286287
return [
287288
{
288289
index: {
@@ -292,8 +293,9 @@ export const createBulkIndexOperationTuple = (
292293
}),
293294
// use optimistic concurrency control to ensure that outdated
294295
// documents are only overwritten once with the latest version
295-
...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }),
296-
...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }),
296+
...(typeof doc._seq_no !== 'undefined' && !idChanged && { if_seq_no: doc._seq_no }),
297+
...(typeof doc._primary_term !== 'undefined' &&
298+
!idChanged && { if_primary_term: doc._primary_term }),
297299
},
298300
},
299301
doc._source,

src/core/packages/saved-objects/migration-server-internal/src/next.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ export const nextActionMap = (
257257
batchSize: state.batchSize,
258258
searchAfter: state.lastHitSortValue,
259259
maxResponseSizeBytes: state.maxReadBatchSizeBytes,
260+
seqNoPrimaryTerm: true,
260261
}),
261262
OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) =>
262263
Actions.closePit({ client, pitId: state.pitId }),

src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ export const nextActionMap = (context: MigratorContext) => {
149149
searchAfter: state.lastHitSortValue,
150150
batchSize: context.migrationConfig.batchSize,
151151
query: state.outdatedDocumentsQuery,
152+
seqNoPrimaryTerm: true,
152153
}),
153154
OUTDATED_DOCUMENTS_SEARCH_TRANSFORM: (state: OutdatedDocumentsSearchTransformState) =>
154155
Actions.transformDocs({
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import Path from 'path';
11+
import fs from 'fs/promises';
12+
import { range } from 'lodash';
13+
import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
14+
import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
15+
import '../jest_matchers';
16+
import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit';
17+
import { parseLogFile } from '../test_utils';
18+
import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures';
19+
import type {
20+
SavedObjectModelTransformationDoc,
21+
SavedObjectModelUnsafeTransformFn,
22+
} from '@kbn/core-saved-objects-server';
23+
24+
export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log');
25+
26+
interface TestSOType {
27+
boolean: boolean;
28+
keyword: string;
29+
}
30+
31+
describe('ZDT & V2 upgrades - optimistic concurrency tests', () => {
32+
let esServer: TestElasticsearchUtils['es'];
33+
34+
beforeAll(async () => {
35+
esServer = await startElasticsearch();
36+
});
37+
38+
afterAll(async () => {
39+
await esServer?.stop();
40+
});
41+
42+
beforeEach(async () => {
43+
await fs.unlink(logFilePath).catch(() => {});
44+
jest.clearAllMocks();
45+
});
46+
47+
it.each(['v2', 'zdt'] as const)(
48+
'doesnt overwrite changes made while migrating (%s)',
49+
async (migrationAlgorithm) => {
50+
const { runMigrations, savedObjectsRepository, client } = await prepareScenario(
51+
migrationAlgorithm
52+
);
53+
54+
const originalBulkImplementation = client.bulk;
55+
const spy = jest.spyOn(client, 'bulk');
56+
spy.mockImplementation(function (this: typeof client, ...args) {
57+
// let's run some updates before we run the bulk operations
58+
return Promise.all(
59+
['a-0', 'a-3', 'a-4'].map((id) =>
60+
savedObjectsRepository.update('sample_a', id, {
61+
keyword: 'concurrent update that shouldnt be overwritten',
62+
})
63+
)
64+
).then(() => {
65+
return originalBulkImplementation.apply(this, args);
66+
});
67+
});
68+
69+
await runMigrations();
70+
71+
const records = await parseLogFile(logFilePath);
72+
expect(records).toContainLogEntry('-> DONE');
73+
74+
const { saved_objects: sampleADocs } = await savedObjectsRepository.find<TestSOType>({
75+
type: 'sample_a',
76+
});
77+
78+
expect(
79+
sampleADocs
80+
.map((doc) => ({
81+
id: doc.id,
82+
keyword: doc.attributes.keyword,
83+
}))
84+
.sort((a, b) => a.id.localeCompare(b.id))
85+
).toMatchInlineSnapshot(`
86+
Array [
87+
Object {
88+
"id": "a-0",
89+
"keyword": "concurrent update that shouldnt be overwritten",
90+
},
91+
Object {
92+
"id": "a-1",
93+
"keyword": "updated by the migrator",
94+
},
95+
Object {
96+
"id": "a-2",
97+
"keyword": "updated by the migrator",
98+
},
99+
Object {
100+
"id": "a-3",
101+
"keyword": "concurrent update that shouldnt be overwritten",
102+
},
103+
Object {
104+
"id": "a-4",
105+
"keyword": "concurrent update that shouldnt be overwritten",
106+
},
107+
]
108+
`);
109+
}
110+
);
111+
112+
const prepareScenario = async (migrationAlgorithm: 'zdt' | 'v2') => {
113+
await createBaseline();
114+
115+
const typeA = getSampleAType();
116+
117+
const transformFunc: SavedObjectModelUnsafeTransformFn<TestSOType, TestSOType> = (
118+
doc: SavedObjectModelTransformationDoc<TestSOType>
119+
) => {
120+
const attributes = {
121+
...doc.attributes,
122+
keyword: 'updated by the migrator',
123+
};
124+
return { document: { ...doc, attributes } };
125+
};
126+
typeA.modelVersions = {
127+
...typeA.modelVersions,
128+
'2': {
129+
changes: [
130+
{
131+
type: 'unsafe_transform',
132+
transformFn: (typeSafeGuard) => typeSafeGuard(transformFunc),
133+
},
134+
],
135+
},
136+
};
137+
138+
const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({
139+
...getBaseMigratorParams({ migrationAlgorithm }),
140+
logFilePath,
141+
types: [typeA],
142+
});
143+
144+
return { runMigrations, client, savedObjectsRepository };
145+
};
146+
147+
const createBaseline = async () => {
148+
const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({
149+
...getBaseMigratorParams(),
150+
types: [getSampleAType()],
151+
});
152+
153+
try {
154+
await client.indices.delete({ index: '.kibana_1' });
155+
} catch (e) {
156+
/* index wasn't created, that's fine */
157+
}
158+
159+
await runMigrations();
160+
161+
const sampleAObjs = range(5).map<SavedObjectsBulkCreateObject<TestSOType>>((number) => ({
162+
id: `a-${number}`,
163+
type: 'sample_a',
164+
attributes: {
165+
keyword: `a_${number}`,
166+
boolean: true,
167+
},
168+
}));
169+
await savedObjectsRepository.bulkCreate<TestSOType>(sampleAObjs);
170+
};
171+
});

0 commit comments

Comments
 (0)