Skip to content

Commit 792ed55

Browse files
KDKHDkibanamachineelasticmachine
authored
[Security Solution] [AI Assistant] Configure checkpoints and checkpoint-writes index templates for Elastic Assistant (#233604)
## Summary Summarize your PR. If it involves visual changes include a screenshot or gif. Configures indices required by the Langgraph checkpointer for the Elastic Assistant plugin. ### How to test - Start Kibana - Go to http://localhost:5601/app/management/data/index_management/templates - Verify the following index templates exist: `.kibana-elastic-ai-assistant-index-template-checkpoint-writes` `.kibana-elastic-ai-assistant-index-template-checkpoints` <img width="1726" height="1040" alt="image" src="https://github.com/user-attachments/assets/5f3ce118-1c8d-421b-a4f9-c6c9957945d2" /> ### Checklist Check the PR satisfies following conditions. Reviewers should verify this PR satisfies this list as well. - [X] Any text added follows [EUI's writing guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses sentence case text and includes [i18n support](https://github.com/elastic/kibana/blob/main/src/platform/packages/shared/kbn-i18n/README.md) - [X] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials - [X] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [X] If a plugin configuration key changed, check if it needs to be allowlisted in the cloud and added to the [docker list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker) - [X] This was checked for breaking HTTP API changes, and any breaking changes have been approved by the breaking-change committee. The `release_note:breaking` label should be applied in these situations. - [X] [Flaky Test Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was used on any tests changed - [X] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) - [X] Review the [backport guidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing) and apply applicable `backport:*` labels. ### Identify risks Does this PR introduce any risks? For example, consider risks like hard to test bugs, performance regression, potential of data loss. Describe the risk, its severity, and mitigation for each identified risk. Invite stakeholders and evaluate how to proceed before merging. - [ ] [See some risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) - [ ] ... --------- Co-authored-by: kibanamachine <[email protected]> Co-authored-by: Elastic Machine <[email protected]>
1 parent d753695 commit 792ed55

File tree

7 files changed

+396
-164
lines changed

7 files changed

+396
-164
lines changed

x-pack/platform/packages/shared/kbn-langgraph-checkpoint-saver/server/elastic-search-checkpoint-saver/index.ts

Lines changed: 87 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
} from '@langchain/langgraph-checkpoint';
2121

