Skip to content

Commit 276694c

Browse files
[streams] stop llm reasoning when request is aborted (#241550)
## Summary We have several routes that trigger an LLM workflow that can be long-running. If a request is closed early by the client, the process will continue executing unnecessarily. This change passes the request's abort signal to the relevant workflows to prevent unneeded work in case the request has already been terminated. ### Testing Trigger any AI workflow and abort the request (e.g., close the window). The process should end with a `CancelledError` --------- Co-authored-by: kibanamachine <[email protected]>
1 parent c87f06b commit 276694c

File tree

12 files changed

+46
-3
lines changed

12 files changed

+46
-3
lines changed

x-pack/platform/packages/shared/kbn-evals-suite-streams/evals/feature_identification.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ evaluate.describe('Streams feature identification', { tag: '@svlOblt' }, () => {
250250
stream,
251251
kql: '',
252252
dropUnmapped: true,
253+
signal: new AbortController().signal,
253254
});
254255

255256
const featuresWithAnalysis = await Promise.all(

x-pack/platform/packages/shared/kbn-streams-ai/src/description/generate_description.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ export async function generateStreamDescription({
2121
end,
2222
esClient,
2323
inferenceClient,
24+
signal,
2425
}: {
2526
stream: Streams.all.Definition;
2627
feature?: Feature;
2728
start: number;
2829
end: number;
2930
esClient: ElasticsearchClient;
3031
inferenceClient: BoundInferenceClient;
32+
signal: AbortSignal;
3133
}): Promise<string> {
3234
const analysis = await describeDataset({
3335
start,
@@ -45,6 +47,7 @@ export async function generateStreamDescription({
4547
),
4648
},
4749
prompt: GenerateStreamDescriptionPrompt,
50+
abortSignal: signal,
4851
});
4952

5053
return response.content;

x-pack/platform/packages/shared/kbn-streams-ai/src/features/identify_features.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export async function identifyFeatures({
3030
kql,
3131
inferenceClient,
3232
logger,
33+
signal,
3334
dropUnmapped = false,
3435
maxSteps: initialMaxSteps,
3536
}: {
@@ -41,6 +42,7 @@ export async function identifyFeatures({
4142
kql?: string;
4243
inferenceClient: BoundInferenceClient;
4344
logger: Logger;
45+
signal: AbortSignal;
4446
dropUnmapped?: boolean;
4547
maxSteps?: number;
4648
}): Promise<{ features: Omit<Feature, 'description'>[] }> {
@@ -121,6 +123,7 @@ export async function identifyFeatures({
121123
};
122124
},
123125
},
126+
abortSignal: signal,
124127
});
125128

126129
return {

x-pack/platform/packages/shared/kbn-streams-ai/src/significant_events/generate_significant_events.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@ export async function generateSignificantEvents({
3333
end,
3434
esClient,
3535
inferenceClient,
36-
logger,
36+
signal,
3737
}: {
3838
stream: Streams.all.Definition;
3939
feature?: Feature;
4040
start: number;
4141
end: number;
4242
esClient: ElasticsearchClient;
4343
inferenceClient: BoundInferenceClient;
44+
signal: AbortSignal;
4445
logger: Logger;
4546
}): Promise<{
4647
queries: Query[];
@@ -88,6 +89,7 @@ export async function generateSignificantEvents({
8889
};
8990
},
9091
},
92+
abortSignal: signal,
9193
});
9294

9395
const queries = response.input.flatMap((message) => {

x-pack/platform/plugins/shared/streams/server/lib/significant_events/generate_significant_events.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ interface Dependencies {
2323
inferenceClient: InferenceClient;
2424
esClient: ElasticsearchClient;
2525
logger: Logger;
26+
signal: AbortSignal;
2627
}
2728

2829
export async function generateSignificantEventDefinitions(
2930
params: Params,
3031
dependencies: Dependencies
3132
): Promise<GeneratedSignificantEventQuery[]> {
3233
const { definition, connectorId, start, end, feature } = params;
33-
const { inferenceClient, esClient, logger } = dependencies;
34+
const { inferenceClient, esClient, logger, signal } = dependencies;
3435

3536
const boundInferenceClient = inferenceClient.bindTo({
3637
connectorId,
@@ -44,6 +45,7 @@ export async function generateSignificantEventDefinitions(
4445
inferenceClient: boundInferenceClient,
4546
logger,
4647
feature,
48+
signal,
4749
});
4850

4951
return queries.map((query) => ({

x-pack/platform/plugins/shared/streams/server/lib/streams/feature/run_feature_identification.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export function runFeatureIdentification({
1818
logger,
1919
stream,
2020
features,
21+
signal,
2122
}: {
2223
start: number;
2324
end: number;
@@ -26,6 +27,7 @@ export function runFeatureIdentification({
2627
logger: Logger;
2728
stream: Streams.all.Definition;
2829
features: Feature[];
30+
signal: AbortSignal;
2931
}) {
3032
return identifyFeatures({
3133
start,
@@ -35,6 +37,7 @@ export function runFeatureIdentification({
3537
logger,
3638
stream,
3739
features,
40+
signal,
3841
dropUnmapped: true,
3942
});
4043
}

x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/route.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants';
2323
import { assertSignificantEventsAccess } from '../../../utils/assert_significant_events_access';
2424
import { runFeatureIdentification } from '../../../../lib/streams/feature/run_feature_identification';
2525
import type { IdentifiedFeaturesEvent, StreamDescriptionEvent } from './types';
26+
import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal';
2627

2728
const dateFromString = z.string().transform((input) => new Date(input));
2829

@@ -318,6 +319,7 @@ export const identifyFeaturesRoute = createServerRoute({
318319
const esClient = scopedClusterClient.asCurrentUser;
319320

320321
const boundInferenceClient = inferenceClient.bindTo({ connectorId });
322+
const signal = getRequestAbortSignal(request);
321323

322324
return from(
323325
runFeatureIdentification({
@@ -328,6 +330,7 @@ export const identifyFeaturesRoute = createServerRoute({
328330
logger,
329331
stream,
330332
features: hits,
333+
signal,
331334
})
332335
).pipe(
333336
switchMap(({ features }) => {
@@ -344,6 +347,7 @@ export const identifyFeaturesRoute = createServerRoute({
344347
...feature,
345348
description: '',
346349
},
350+
signal,
347351
});
348352

349353
return {
@@ -419,6 +423,7 @@ export const describeStreamRoute = createServerRoute({
419423
inferenceClient: inferenceClient.bindTo({ connectorId }),
420424
start: start.valueOf(),
421425
end: end.valueOf(),
426+
signal: getRequestAbortSignal(request),
422427
})
423428
).pipe(
424429
map((description) => {

x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_partitions_route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { STREAMS_TIERED_ML_FEATURE } from '../../../../../common';
1515
import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants';
1616
import { SecurityError } from '../../../../lib/streams/errors/security_error';
1717
import { createServerRoute } from '../../../create_server_route';
18+
import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal';
1819

1920
export interface SuggestPartitionsParams {
2021
path: {
@@ -84,7 +85,7 @@ export const suggestPartitionsRoute = createServerRoute({
8485
start: params.body.start,
8586
end: params.body.end,
8687
maxSteps: 1, // Longer reasoning seems to add unnecessary conditions (and latency), instead of improving accuracy, so we limit the steps.
87-
signal: new AbortController().signal,
88+
signal: getRequestAbortSignal(request),
8889
});
8990

9091
// Turn our promise into an Observable ServerSideEvent. The only reason we're streaming the

x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/grok_suggestions_handler.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export interface ProcessingGrokSuggestionsHandlerDeps {
3939
scopedClusterClient: IScopedClusterClient;
4040
streamsClient: StreamsClient;
4141
fieldsMetadataClient: IFieldsMetadataClient;
42+
signal: AbortSignal;
4243
}
4344

4445
export const processingGrokSuggestionsSchema = z.object({
@@ -65,6 +66,7 @@ export const handleProcessingGrokSuggestions = async ({
6566
inferenceClient,
6667
streamsClient,
6768
fieldsMetadataClient,
69+
signal,
6870
}: ProcessingGrokSuggestionsHandlerDeps) => {
6971
const stream = await streamsClient.getStream(params.path.name);
7072
const isWiredStream = Streams.WiredStream.Definition.is(stream);
@@ -76,6 +78,7 @@ export const handleProcessingGrokSuggestions = async ({
7678
sample_messages: params.body.sample_messages,
7779
review_fields: JSON.stringify(params.body.review_fields),
7880
},
81+
abortSignal: signal,
7982
});
8083
const reviewResult = response.toolCalls[0].function.arguments;
8184

x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
handleProcessingGrokSuggestions,
2727
processingGrokSuggestionsSchema,
2828
} from './grok_suggestions_handler';
29+
import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal';
2930

3031
const paramsSchema = z.object({
3132
path: z.object({ name: z.string() }),
@@ -110,6 +111,7 @@ export const processingGrokSuggestionRoute = createServerRoute({
110111
streamsClient,
111112
scopedClusterClient,
112113
fieldsMetadataClient,
114+
signal: getRequestAbortSignal(request),
113115
})
114116
).pipe(
115117
map((grokProcessor) => ({

0 commit comments

Comments
 (0)