
Commit b7cf75a

[Observability AI Assistant] Add isStream param to chat/complete endpoint to support non-streaming responses (#240819)
Closes #239439

This adds a parameter `isStream` to both the internal and public `chat/complete` endpoints.

## Solution

The internal ~and public~ `chat/complete` API should return a single JSON response instead of a stream of events when `isStream: false` is passed.

### Internal API

```sh
curl -X POST "http://localhost:5601/internal/observability_ai_assistant/chat/complete" \
  -u elastic:changeme \
  -H "kbn-version: 9.3.0" \
  -H "Content-Type: application/json" \
  -H "x-elastic-internal-origin: kibana" \
  -d '{
    "messages": [
      {
        "@timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%S.000Z")'",
        "message": {
          "role": "user",
          "content": "This is a test. Respond with Ack."
        }
      }
    ],
    "connectorId": "<your-connector-id>",
    "scopes": ["observability"],
    "screenContexts": [],
    "persist": false,
    "isStream": false
  }' | jq
```

### Public API

<details>
<summary>Disabled for the initial rollout</summary>

```sh
curl -X POST "http://localhost:5601/api/observability_ai_assistant/chat/complete" \
  -u elastic:changeme \
  -H "kbn-xsrf: true" \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {
        "@timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%S.000Z")'",
        "message": {
          "role": "user",
          "content": "This is a test. Respond with Ack."
        }
      }
    ],
    "connectorId": "<your-connector-id>",
    "persist": false,
    "isStream": false
  }' | jq
```

</details>

### Response

```json
{
  "conversationId": "a408ec61-9401-4c7d-9ae6-349fb4844684",
  "data": "Why do programmers prefer dark mode?\n\nBecause light attracts bugs! 🐛",
  "connectorId": "bedrock-claude-40"
}
```

---------

Co-authored-by: kibanamachine <[email protected]>
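For readers wiring this up from code rather than curl, a minimal TypeScript sketch of the same internal-API call is below. It assumes the same local Kibana, credentials, and headers as the curl example above (Node 18+ for the global `fetch`); the helper name `chatCompleteOnce` and the `ChatCompleteJsonResponse` interface are illustrative only, not part of this change.

```ts
// Hypothetical helper mirroring the internal-API curl example above.
// URL, headers, and credentials are the same assumptions as in that example.
interface ChatCompleteJsonResponse {
  conversationId?: string;
  data?: string;
  connectorId: string;
}

async function chatCompleteOnce(
  connectorId: string,
  content: string
): Promise<ChatCompleteJsonResponse> {
  const res = await fetch(
    'http://localhost:5601/internal/observability_ai_assistant/chat/complete',
    {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'kbn-version': '9.3.0',
        'x-elastic-internal-origin': 'kibana',
        Authorization: 'Basic ' + Buffer.from('elastic:changeme').toString('base64'),
      },
      body: JSON.stringify({
        messages: [
          {
            '@timestamp': new Date().toISOString(),
            message: { role: 'user', content },
          },
        ],
        connectorId,
        scopes: ['observability'],
        screenContexts: [],
        persist: false,
        isStream: false, // request a single JSON body instead of an event stream
      }),
    }
  );
  return (await res.json()) as ChatCompleteJsonResponse;
}
```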
1 parent e375b08 commit b7cf75a

File tree

10 files changed · +493 -195 lines changed


x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/chat/route.ts

Lines changed: 35 additions & 12 deletions
```diff
@@ -7,10 +7,12 @@
 import { notImplemented } from '@hapi/boom';
 import { toBooleanRt } from '@kbn/io-ts-utils';
 import * as t from 'io-ts';
+import type { Observable } from 'rxjs';
 import { from, map } from 'rxjs';
 import { v4 } from 'uuid';
 import type { Readable } from 'stream';
 import type { AssistantScope } from '@kbn/ai-assistant-common';
+import { last } from 'lodash';
 import { aiAssistantSimulatedFunctionCalling } from '../..';
 import { createFunctionResponseMessage } from '../../../common/utils/create_function_response_message';
 import { flushBuffer } from '../../service/util/flush_buffer';
@@ -28,6 +30,11 @@ import {
   screenContextRt,
 } from '../runtime_types';
 import type { ObservabilityAIAssistantRouteHandlerResources } from '../types';
+import type {
+  BufferFlushEvent,
+  StreamingChatResponseEventWithoutError,
+} from '../../../common/conversation_complete';
+import type { ConversationCreateRequest } from '../../../common/types';
 
 const chatCompleteBaseRt = (apiType: 'public' | 'internal') =>
   t.type({
@@ -38,6 +45,7 @@ const chatCompleteBaseRt = (apiType: 'public' | 'internal') =>
     persist: toBooleanRt,
   }),
   t.partial({
+    isStream: t.union([t.undefined, toBooleanRt]), // accept undefined in order to default to true
     conversationId: t.string,
     title: t.string,
     disableFunctions: toBooleanRt,
@@ -245,7 +253,10 @@ async function chatComplete(
   resources: ObservabilityAIAssistantRouteHandlerResources & {
     params: t.TypeOf<typeof chatCompleteInternalRt>;
   }
-) {
+): Promise<{
+  response$: Observable<StreamingChatResponseEventWithoutError | BufferFlushEvent>;
+  getConversation: () => Promise<ConversationCreateRequest>;
+}> {
   const { params, service } = resources;
 
   const {
@@ -286,7 +297,7 @@ async function chatComplete(
       : userInstructionOrString
   );
 
-  const response$ = client.complete({
+  const { response$, getConversation } = client.complete({
     messages,
     connectorId,
     conversationId,
@@ -299,7 +310,8 @@ async function chatComplete(
     disableFunctions,
   });
 
-  return response$.pipe(flushBuffer(isCloudEnabled));
+  const responseWithFlushBuffer$ = response$.pipe(flushBuffer(isCloudEnabled));
+  return { response$: responseWithFlushBuffer$, getConversation };
 }
 
 const chatCompleteRoute = createObservabilityAIAssistantServerRoute({
@@ -310,8 +322,22 @@ const chatCompleteRoute = createObservabilityAIAssistantServerRoute({
     },
   },
   params: chatCompleteInternalRt,
-  handler: async (resources): Promise<Readable> => {
-    return observableIntoStream(await chatComplete(resources));
+  handler: async (resources) => {
+    const { params } = resources;
+    const { response$, getConversation } = await chatComplete(resources);
+    const { isStream = true, connectorId } = params.body;
+
+    if (isStream === false) {
+      const response = await getConversation();
+
+      return {
+        conversationId: response.conversation.id,
+        data: last(response.messages)?.message.content,
+        connectorId,
+      };
+    }
+
+    return observableIntoStream(response$);
   },
 });
 
@@ -326,18 +352,15 @@ const publicChatCompleteRoute = createObservabilityAIAssistantServerRoute({
   options: {
     tags: ['observability-ai-assistant'],
   },
-  handler: async (resources): Promise<Readable> => {
+  handler: async (resources) => {
     const { params, logger } = resources;
+    const { actions, ...bodyParams } = params.body;
 
-    const {
-      body: { actions, ...restOfBody },
-    } = params;
-
-    const response$ = await chatComplete({
+    const { response$ } = await chatComplete({
       ...resources,
       params: {
         body: {
-          ...restOfBody,
+          ...bodyParams,
           scopes: ['observability'],
           screenContexts: [
            {
```
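The `isStream: t.union([t.undefined, toBooleanRt])` codec above is what lets the handler's `isStream = true` default kick in. A self-contained sketch of that decoding behavior follows; the inlined `toBooleanRt` is a stand-in with assumed behavior (booleans plus the strings `'true'`/`'false'`), not the actual implementation from `@kbn/io-ts-utils`.

```ts
import * as t from 'io-ts';
import { isRight } from 'fp-ts/lib/Either';

// Stand-in for @kbn/io-ts-utils' toBooleanRt, written for illustration only:
// decodes booleans plus the strings 'true'/'false' (e.g. from request params).
const toBooleanRt = new t.Type<boolean, boolean, unknown>(
  'toBoolean',
  t.boolean.is,
  (input, context) => {
    if (typeof input === 'boolean') return t.success(input);
    if (input === 'true') return t.success(true);
    if (input === 'false') return t.success(false);
    return t.failure(input, context);
  },
  t.identity
);

const bodyRt = t.partial({
  isStream: t.union([t.undefined, toBooleanRt]),
});

// Both an absent key and an explicit undefined decode successfully, so the
// route handler can apply its own `isStream = true` default afterwards.
console.log(isRight(bodyRt.decode({}))); // true
console.log(isRight(bodyRt.decode({ isStream: undefined }))); // true
console.log(isRight(bodyRt.decode({ isStream: 'false' }))); // true, decodes to false
```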

x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.test.ts

Lines changed: 83 additions & 83 deletions
```diff
@@ -273,15 +273,15 @@ describe('Observability AI Assistant client', () => {
       });
     });
 
-    stream = observableIntoStream(
-      client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        persist: true,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      persist: true,
+    });
+
+    stream = observableIntoStream(response$);
   });
 
   describe('when streaming the response from the LLM', () => {
@@ -557,16 +557,16 @@ describe('Observability AI Assistant client', () => {
       return {} as any;
     });
 
-    stream = observableIntoStream(
-      await client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        conversationId: 'my-conversation-id',
-        persist: true,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      conversationId: 'my-conversation-id',
+      persist: true,
+    });
+
+    stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -649,16 +649,16 @@ describe('Observability AI Assistant client', () => {
      });
    });
 
-    stream = observableIntoStream(
-      await client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        title: 'My predefined title',
-        persist: true,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      title: 'My predefined title',
+      persist: true,
+    });
+
+    stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -739,16 +739,16 @@ describe('Observability AI Assistant client', () => {
      });
    });
 
-    stream = observableIntoStream(
-      await client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        title: 'My predefined title',
-        persist: true,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      title: 'My predefined title',
+      persist: true,
+    });
+
+    stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -1178,15 +1178,15 @@ describe('Observability AI Assistant client', () => {
      };
    });
 
-    stream = observableIntoStream(
-      await client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        persist: false,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      persist: false,
+    });
+
+    stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -1304,16 +1304,16 @@ describe('Observability AI Assistant client', () => {
      };
    });
 
-    stream = observableIntoStream(
-      client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        title: 'My predefined title',
-        persist: true,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      title: 'My predefined title',
+      persist: true,
+    });
+
+    stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -1392,15 +1392,15 @@ describe('Observability AI Assistant client', () => {
      };
    });
 
-    const stream = observableIntoStream(
-      await client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        persist: false,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      persist: false,
+    });
+
+    const stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -1480,16 +1480,16 @@ describe('Observability AI Assistant client', () => {
      });
    });
 
-    stream = observableIntoStream(
-      await client.complete({
-        connectorId: 'foo',
-        messages: [user('How many alerts do I have?')],
-        functionClient: functionClientMock,
-        signal: new AbortController().signal,
-        title: 'My predefined title',
-        persist: true,
-      })
-    );
+    const { response$ } = client.complete({
+      connectorId: 'foo',
+      messages: [user('How many alerts do I have?')],
+      functionClient: functionClientMock,
+      signal: new AbortController().signal,
+      title: 'My predefined title',
+      persist: true,
+    });
+
+    stream = observableIntoStream(response$);
 
     dataHandler = jest.fn();
 
@@ -1553,7 +1553,7 @@ describe('Observability AI Assistant client', () => {
        title: 'My predefined title',
        persist: false,
      })
-      .subscribe(() => {}); // To trigger call to chat
+      .response$.subscribe(() => {}); // To trigger call to chat
     await nextTick();
 
     expect(chatSpy.mock.calls[0][1].systemMessage).toEqual(EXPECTED_STORED_SYSTEM_MESSAGE);
@@ -1571,7 +1571,7 @@ describe('Observability AI Assistant client', () => {
      });
    });
 
-    const complete$ = await client.complete({
+    const { response$ } = client.complete({
      connectorId: 'foo',
      messages: [user('Can you call the my_action function?')],
      functionClient: new ChatFunctionClient([
@@ -1601,7 +1601,7 @@ describe('Observability AI Assistant client', () => {
    const messages: Message[] = [];
 
    completePromise = new Promise<Message[]>((resolve, reject) => {
-      complete$.subscribe({
+      response$.subscribe({
        next: (event) => {
          if (event.type === StreamingChatResponseEventType.MessageAdd) {
            messages.push(event.message);
@@ -1748,8 +1748,8 @@ describe('Observability AI Assistant client', () => {
    // client = createClient(namespace);
    client = createClient();
    (client as any).dependencies.namespace = namespace;
-    (
-      await client.complete({
+    client
+      .complete({
        functionClient: functionClientMock,
        connectorId: 'foo',
        messages: [],
@@ -1758,7 +1758,7 @@ describe('Observability AI Assistant client', () => {
        kibanaPublicUrl: 'http://localhost:5601',
        title: 'Generated title',
      })
-    ).subscribe({});
+      .response$.subscribe({});
 
    await nextTick();
  };
```
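All of this churn follows from one signature change: `client.complete()` now returns `{ response$, getConversation }` synchronously instead of a bare (sometimes awaited) observable. Below is a hedged sketch of how a caller can pick between the two modes; `client`, `user`, and `functionClientMock` are the fixtures from this test file, and the conversation/message types are assumptions rather than the verified API.

```ts
import { lastValueFrom, toArray } from 'rxjs';

// Sketch only: `client`, `user`, and `functionClientMock` come from the test
// setup above; the return/message shapes are assumed, not verified.
async function runCompletion(isStream: boolean) {
  const { response$, getConversation } = client.complete({
    connectorId: 'foo',
    messages: [user('How many alerts do I have?')],
    functionClient: functionClientMock,
    signal: new AbortController().signal,
    persist: true,
  });

  if (!isStream) {
    // Non-streaming path: wait for the finished conversation and return the
    // final message's content, mirroring what the route handler does.
    const conversation = await getConversation();
    return conversation.messages[conversation.messages.length - 1]?.message.content;
  }

  // Streaming path: drain the observable into an array of events.
  return lastValueFrom(response$.pipe(toArray()));
}
```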
