Skip to content

Commit 96d1692

Browse files
authored
[Obs AI Assistant] Use update-by-query for semantic_text migration (#220255)
Closes: #220339 **Background** The `semantic_text` migration will migrate content from `text` field to `semantic_text` field. It does so with a recursive function that continuously retrieves knowledge base entries if they do not contain `semantic_text` and updates them accordingly. **Problem** It is possible to save empty knowledge base entries (#220342) where `text` and `semantic_text` will be empty. Doing this will cause the migration script to run indefinitely leading to OOM on the affected clusters. ## Workarounds for clusters that cannot / won't upgrade Temporary workaround is to delete empty knowledge base entries: ```jsonc POST .kibana-observability-ai-assistant-kb/_delete_by_query { "query": { "bool": { "must": [{ "exists": { "field": "text" }}], "must_not": [ { "wildcard": { "text": "*" } } ] } } } ``` If you want to perform a dry run (find offending documents without deleting them) run this: ```jsonc GET .kibana-observability-ai-assistant-kb/_search { "query": { "bool": { "must": [{ "exists": { "field": "text" }}], "must_not": [ { "wildcard": { "text": "*" } } ] } } } ```
1 parent ff3822d commit 96d1692

File tree

8 files changed

+164
-150
lines changed

8 files changed

+164
-150
lines changed

x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ const saveKnowledgeBaseUserInstruction = createObservabilityAIAssistantServerRou
177177

178178
const { id, text, public: isPublic } = resources.params.body;
179179
return client.addUserInstruction({
180-
entry: { id, text, public: isPublic },
180+
entry: { id, text, public: isPublic, title: `User instruction` },
181181
});
182182
},
183183
});