2222
interface CheckpointDocument {
23-
created_at: string;
23+
'@timestamp': string;
2424
thread_id: string;
2525
checkpoint_ns: string;
2626
checkpoint_id: string;
@@ -31,7 +31,7 @@ interface CheckpointDocument {
3131
}
3232

3333
interface WritesDocument {
34-
created_at: string;
34+
'@timestamp': string;
3535
thread_id: string;
3636
checkpoint_ns: string;
3737
checkpoint_id: string;
@@ -58,27 +58,41 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
5858

5959
static defaultCheckpointWritesIndex = 'checkpoint_writes';
6060

61-
static readonly checkpointIndexMapping = {
62-
created_at: { type: 'date' },
63-
thread_id: { type: 'keyword' },
64-
checkpoint_ns: { type: 'keyword' },
65-
checkpoint_id: { type: 'keyword' },
66-
parent_checkpoint_id: { type: 'keyword' },
67-
type: { type: 'keyword' },
68-
checkpoint: { type: 'binary' },
69-
metadata: { type: 'binary' },
61+
/**
62+
* When modifying the field maps, ensure you perform upgrade testing for all graphs depending on this saver.
63+
*/
64+
static readonly checkpointsFieldMap = {
65+
'@timestamp': {
66+
type: 'date',
67+
required: true,
68+
array: false,
69+
},
70+
thread_id: { type: 'keyword', required: true, array: false },
71+
checkpoint_ns: { type: 'keyword', required: true, array: false },
72+
checkpoint_id: { type: 'keyword', required: false, array: false },
73+
parent_checkpoint_id: { type: 'keyword', required: true, array: false },
74+
type: { type: 'keyword', required: true, array: false },
75+
checkpoint: { type: 'binary', required: true, array: false },
76+
metadata: { type: 'binary', required: true, array: false },
7077
} as const;
7178

72-
static readonly checkpointWritesIndexMapping = {
73-
created_at: { type: 'date' },
74-
thread_id: { type: 'keyword' },
75-
checkpoint_ns: { type: 'keyword' },
76-
checkpoint_id: { type: 'keyword' },
77-
task_id: { type: 'keyword' },
78-
idx: { type: 'unsigned_long' },
79-
channel: { type: 'keyword' },
80-
type: { type: 'keyword' },
81-
value: { type: 'binary' },
79+
/**
80+
* When modifying the field maps, ensure you perform upgrade testing for all graphs depending on this saver.
81+
*/
82+
static readonly checkpointWritesFieldMap = {
83+
'@timestamp': {
84+
type: 'date',
85+
required: true,
86+
array: false,
87+
},
88+
thread_id: { type: 'keyword', required: true, array: false },
89+
checkpoint_ns: { type: 'keyword', required: true, array: false },
90+
checkpoint_id: { type: 'keyword', required: true, array: false },
91+
task_id: { type: 'keyword', required: true, array: false },
92+
idx: { type: 'unsigned_long', required: true, array: false },
93+
channel: { type: 'keyword', required: true, array: false },
94+
type: { type: 'keyword', required: true, array: false },
95+
value: { type: 'binary', required: true, array: false },
8296
} as const;
8397

8498
protected client: ElasticsearchClient;
@@ -149,9 +163,34 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
149163
query: {
150164
bool: {
151165
must: [
152-
{ term: { thread_id: threadId } },
153-
{ term: { checkpoint_ns: checkpointNs } },
154-
{ term: { checkpoint_id: doc.checkpoint_id } },
166+
// todo(@KDKHD): fix this query to remove the duplicate should clauses. Ensure both tests and the elastic assistant work.
167+
{
168+
bool: {
169+
should: [
170+
{ term: { thread_id: threadId } },
171+
{ term: { 'thread_id.keyword': threadId } },
172+
],
173+
minimum_should_match: 1,
174+
},
175+
},
176+
{
177+
bool: {
178+
should: [
179+
{ term: { checkpoint_ns: checkpointNs } },
180+
{ term: { 'checkpoint_ns.keyword': checkpointNs } },
181+
],
182+
minimum_should_match: 1,
183+
},
184+
},
185+
{
186+
bool: {
187+
should: [
188+
{ term: { checkpoint_id: doc.checkpoint_id } },
189+
{ term: { 'checkpoint_id.keyword': doc.checkpoint_id } },
190+
],
191+
minimum_should_match: 1,
192+
},
193+
},
155194
],
156195
},
157196
},
@@ -212,7 +251,7 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
212251
config: RunnableConfig,
213252
options?: CheckpointListOptions
214253
): AsyncGenerator<CheckpointTuple> {
215-
const { limit, before, filter } = options ?? {};
254+
const { limit, before } = options ?? {};
216255
const mustClauses = [];
217256

218257
if (config?.configurable?.thread_id) {
@@ -234,16 +273,10 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
234273
});
235274
}
236275

237-
if (filter) {
238-
Object.entries(filter).forEach(([key, value]) => {
239-
mustClauses.push({ term: { [`metadata.${key}`]: value } });
240-
});
241-
}
242-
243276
const result = await this.client.search<CheckpointDocument>({
244277
index: this.checkpointIndex,
245278
...(limit ? { size: limit } : {}),
246-
sort: [{ checkpoint_id: { order: 'desc' } }],
279+
sort: [{ checkpoint_id: { order: 'desc' } }, { '@timestamp': { order: 'desc' } }],
247280
query: {
248281
bool: {
249282
must: mustClauses,
@@ -296,6 +329,10 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
296329
checkpoint: Checkpoint,
297330
metadata: CheckpointMetadata
298331
): Promise<RunnableConfig> {
332+
this.logger.debug(
333+
`Putting checkpoint ${checkpoint.id} for thread ${config.configurable?.thread_id}`
334+
);
335+
299336
const threadId = config.configurable?.thread_id;
300337

301338
const checkpointNs = config.configurable?.checkpoint_ns ?? '';
@@ -313,7 +350,7 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
313350
}
314351

315352
const doc: CheckpointDocument = {
316-
created_at: new Date().toISOString(),
353+
'@timestamp': new Date().toISOString(),
317354
thread_id: threadId,
318355
checkpoint_ns: checkpointNs,
319356
checkpoint_id: checkpointId,
@@ -326,10 +363,11 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
326363

327364
const compositeId = `thread_id:${threadId}|checkpoint_ns:${checkpointNs}|checkpoint_id:${checkpointId}`;
328365

329-
await this.client.index({
366+
await this.client.update({
330367
index: this.checkpointIndex,
331368
id: compositeId,
332-
document: doc,
369+
doc,
370+
doc_as_upsert: true,
333371
refresh: this.refreshPolicy,
334372
});
335373

@@ -346,6 +384,7 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
346384
* Saves intermediate writes associated with a checkpoint to Elastic Search.
347385
*/
348386
async putWrites(config: RunnableConfig, writes: PendingWrite[], taskId: string): Promise<void> {
387+
this.logger.debug(`Putting writes for checkpoint ${config.configurable?.checkpoint_id}`);
349388
const threadId = config.configurable?.thread_id;
350389

351390
const checkpointNs = config.configurable?.checkpoint_ns;
@@ -364,7 +403,7 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
364403
const [type, serializedValue] = this.serde.dumpsTyped(value);
365404

366405
const doc: WritesDocument = {
367-
created_at: new Date().toISOString(),
406+
'@timestamp': new Date().toISOString(),
368407
thread_id: threadId,
369408
checkpoint_ns: checkpointNs,
370409
checkpoint_id: checkpointId,
@@ -375,20 +414,29 @@ export class ElasticSearchSaver extends BaseCheckpointSaver {
375414
type,
376415
};
377416

417+
this.logger.debug(`Indexing write operation for checkpoint ${checkpointId}`);
418+
378419
return [
379420
{
380-
index: {
421+
update: {
381422
_index: this.checkpointWritesIndex,
382423
_id: compositeId,
383424
},
384425
},
385-
doc,
426+
{ doc, doc_as_upsert: true },
386427
];
387428
});
388429

389-
await this.client.bulk({
430+
const result = await this.client.bulk({
390431
operations,
391432
refresh: this.refreshPolicy,
433+
error_trace: true,
392434
});
435+
436+
if (result.errors) {
437+
this.logger.error(`Failed to index writes for checkpoint ${checkpointId}`);
438+
439+
throw new Error(`Failed to index writes for checkpoint ${checkpointId}`);
440+
}
393441
}
394442
}

0 commit comments

Comments
 (0)