Skip to content

Commit 710d5c5

Browse files
🌊 [Streams] Add state based (snapshot) telemetry, report Stream State Errors (#236921)
## Summary This PR implements the initial stats (daily snapshot) telemetry collection for Streams. Also tracks the Streams State errors via EBT. The PR also moves the StreamsStorageClient to a central location so that consumers don't have to recreate their own versions of Storage Adapter/Client. ### 📊 State-Based (Snapshot) Telemetry #### Overview Implements a basic telemetry collector that captures Streams usage patterns ~and significant events metrics~. #### Metrics Collected <table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse; width: 100%;"> <thead> <tr> <th width="200">Metric</th> <th width="100">Type</th> <th style="text-align: left; padding: 8px;">Description</th> </tr> </thead> <tbody> <tr> <td colspan="3" style="padding: 8px;"><strong>Classic Streams</strong></td> </tr> <tr> <td style="padding: 8px;"><code>changed_count</code></td> <td style="padding: 8px; white-space: nowrap;"><code>long</code></td> <td style="padding: 8px;">Number of classic streams modified from default configuration. Calculated by counting streams in <code>.kibana_streams</code> index (presence indicates customization).</td> </tr> <tr> <td style="padding: 8px;"><code>with_processing_count</code></td> <td style="padding: 8px; white-space: nowrap;"><code>long</code></td> <td style="padding: 8px;">Number of classic streams with custom processing steps. Counted by non-empty <code>ingest.processing.steps</code> arrays.</td> </tr> <tr> <td style="padding: 8px;"><code>with_fields_count</code></td> <td style="padding: 8px; white-space: nowrap;"><code>long</code></td> <td style="padding: 8px;">Number of classic streams with custom field overrides. Counted by non-empty <code>ingest.classic.field_overrides</code> objects.</td> </tr> <tr> <td style="padding: 8px;"><code>with_changed_retention_count</code></td> <td style="padding: 8px; white-space: nowrap;"><code>long</code></td> <td style="padding: 8px;">Number of classic streams with default (inherited) retention changed.</td> </tr> <tr> <td colspan="3" style="padding: 8px;"><strong>Wired Streams</strong></td> </tr> <tr> <td style="padding: 8px;"><code>count</code></td> <td style="padding: 8px; white-space: nowrap;"><code>long</code></td> <td style="padding: 8px;">Total number of wired streams in the system. Counted by streams matching WiredStream schema (excluding GroupStreams).</td> </tr> <tr> <td colspan="3" style="padding: 8px;"><strong> ~Significant Events~ </strong> removed</td> </tr> </tbody> </table> ### 📊 Error-Based (EBT) Telemetry The errors reported by Strreams state machine e.g. during [`attemptChanges`](https://github.com/elastic/kibana/blob/5d4799fa165a1f85967c9b837b8c29bdd6f61611/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts#L69) are now tracked via EBT. ### 📖 Notes Only unit tests coverage is maintained currently. e2e tests for telemetry could be implemented as a follow up or after the telemetry matures. ### 🧪 How to test <img width="3446" height="2080" alt="image" src="https://github.com/user-attachments/assets/62f33aa7-497d-41c2-b993-12ba52b2ec6d" /> --------- Co-authored-by: kibanamachine <[email protected]>
1 parent aa6af6a commit 710d5c5

31 files changed

+710
-103
lines changed

‎x-pack/platform/plugins/private/telemetry_collection_xpack/schema/xpack_platform.json‎

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6492,6 +6492,48 @@
64926492
}
64936493
}
64946494
},
6495+
"streams": {
6496+
"properties": {
6497+
"classic_streams": {
6498+
"properties": {
6499+
"changed_count": {
6500+
"type": "long",
6501+
"_meta": {
6502+
"description": "Number of classic streams that have been modified from their default configuration. Calculated by presence in .kibana_streams (managed) index and type is \"classic\"."
6503+
}
6504+
},
6505+
"with_processing_count": {
6506+
"type": "long",
6507+
"_meta": {
6508+
"description": "Number of classic streams with custom processing steps configured. Calculated by counting streams with non-empty ingest.processing.steps arrays."
6509+
}
6510+
},
6511+
"with_fields_count": {
6512+
"type": "long",
6513+
"_meta": {
6514+
"description": "Number of classic streams with custom field overrides configured. Calculated by counting streams with non-empty ingest.classic.field_overrides objects."
6515+
}
6516+
},
6517+
"with_changed_retention_count": {
6518+
"type": "long",
6519+
"_meta": {
6520+
"description": "Number of classic streams with changed retention. Calculated by confirming stream lifecycle is not \"inherited\"."
6521+
}
6522+
}
6523+
}
6524+
},
6525+
"wired_streams": {
6526+
"properties": {
6527+
"count": {
6528+
"type": "long",
6529+
"_meta": {
6530+
"description": "Total number of wired streams in the system."
6531+
}
6532+
}
6533+
}
6534+
}
6535+
}
6536+
},
64956537
"task_manager": {
64966538
"properties": {
64976539
"task_type_exclusion": {

‎x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import {
2828
import { SecurityError } from './errors/security_error';
2929
import { StatusError } from './errors/status_error';
3030
import { LOGS_ROOT_STREAM_NAME, rootStreamDefinition } from './root_stream_definition';
31-
import type { StreamsStorageClient } from './service';
31+
import type { StreamsStorageClient } from './storage/streams_storage_client';
3232
import { State } from './state_management/state';
3333
import { checkAccess, checkAccessBulk } from './stream_crud';
3434
import { StreamsStatusConflictError } from './errors/streams_status_conflict_error';

‎x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts‎

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ import type {
1111
GlobalSearchProviderResult,
1212
GlobalSearchResultProvider,
1313
} from '@kbn/global-search-plugin/server';
14-
import { StorageIndexAdapter } from '@kbn/storage-adapter';
1514
import { Streams } from '@kbn/streams-schema';
1615
import type { SearchHit } from '@kbn/es-types';
1716
import { OBSERVABILITY_STREAMS_ENABLE_GROUP_STREAMS } from '@kbn/management-settings-ids';
18-
import type { StreamsStorageClient, StreamsStorageSettings } from './service';
19-
import { streamsStorageSettings } from './service';
20-
import { migrateOnRead } from './helpers/migrate_on_read';
17+
import {
18+
createStreamsStorageClient,
19+
type StreamsStorageClient,
20+
} from './storage/streams_storage_client';
2121
import { checkAccessBulk } from './stream_crud';
2222

2323
const streamTypes = ['classic stream', 'wired stream', 'group stream', 'stream'];
@@ -34,13 +34,7 @@ export function createStreamsGlobalSearchResultProvider(
3434
return from([]);
3535
}
3636

37-
const storageAdapter = new StorageIndexAdapter<
38-
StreamsStorageSettings,
39-
Streams.all.Definition
40-
>(client.asInternalUser, logger, streamsStorageSettings, {
41-
migrateSource: migrateOnRead,
42-
});
43-
const storageClient = storageAdapter.getClient();
37+
const storageClient = createStreamsStorageClient(client.asInternalUser, logger);
4438

4539
return from(findStreams({ term, types, maxResults, storageClient, client, core })).pipe(
4640
takeUntil(aborted$)

‎x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts‎

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,14 @@
66
*/
77

88
import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server';
9-
import type { IStorageClient, StorageSettings } from '@kbn/storage-adapter';
10-
import { StorageIndexAdapter, types } from '@kbn/storage-adapter';
11-
import type { Streams } from '@kbn/streams-schema';
129
import { LockManagerService } from '@kbn/lock-manager';
1310
import type { StreamsPluginStartDependencies } from '../../types';
11+
import { createStreamsStorageClient } from './storage/streams_storage_client';
1412
import type { AssetClient } from './assets/asset_client';
1513
import type { QueryClient } from './assets/query/query_client';
1614
import { StreamsClient } from './client';
17-
import { migrateOnRead } from './helpers/migrate_on_read';
1815
import type { SystemClient } from './system/system_client';
1916

20-
export const streamsStorageSettings = {
21-
name: '.kibana_streams',
22-
schema: {
23-
properties: {
24-
name: types.keyword(),
25-
description: types.text(),
26-
ingest: types.object({ enabled: false }),
27-
group: types.object({ enabled: false }),
28-
},
29-
},
30-
} satisfies StorageSettings;
31-
32-
export type StreamsStorageSettings = typeof streamsStorageSettings;
33-
export type StreamsStorageClient = IStorageClient<StreamsStorageSettings, Streams.all.Definition>;
34-
3517
export class StreamsService {
3618
constructor(
3719
private readonly coreSetup: CoreSetup<StreamsPluginStartDependencies>,
@@ -57,23 +39,14 @@ export class StreamsService {
5739
const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request);
5840
const isServerless = coreStart.elasticsearch.getCapabilities().serverless;
5941

60-
const storageAdapter = new StorageIndexAdapter<StreamsStorageSettings, Streams.all.Definition>(
61-
scopedClusterClient.asInternalUser,
62-
logger,
63-
streamsStorageSettings,
64-
{
65-
migrateSource: migrateOnRead,
66-
}
67-
);
68-
6942
return new StreamsClient({
7043
assetClient,
7144
queryClient,
7245
systemClient,
7346
logger,
7447
scopedClusterClient,
7548
lockManager: new LockManagerService(this.coreSetup, logger),
76-
storageClient: storageAdapter.getClient(),
49+
storageClient: createStreamsStorageClient(scopedClusterClient.asInternalUser, logger),
7750
request,
7851
isServerless,
7952
isDev: this.isDev,

‎x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { Streams } from '@kbn/streams-schema';
1010
import type { LockManagerService } from '@kbn/lock-manager';
1111
import type { AssetClient } from '../assets/asset_client';
1212
import type { StreamsClient } from '../client';
13-
import type { StreamsStorageClient } from '../service';
13+
import type { StreamsStorageClient } from '../storage/streams_storage_client';
1414
import type { QueryClient } from '../assets/query/query_client';
1515
import type { SystemClient } from '../system/system_client';
1616

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import {
9+
StorageIndexAdapter,
10+
type IStorageClient,
11+
type StorageSettings,
12+
types,
13+
} from '@kbn/storage-adapter';
14+
import type { Logger } from '@kbn/logging';
15+
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
16+
import type { Streams } from '@kbn/streams-schema';
17+
import { migrateOnRead } from './migrate_on_read';
18+
19+
const streamsStorageSettings = {
20+
name: '.kibana_streams',
21+
schema: {
22+
properties: {
23+
name: types.keyword(),
24+
description: types.text(),
25+
ingest: types.object({ enabled: false }),
26+
group: types.object({ enabled: false }),
27+
},
28+
},
29+
} satisfies StorageSettings;
30+
31+
type StreamsStorageSettings = typeof streamsStorageSettings;
32+
export type StreamsStorageClient = IStorageClient<StreamsStorageSettings, Streams.all.Definition>;
33+
34+
/**
35+
* This ensures there's only one way to initialize a storage client for streams, with the proper
36+
* settings and migration on read.
37+
* @param esClient
38+
* @param logger
39+
*/
40+
export function createStreamsStorageClient(
41+
esClient: ElasticsearchClient,
42+
logger: Logger
43+
): StreamsStorageClient {
44+
const adapter = new StorageIndexAdapter<StreamsStorageSettings, Streams.all.Definition>(
45+
esClient,
46+
logger,
47+
streamsStorageSettings,
48+
{
49+
migrateSource: migrateOnRead,
50+
}
51+
);
52+
53+
return adapter.getClient();
54+
}
Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
*/
77

88
import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server';
9-
import type { StreamEndpointLatencyProps } from './types';
10-
import { STREAMS_ENDPOINT_LATENCY_EVENT } from './constants';
9+
import type { StreamEndpointLatencyProps, StreamsStateErrorProps } from './types';
10+
import { STREAMS_ENDPOINT_LATENCY_EVENT, STREAMS_STATE_ERROR_EVENT } from './constants';
1111

1212
const LATENCY_TRACKING_ENDPOINT_ALLOW_LIST = [
1313
'POST /api/streams/{name}/processing/_simulate 2023-10-31',
@@ -23,7 +23,7 @@ const LATENCY_TRACKING_ENDPOINT_ALLOW_LIST = [
2323
'POST /api/streams/_resync 2023-10-31',
2424
];
2525

26-
export class StreamsTelemetryClient {
26+
export class EbtTelemetryClient {
2727
constructor(private readonly analytics: AnalyticsServiceSetup) {}
2828

2929
public startTrackingEndpointLatency(
@@ -40,4 +40,17 @@ export class StreamsTelemetryClient {
4040
});
4141
};
4242
}
43+
44+
public reportStreamsStateError(error: Error & { statusCode: number }) {
45+
const errorData: StreamsStateErrorProps = {
46+
error: {
47+
name: error.name,
48+
message: error.message,
49+
stack_trace: error.stack,
50+
},
51+
status_code: error.statusCode,
52+
};
53+
54+
this.analytics.reportEvent(STREAMS_STATE_ERROR_EVENT, errorData);
55+
}
4356
}

0 commit comments

Comments
 (0)