x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,16 +765,22 @@ export class ObservabilityAIAssistantClient {
765765
}): Promise<void> => {
766766
// for now we want to limit the number of user instructions to 1 per user
767767
// if a user instruction already exists for the user, we get the id and update it
768-
this.dependencies.logger.debug('Adding user instruction entry');
768+
769769
const existingId = await this.dependencies.knowledgeBaseService.getPersonalUserInstructionId({
770770
isPublic: entry.public,
771771
namespace: this.dependencies.namespace,
772772
user: this.dependencies.user,
773773
});
774774

775775
if (existingId) {
776+
this.dependencies.logger.debug(
777+
`Updating user instruction. id = "${existingId}", user = "${this.dependencies.user?.name}"`
778+
);
776779
entry.id = existingId;
777-
this.dependencies.logger.debug(`Updating user instruction with id "${existingId}"`);
780+
} else {
781+
this.dependencies.logger.debug(
782+
`Creating user instruction. id = "${entry.id}", user = "${this.dependencies.user?.name}"`
783+
);
778784
}
779785

780786
return this.dependencies.knowledgeBaseService.addEntry({

x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,9 @@ export class KnowledgeBaseService {
431431
refresh: 'wait_for',
432432
});
433433

434-
this.dependencies.logger.debug(`Entry added to knowledge base`);
434+
this.dependencies.logger.debug(
435+
`Entry added to knowledge base. title = "${doc.title}", user = "${user?.name}, namespace = "${namespace}"`
436+
);
435437
} catch (error) {
436438
this.dependencies.logger.error(`Failed to add entry to knowledge base ${error}`);
437439
if (isInferenceEndpointMissingOrUnavailable(error)) {

x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,5 @@ export async function isReIndexInProgress({
193193
getActiveReindexingTaskId(esClient),
194194
]);
195195

196-
logger.debug(`Lock: ${!!lock}`);
197-
logger.debug(`ES re-indexing task: ${!!activeReindexingTask}`);
198-
199196
return lock !== undefined || activeReindexingTask !== undefined;
200197
}

x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_fields.ts

Lines changed: 23 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@
66
*/
77

88
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
9-
import pLimit from 'p-limit';
109
import type { CoreSetup, Logger } from '@kbn/core/server';
11-
import { uniq } from 'lodash';
1210
import { LockManagerService } from '@kbn/lock-manager';
13-
import { KnowledgeBaseEntry } from '../../../common';
11+
import pRetry from 'p-retry';
1412
import { resourceNames } from '..';
1513
import { waitForKbModel } from '../inference_endpoint';
1614
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
1715
import { ObservabilityAIAssistantConfig } from '../../config';
18-
import { sleep } from '../util/sleep';
1916
import { getInferenceIdFromWriteIndex } from '../knowledge_base_service/get_inference_id_from_write_index';
2017

2118
const POPULATE_MISSING_SEMANTIC_TEXT_FIELDS_LOCK_ID = 'populate_missing_semantic_text_fields';
@@ -32,13 +29,12 @@ export async function populateMissingSemanticTextFieldWithLock({
3229
}) {
3330
const lmService = new LockManagerService(core, logger);
3431
await lmService.withLock(POPULATE_MISSING_SEMANTIC_TEXT_FIELDS_LOCK_ID, async () =>
35-
populateMissingSemanticTextFieldRecursively({ core, esClient, logger, config })
32+
populateMissingSemanticTextField({ core, esClient, logger, config })
3633
);
3734
}
3835

3936
// Ensures that every doc has populated the `semantic_text` field.
40-
// It retrieves entries without the field, updates them in batches, and continues until no entries remain.
41-
async function populateMissingSemanticTextFieldRecursively({
37+
async function populateMissingSemanticTextField({
4238
core,
4339
esClient,
4440
logger,
@@ -49,64 +45,30 @@ async function populateMissingSemanticTextFieldRecursively({
4945
logger: Logger;
5046
config: ObservabilityAIAssistantConfig;
5147
}) {
52-
logger.debug(
53-
'Checking for remaining entries without semantic_text field that need to be migrated'
54-
);
55-
56-
const response = await esClient.asInternalUser.search<KnowledgeBaseEntry>({
57-
size: 100,
58-
track_total_hits: true,
59-
index: [resourceNames.writeIndexAlias.kb],
60-
query: {
61-
bool: {
62-
must_not: {
63-
exists: {
64-
field: 'semantic_text',
65-
},
66-
},
67-
},
68-
},
69-
_source: {
70-
excludes: ['ml.tokens'],
71-
},
72-
});
48+
logger.debug('Initalizing semantic text migration for knowledge base entries...');
7349

74-
if (response.hits.hits.length === 0) {
75-
logger.debug('No remaining entries to migrate');
76-
return;
77-
}
50+
await pRetry(
51+
async () => {
52+
const inferenceId = await getInferenceIdFromWriteIndex(esClient);
53+
await waitForKbModel({ core, esClient, logger, config, inferenceId });
7854

79-
const inferenceId = await getInferenceIdFromWriteIndex(esClient);
80-
await waitForKbModel({ core, esClient, logger, config, inferenceId });
81-
82-
const indicesWithOutdatedEntries = uniq(response.hits.hits.map((hit) => hit._index));
83-
logger.debug(
84-
`Found ${response.hits.hits.length} entries without semantic_text field in "${indicesWithOutdatedEntries}". Updating now...`
85-
);
86-
87-
// Limit the number of concurrent requests to avoid overloading the cluster
88-
const limiter = pLimit(20);
89-
const promises = response.hits.hits.map((hit) => {
90-
return limiter(() => {
91-
if (!hit._source || !hit._id) {
92-
return;
93-
}
94-
95-
return esClient.asInternalUser.update({
96-
refresh: 'wait_for',
55+
await esClient.asInternalUser.updateByQuery({
9756
index: resourceNames.writeIndexAlias.kb,
98-
id: hit._id,
99-
doc: {
100-
...hit._source,
101-
semantic_text: hit._source.text ?? 'No text',
57+
requests_per_second: 100,
58+
script: {
59+
source: `ctx._source.semantic_text = ctx._source.text`,
60+
lang: 'painless',
61+
},
62+
query: {
63+
bool: {
64+
filter: { exists: { field: 'text' } },
65+
must_not: { exists: { field: 'semantic_text' } },
66+
},
10267
},
10368
});
104-
});
105-
});
106-
107-
await Promise.all(promises);
108-
logger.debug(`Updated ${promises.length} entries`);
69+
},
70+
{ retries: 10, minTimeout: 10_000 }
71+
);
10972

110-
await sleep(100);
111-
await populateMissingSemanticTextFieldRecursively({ core, esClient, logger, config });
73+
logger.debug('Semantic text migration completed successfully.');
11274
}

x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/run_startup_migrations.ts

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
99
import type { CoreSetup, Logger } from '@kbn/core/server';
10-
import pRetry from 'p-retry';
1110
import { errors } from '@elastic/elasticsearch';
1211
import { LockManagerService, isLockAcquisitionError } from '@kbn/lock-manager';
1312
import { resourceNames } from '..';
@@ -58,21 +57,7 @@ export async function runStartupMigrations({
5857
await reIndexKnowledgeBaseWithLock({ core, logger, esClient });
5958
}
6059

61-
await pRetry(
62-
async () => populateMissingSemanticTextFieldWithLock({ core, logger, config, esClient }),
63-
{
64-
retries: 5,
65-
minTimeout: 10_000,
66-
onFailedAttempt: async (error) => {
67-
// if the error is a LockAcquisitionError the operation is already in progress and we should not retry
68-
// for other errors we should retry
69-
// throwing the error will cause pRetry to abort all retries
70-
if (isLockAcquisitionError(error)) {
71-
throw error;
72-
}
73-
},
74-
}
75-
);
60+
await populateMissingSemanticTextFieldWithLock({ core, logger, config, esClient });
7661
})
7762
.catch((error) => {
7863
// we should propogate the error if it is not a LockAcquisitionError

0 commit comments

Comments
 (0)