Skip to content

Commit 845e88e

Browse files
kibanamachinejesuswrrudolf
authored
[9.0] Use optimistic concurrency in SO migrations (elastic#231406) (elastic#233671)
# Backport This will backport the following commits from `main` to `9.0`: - [Use optimistic concurrency in SO migrations (elastic#231406)](elastic#231406) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Jesus Wahrman","email":"[email protected]"},"sourceCommit":{"committedDate":"2025-09-01T16:42:54Z","message":"Use optimistic concurrency in SO migrations (elastic#231406)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/226518\n\nAdd `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic\nconcurrency logic. Also added some tests for this.\n\nWhile doing this, some tests broke, these tests were moving a SO from\nsingle namespace to multi namespace. We can't use the optimistic\nconcurrency logic in this case because we are using the `seq no` and\n`primary term` from the old SO, so when it tries to create a new SO it\nwill fail. That's why we are checking if `document._id !==\ndocument._source.originId;` before deciding if we'll use the optimistic\nconcurrency logic.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\n- [x] Review the [backport\nguidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing)\nand apply applicable `backport:*` labels.\n\n---------\n\nCo-authored-by: Rudolf Meijering <[email protected]>","sha":"a5805f23728374dc6ec6ee241d7c38aca19762f6","branchLabelMapping":{"^v9.2.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Core","Feature:Saved Objects","release_note:skip","backport:prev-major","backport:current-major","v9.2.0"],"title":"Use optimistic concurrency in SO migrations","number":231406,"url":"https://github.com/elastic/kibana/pull/231406","mergeCommit":{"message":"Use optimistic concurrency in SO migrations (elastic#231406)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/226518\n\nAdd `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic\nconcurrency logic. Also added some tests for this.\n\nWhile doing this, some tests broke, these tests were moving a SO from\nsingle namespace to multi namespace. We can't use the optimistic\nconcurrency logic in this case because we are using the `seq no` and\n`primary term` from the old SO, so when it tries to create a new SO it\nwill fail. That's why we are checking if `document._id !==\ndocument._source.originId;` before deciding if we'll use the optimistic\nconcurrency logic.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\n- [x] Review the [backport\nguidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing)\nand apply applicable `backport:*` labels.\n\n---------\n\nCo-authored-by: Rudolf Meijering <[email protected]>","sha":"a5805f23728374dc6ec6ee241d7c38aca19762f6"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.2.0","branchLabelMappingKey":"^v9.2.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/231406","number":231406,"mergeCommit":{"message":"Use optimistic concurrency in SO migrations (elastic#231406)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/226518\n\nAdd `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic\nconcurrency logic. Also added some tests for this.\n\nWhile doing this, some tests broke, these tests were moving a SO from\nsingle namespace to multi namespace. We can't use the optimistic\nconcurrency logic in this case because we are using the `seq no` and\n`primary term` from the old SO, so when it tries to create a new SO it\nwill fail. That's why we are checking if `document._id !==\ndocument._source.originId;` before deciding if we'll use the optimistic\nconcurrency logic.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\n- [x] Review the [backport\nguidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing)\nand apply applicable `backport:*` labels.\n\n---------\n\nCo-authored-by: Rudolf Meijering <[email protected]>","sha":"a5805f23728374dc6ec6ee241d7c38aca19762f6"}}]}] BACKPORT--> --------- Co-authored-by: Jesus Wahrman <[email protected]> Co-authored-by: Rudolf Meijering <[email protected]> Co-authored-by: Jesus Wahrman <[email protected]>
1 parent 1aeb2d1 commit 845e88e

File tree

5 files changed

+209
-2
lines changed

5 files changed

+209
-2
lines changed

src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,45 @@ describe('createBulkIndexOperationTuple', () => {
423423
]
424424
`);
425425
});
426+
427+
it('includes if_seq_no and if_primary_term when originId is not defined', () => {
428+
const document = {
429+
_id: 'doc1',
430+
_seq_no: 10,
431+
_primary_term: 20,
432+
_source: { type: 'cases', title: 'no originId' },
433+
};
434+
const [operation] = createBulkIndexOperationTuple(document);
435+
expect(operation.index).toBeDefined();
436+
expect((operation.index as any).if_seq_no).toBe(10);
437+
expect((operation.index as any).if_primary_term).toBe(20);
438+
});
439+
440+
it('includes if_seq_no and if_primary_term when originId === _id', () => {
441+
const document = {
442+
_id: 'doc2',
443+
_seq_no: 11,
444+
_primary_term: 21,
445+
_source: { type: 'cases', title: 'originId equals _id', originId: 'doc2' },
446+
};
447+
const [operation] = createBulkIndexOperationTuple(document);
448+
expect(operation.index).toBeDefined();
449+
expect((operation.index as any).if_seq_no).toBe(11);
450+
expect((operation.index as any).if_primary_term).toBe(21);
451+
});
452+
453+
it('does NOT include if_seq_no and if_primary_term when originId !== _id', () => {
454+
const document = {
455+
_id: 'doc3',
456+
_seq_no: 12,
457+
_primary_term: 22,
458+
_source: { type: 'cases', title: 'originId not equal _id', originId: 'other-id' },
459+
};
460+
const [operation] = createBulkIndexOperationTuple(document);
461+
expect(operation.index).toBeDefined();
462+
expect((operation.index as any).if_seq_no).toBeUndefined();
463+
expect((operation.index as any).if_primary_term).toBeUndefined();
464+
});
426465
});
427466

428467
describe('getMigrationType', () => {

src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ export const createBulkIndexOperationTuple = (
283283
doc: SavedObjectsRawDoc,
284284
typeIndexMap: Record<string, string> = {}
285285
): BulkIndexOperationTuple => {
286+
const idChanged = doc._source.originId && doc._source.originId !== doc._id;
286287
return [
287288
{
288289
index: {
@@ -292,8 +293,9 @@ export const createBulkIndexOperationTuple = (
292293
}),
293294
// use optimistic concurrency control to ensure that outdated
294295
// documents are only overwritten once with the latest version
295-
...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }),
296-
...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }),
296+
...(typeof doc._seq_no !== 'undefined' && !idChanged && { if_seq_no: doc._seq_no }),
297+
...(typeof doc._primary_term !== 'undefined' &&
298+
!idChanged && { if_primary_term: doc._primary_term }),
297299
},
298300
},
299301
doc._source,

src/core/packages/saved-objects/migration-server-internal/src/next.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ export const nextActionMap = (
257257
batchSize: state.batchSize,
258258
searchAfter: state.lastHitSortValue,
259259
maxResponseSizeBytes: state.maxReadBatchSizeBytes,
260+
seqNoPrimaryTerm: true,
260261
}),
261262
OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) =>
262263
Actions.closePit({ client, pitId: state.pitId }),

src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ export const nextActionMap = (context: MigratorContext) => {
149149
searchAfter: state.lastHitSortValue,
150150
batchSize: context.migrationConfig.batchSize,
151151
query: state.outdatedDocumentsQuery,
152+
seqNoPrimaryTerm: true,
152153
}),
153154
OUTDATED_DOCUMENTS_SEARCH_TRANSFORM: (state: OutdatedDocumentsSearchTransformState) =>
154155
Actions.transformDocs({
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import Path from 'path';
11+
import fs from 'fs/promises';
12+
import { range } from 'lodash';
13+
import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
14+
import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
15+
import '../jest_matchers';
16+
import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit';
17+
import { parseLogFile } from '../test_utils';
18+
import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures';
19+
20+
export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log');
21+
22+
interface TestSOType {
23+
boolean: boolean;
24+
keyword: string;
25+
}
26+
27+
describe('ZDT & V2 upgrades - optimistic concurrency tests', () => {
28+
let esServer: TestElasticsearchUtils['es'];
29+
30+
beforeAll(async () => {
31+
esServer = await startElasticsearch();
32+
});
33+
34+
afterAll(async () => {
35+
await esServer?.stop();
36+
});
37+
38+
beforeEach(async () => {
39+
await fs.unlink(logFilePath).catch(() => {});
40+
jest.clearAllMocks();
41+
});
42+
43+
it.each(['v2', 'zdt'] as const)(
44+
'doesnt overwrite changes made while migrating (%s)',
45+
async (migrationAlgorithm) => {
46+
const { runMigrations, savedObjectsRepository, client } = await prepareScenario(
47+
migrationAlgorithm
48+
);
49+
50+
const originalBulkImplementation = client.bulk;
51+
const spy = jest.spyOn(client, 'bulk');
52+
spy.mockImplementation(function (this: typeof client, ...args) {
53+
// let's run some updates before we run the bulk operations
54+
return Promise.all(
55+
['a-0', 'a-3', 'a-4'].map((id) =>
56+
savedObjectsRepository.update('sample_a', id, {
57+
keyword: 'concurrent update that shouldnt be overwritten',
58+
})
59+
)
60+
).then(() => {
61+
return originalBulkImplementation.apply(this, args);
62+
});
63+
});
64+
65+
await runMigrations();
66+
67+
const records = await parseLogFile(logFilePath);
68+
expect(records).toContainLogEntry('-> DONE');
69+
70+
const { saved_objects: sampleADocs } = await savedObjectsRepository.find<TestSOType>({
71+
type: 'sample_a',
72+
});
73+
74+
expect(
75+
sampleADocs
76+
.map((doc) => ({
77+
id: doc.id,
78+
keyword: doc.attributes.keyword,
79+
}))
80+
.sort((a, b) => a.id.localeCompare(b.id))
81+
).toMatchInlineSnapshot(`
82+
Array [
83+
Object {
84+
"id": "a-0",
85+
"keyword": "concurrent update that shouldnt be overwritten",
86+
},
87+
Object {
88+
"id": "a-1",
89+
"keyword": "updated by the migrator",
90+
},
91+
Object {
92+
"id": "a-2",
93+
"keyword": "updated by the migrator",
94+
},
95+
Object {
96+
"id": "a-3",
97+
"keyword": "concurrent update that shouldnt be overwritten",
98+
},
99+
Object {
100+
"id": "a-4",
101+
"keyword": "concurrent update that shouldnt be overwritten",
102+
},
103+
]
104+
`);
105+
}
106+
);
107+
108+
const prepareScenario = async (migrationAlgorithm: 'zdt' | 'v2') => {
109+
await createBaseline();
110+
111+
const typeA = getSampleAType();
112+
113+
typeA.modelVersions = {
114+
...typeA.modelVersions,
115+
'2': {
116+
changes: [
117+
{
118+
type: 'unsafe_transform',
119+
transformFn: (doc) => {
120+
const attributes = {
121+
...doc.attributes,
122+
keyword: 'updated by the migrator',
123+
};
124+
return { document: { ...doc, attributes } };
125+
},
126+
},
127+
],
128+
},
129+
};
130+
131+
const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({
132+
...getBaseMigratorParams({ migrationAlgorithm }),
133+
logFilePath,
134+
types: [typeA],
135+
});
136+
137+
return { runMigrations, client, savedObjectsRepository };
138+
};
139+
140+
const createBaseline = async () => {
141+
const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({
142+
...getBaseMigratorParams(),
143+
types: [getSampleAType()],
144+
});
145+
146+
try {
147+
await client.indices.delete({ index: '.kibana_1' });
148+
} catch (e) {
149+
/* index wasn't created, that's fine */
150+
}
151+
152+
await runMigrations();
153+
154+
const sampleAObjs = range(5).map<SavedObjectsBulkCreateObject<TestSOType>>((number) => ({
155+
id: `a-${number}`,
156+
type: 'sample_a',
157+
attributes: {
158+
keyword: `a_${number}`,
159+
boolean: true,
160+
},
161+
}));
162+
await savedObjectsRepository.bulkCreate<TestSOType>(sampleAObjs);
163+
};
164+
});

0 commit comments

Comments
 (0)