Commit 47ccede

[8.x] [Streams] Partitioning improvements (elastic#209095) (elastic#210190)
# Backport

This will backport the following commits from `main` to `8.x`:
- [[Streams] Partitioning improvements (elastic#209095)](elastic#209095)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool documentation](https://github.com/sqren/backport)

### Backport metadata

- Source commit: `97d0c1b2aeee10bdadede71b05691f8857c5fc2f` on `main`, committed 2025-02-07T13:07:19Z, authored by Kerry Gallagher <[email protected]>
- Source pull request: [[Streams] Partitioning improvements](https://github.com/elastic/kibana/pull/209095) (number 209095)
- Labels: `release_note:skip`, `backport:version`, `Feature:Streams`, `v9.1.0`, `v8.19.0`
- Branch label mapping: `^v9.1.0$` to `main`, `^v8.19.0$` to `8.x`, `^v(\d+).(\d+).\d+$` to `$1.$2`
- Suggested target branches: `8.x`
- Target pull request states: `main` (label `v9.1.0`, source branch, state `MERGED`), `8.x` (label `v8.19.0`, state `NOT_CREATED`)

### Source commit message

[Streams] Partitioning improvements (elastic#209095)

## Summary

This issue predominantly tries to improve the situation around fetching and showing samples. Some of the discussion can be seen here: https://github.com/elastic/streams-program/issues/37#issuecomment-2605288052

We have several issues: runtime fields are expensive (but needed if fields aren't mapped), we are susceptible to timeouts depending on the amount of data and the time range, getting exact document counts (for matched / not matched counts) is expensive, etc.

After speaking with Joe we decided it might be worth trying out async search, as this alleviates some of these issues, e.g. the ability to load and show partial results without trying to communicate this through our API, or having to provide a potentially confusing UI around timeouts / running to exhaustion options / toggles.

Realistically we only fetch 100 examples, but we might need to scan many documents to gather that set of documents. I'm not 100% sure how often we'll actually hit partial results here, but it seems more robust than worrying about timeouts.

For the matching counts I just couldn't see a way to get an accurate count without something expensive (e.g. `track_total_hits`), so I've tried to use an "approximate match rate" based on a random sample: that random sample is then filtered to the condition to see what approximate percent matched. One note: aggregations don't seem to return partial results (which makes sense I guess); you get the interval polling requests, but won't get a result until the end. I did wonder if you could do something smart with `track_total_hits` and aggs to "stream" partial counts, but I found a Slack thread saying don't do this 😅

⚠️ ~I'm not 100% sure what I'm missing here but I have seen the filter sub aggregation come back with a doc_count that is higher than the random sample.~

~[From the docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-random-sampler-aggregation.html#random-sampler-inner-workings) I understand "If a query is provided, a document is returned if it is matched by the query and if the document is in the random sampling. The sampling is not done over the matched documents." but I don't see why that affects the sub aggregation under the random sample.~

![Screenshot 2025-01-31 at 11 30 53](https://github.com/user-attachments/assets/e2444348-caef-41b6-9708-4fdbb84f1ccd)

~I hit this when playing with the `probability` setting, not sure if I'm missing something stupid.~

[Solved](https://github.com/elastic/kibana/pull/209095#discussion_r1940567855)

Overall, this does seem to work well. I've used this against ~250k and ~2.5 million documents, and whilst (depending on time range / runtime fields) it can still be slow, it seems to provide a better experience than hitting our API and holding the connection open. Obviously it comes with the downsides of sitting on the client (not really sure it's a con, these are platform services) and not using the standard `streamsRepositoryClient`.

## Other changes

- The core changes here are in the `use_async_sample` hook, and where that's consumed.

- Runtime fields are not generated for fields that are mapped.

- I've also refactored the routing index page so that components / hooks live in their own files (this makes the diff look bigger than it is).

- Refactored some logic around preview panel / preview panel illustration so that the two branches of logic / conditionals now become one.

## Followups

- I haven't changed enrichment to use this or removed the actual API route, as I figured this would need discussion first to see if we want to use this.

Co-authored-by: Kerry Gallagher <[email protected]>
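The "approximate match rate" idea from the summary can be sketched roughly as follows. This is a minimal illustration and not the code in the `use_async_sample` hook: the aggregation names (`sample`, `matching_docs`), the helper names and the response shape are assumptions made for the example. Only `random_sampler` with its `probability` setting and the `filter` aggregation are Elasticsearch features named in the text. The sketch also assumes both counts are on the same scale. The summary notes that the sub-aggregation count once came back higher than the sample count, so a real implementation may need to account for the sampling probability as discussed in the linked review thread.

```typescript
// Hypothetical shapes for illustration; not the types used by the Streams plugin.
type QueryDsl = Record<string, unknown>;

interface MatchRateAggs {
  sample: {
    random_sampler: { probability: number };
    aggs: { matching_docs: { filter: QueryDsl } };
  };
}

// Sample documents at random, then count how many of the sampled documents
// satisfy the routing condition (expressed as a Query DSL clause).
function buildMatchRateAggs(conditionQuery: QueryDsl, probability: number): MatchRateAggs {
  return {
    sample: {
      random_sampler: { probability },
      aggs: { matching_docs: { filter: conditionQuery } },
    },
  };
}

// Assumption: sampledCount and matchedCount are expressed on the same scale.
// Returns a ratio in [0, 1], or undefined when nothing was sampled.
function approximateMatchRate(sampledCount: number, matchedCount: number): number | undefined {
  if (sampledCount <= 0) {
    return undefined;
  }
  return Math.min(1, Math.max(0, matchedCount / sampledCount));
}

// Usage: build the aggregation for a condition, then turn counts into a ratio.
const exampleAggs = buildMatchRateAggs({ match: { 'log.level': 'error' } }, 0.1);
const exampleRate = approximateMatchRate(200, 50);
```

Because aggregations only return once the async search completes, a consumer would typically show the sampled documents as partial results arrive and fill in the rate at the end.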
1 parent d931871 commit 47ccede

15 files changed

+1365
-849
lines changed

Lines changed: 3 additions & 3 deletions
@@ -7,11 +7,11 @@
 
 import {
   Condition,
-  FilterCondition,
-  isAndCondition,
   isFilterCondition,
+  isAndCondition,
   isOrCondition,
-} from '@kbn/streams-schema';
+  FilterCondition,
+} from '../models';
 
 export function getFields(
   condition: Condition
Lines changed: 3 additions & 3 deletions
@@ -6,12 +6,12 @@
  */
 
 import {
-  Condition,
   FilterCondition,
-  isAndCondition,
+  Condition,
   isFilterCondition,
+  isAndCondition,
   isOrCondition,
-} from '@kbn/streams-schema';
+} from '../models';
 
 function conditionToClause(condition: FilterCondition) {
   switch (condition.operator) {
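
The hunk above only shows the first lines of `conditionToClause`, so the following is a rough, self-contained sketch of how a condition tree could be translated into Elasticsearch Query DSL. The condition model, the operator list and the choice of `match`, `range`, `exists` and `bool` clauses are all assumptions for illustration. The real types come from `../models`, and the real mapping is not visible in this diff.

```typescript
// Hypothetical, simplified condition model; the real types live in '../models'.
type Operator = 'eq' | 'neq' | 'gt' | 'gte' | 'lt' | 'lte' | 'exists';

interface FilterCondition {
  field: string;
  operator: Operator;
  value?: string | number | boolean;
}
interface AndCondition {
  and: Condition[];
}
interface OrCondition {
  or: Condition[];
}
type Condition = FilterCondition | AndCondition | OrCondition;

type QueryDsl = Record<string, unknown>;

const isAndCondition = (c: Condition): c is AndCondition => 'and' in c;
const isOrCondition = (c: Condition): c is OrCondition => 'or' in c;

// Translate a single leaf condition into one query clause (assumed mapping).
function conditionToClause(c: FilterCondition): QueryDsl {
  switch (c.operator) {
    case 'eq':
      return { match: { [c.field]: c.value } };
    case 'neq':
      return { bool: { must_not: { match: { [c.field]: c.value } } } };
    case 'exists':
      return { exists: { field: c.field } };
    default:
      // gt, gte, lt and lte share their names with the range query parameters.
      return { range: { [c.field]: { [c.operator]: c.value } } };
  }
}

// Recursively translate nested and/or conditions into bool queries.
function conditionToQueryDsl(c: Condition): QueryDsl {
  if (isAndCondition(c)) {
    return { bool: { filter: c.and.map((child) => conditionToQueryDsl(child)) } };
  }
  if (isOrCondition(c)) {
    return {
      bool: { should: c.or.map((child) => conditionToQueryDsl(child)), minimum_should_match: 1 },
    };
  }
  return conditionToClause(c);
}
```

Keeping the leaf translation separate from the recursive walk mirrors the split between `conditionToClause` and the exported helper suggested by the diff.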

x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/index.ts

Lines changed: 2 additions & 0 deletions
@@ -7,3 +7,5 @@
 
 export * from './type_guards';
 export * from './hierarchy';
+export * from './condition_fields';
+export * from './condition_to_query_dsl';

x-pack/solutions/observability/plugins/streams/server/routes/streams/management/route.ts

Lines changed: 6 additions & 3 deletions
@@ -5,11 +5,14 @@
  * 2.0.
  */
 
-import { RecursiveRecord, conditionSchema } from '@kbn/streams-schema';
+import {
+  RecursiveRecord,
+  conditionSchema,
+  conditionToQueryDsl,
+  getFields,
+} from '@kbn/streams-schema';
 import { z } from '@kbn/zod';
 import { ResyncStreamsResponse } from '../../../lib/streams/client';
-import { getFields } from '../../../lib/streams/helpers/condition_fields';
-import { conditionToQueryDsl } from '../../../lib/streams/helpers/condition_to_query_dsl';
 import { checkAccess } from '../../../lib/streams/stream_crud';
 import { createServerRoute } from '../../create_server_route';
 import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
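
The commit message states that runtime fields are not generated for fields that are mapped. A minimal sketch of that rule is below, assuming the fields referenced by a condition are available as name/type pairs and that the set of already mapped field names is known. `ConditionField`, `buildRuntimeMappings` and the `keyword` / `double` type choice are hypothetical and are not taken from the diff.

```typescript
// Hypothetical description of a field referenced by a routing condition.
interface ConditionField {
  name: string;
  type: 'string' | 'number';
}

type RuntimeFieldType = 'keyword' | 'double';
type RuntimeMappings = Record<string, { type: RuntimeFieldType }>;

// Build runtime mappings only for fields that have no mapping yet, so that
// mapped fields keep using their (cheaper) indexed values.
function buildRuntimeMappings(
  conditionFields: ConditionField[],
  mappedFieldNames: ReadonlySet<string>
): RuntimeMappings {
  const runtimeMappings: RuntimeMappings = {};
  for (const field of conditionFields) {
    if (mappedFieldNames.has(field.name)) {
      continue; // already mapped: no runtime field needed
    }
    runtimeMappings[field.name] = { type: field.type === 'string' ? 'keyword' : 'double' };
  }
  return runtimeMappings;
}

// Usage: 'host.name' is mapped, so only 'latency' gets a runtime field.
const exampleMappings = buildRuntimeMappings(
  [
    { name: 'host.name', type: 'string' },
    { name: 'latency', type: 'number' },
  ],
  new Set(['host.name'])
);
```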
Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import {
+  EuiFlexGroup,
+  EuiFlexItem,
+  EuiText,
+  EuiDragDropContext,
+  EuiDroppable,
+  EuiDraggable,
+  EuiPanel,
+  EuiButtonEmpty,
+} from '@elastic/eui';
+import { i18n } from '@kbn/i18n';
+import { WiredStreamGetResponse } from '@kbn/streams-schema';
+import { css } from '@emotion/css';
+import { cloneDeep } from 'lodash';
+import React from 'react';
+import { EMPTY_EQUALS_CONDITION } from '../../util/condition';
+import { NestedView } from '../nested_view';
+import { useRoutingState } from './hooks/routing_state';
+import { CurrentStreamEntry } from './current_stream_entry';
+import { NewRoutingStreamEntry } from './new_routing_stream_entry';
+import { RoutingStreamEntry } from './routing_stream_entry';
+
+export function ChildStreamList({
+  definition,
+  availableStreams,
+  routingAppState: {
+    childUnderEdit,
+    selectChildUnderEdit,
+    childStreams,
+    onChildStreamDragEnd,
+    onChildStreamDragStart,
+    draggingChildStream,
+  },
+}: {
+  definition: WiredStreamGetResponse;
+  routingAppState: ReturnType<typeof useRoutingState>;
+  availableStreams: string[];
+}) {
+  return (
+    <EuiFlexGroup
+      direction="column"
+      gutterSize="s"
+      className={css`
+        overflow: auto;
+      `}
+    >
+      <EuiFlexItem grow={false}>
+        <EuiText
+          size="s"
+          className={css`
+            height: 40px;
+            align-content: center;
+            font-weight: bold;
+          `}
+        >
+          {i18n.translate('xpack.streams.streamDetailRouting.rules.header', {
+            defaultMessage: 'Routing rules',
+          })}
+        </EuiText>
+      </EuiFlexItem>
+      <EuiFlexGroup
+        direction="column"
+        gutterSize="xs"
+        className={css`
+          overflow: auto;
+        `}
+      >
+        <CurrentStreamEntry definition={definition} />
+        <EuiDragDropContext onDragEnd={onChildStreamDragEnd} onDragStart={onChildStreamDragStart}>
+          <EuiDroppable droppableId="routing_children_reordering" spacing="none">
+            <EuiFlexGroup direction="column" gutterSize="xs">
+              {childStreams.map((child, i) => (
+                <EuiFlexItem key={`${child.destination}-${i}-flex-item`} grow={false}>
+                  <EuiDraggable
+                    key={child.destination}
+                    index={i}
+                    draggableId={child.destination}
+                    hasInteractiveChildren={true}
+                    customDragHandle={true}
+                    spacing="none"
+                  >
+                    {(provided) => (
+                      <NestedView
+                        key={i}
+                        isBeingDragged={draggingChildStream === child.destination}
+                      >
+                        <RoutingStreamEntry
+                          draggableProvided={provided}
+                          child={
+                            !childUnderEdit?.isNew &&
+                            child.destination === childUnderEdit?.child.destination
+                              ? childUnderEdit.child
+                              : child
+                          }
+                          edit={
+                            !childUnderEdit?.isNew &&
+                            child.destination === childUnderEdit?.child.destination
+                          }
+                          onEditStateChange={() => {
+                            if (child.destination === childUnderEdit?.child.destination) {
+                              selectChildUnderEdit(undefined);
+                            } else {
+                              selectChildUnderEdit({ isNew: false, child });
+                            }
+                          }}
+                          onChildChange={(newChild) => {
+                            selectChildUnderEdit({
+                              isNew: false,
+                              child: newChild,
+                            });
+                          }}
+                          availableStreams={availableStreams}
+                        />
+                      </NestedView>
+                    )}
+                  </EuiDraggable>
+                </EuiFlexItem>
+              ))}
+            </EuiFlexGroup>
+          </EuiDroppable>
+        </EuiDragDropContext>
+        {childUnderEdit?.isNew ? (
+          <NestedView last>
+            <NewRoutingStreamEntry
+              child={childUnderEdit.child}
+              onChildChange={(newChild) => {
+                if (!newChild) {
+                  selectChildUnderEdit(undefined);
+                  return;
+                }
+                selectChildUnderEdit({
+                  isNew: true,
+                  child: newChild,
+                });
+              }}
+            />
+          </NestedView>
+        ) : (
+          <NestedView last>
+            <EuiPanel hasShadow={false} hasBorder paddingSize="none">
+              <EuiButtonEmpty
+                iconType="plus"
+                data-test-subj="streamsAppStreamDetailRoutingAddRuleButton"
+                onClick={() => {
+                  selectChildUnderEdit({
+                    isNew: true,
+                    child: {
+                      destination: `${definition.stream.name}.child`,
+                      if: cloneDeep(EMPTY_EQUALS_CONDITION),
+                    },
+                  });
+                }}
+              >
+                {i18n.translate('xpack.streams.streamDetailRouting.addRule', {
+                  defaultMessage: 'Create a new child stream',
+                })}
+              </EuiButtonEmpty>
+            </EuiPanel>
+          </NestedView>
+        )}
+      </EuiFlexGroup>
+    </EuiFlexGroup>
+  );
+}
