Skip to content

Commit d753695

Browse files
🌊 [Group streams] Promote unmanaged Classic streams when added as member (#233896)
## Summary If a user submits a Group stream that has an unmanaged Classic stream as a member, we would fail since during validation we check to find all members in the desired state. Since we do not load all unmanaged Classic streams into the state, this would fail for a request that is otherwise valid. In this PR, I'm adding a cascading effect that "promotes" any unmanaged Classic stream to a managed Classic stream if it is added as a member of a Group stream.
1 parent 223f372 commit d753695

File tree

2 files changed

+96
-10
lines changed
  • x-pack/platform

2 files changed

+96
-10
lines changed

‎x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/group_stream.ts‎

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,51 @@ export class GroupStream extends StreamActiveRecord<Streams.GroupStream.Definiti
4545

4646
this._definition = definition;
4747

48-
return { cascadingChanges: [], changeStatus: 'upserted' };
48+
const missingMembers: string[] = [];
49+
for (const member of this._definition.group.members) {
50+
if (!desiredState.has(member)) {
51+
missingMembers.push(member);
52+
}
53+
}
54+
55+
const existsAsDataStream = await Promise.all(
56+
missingMembers.map(async (member) => {
57+
try {
58+
const dataStreamResult =
59+
await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({
60+
name: member,
61+
});
62+
63+
return dataStreamResult.data_streams.length > 0 ? member : null;
64+
} catch (error) {
65+
if (!isNotFoundError(error)) {
66+
throw error;
67+
}
68+
return null;
69+
}
70+
})
71+
);
72+
73+
const cascadingChanges: StreamChange[] = existsAsDataStream
74+
.filter((member): member is string => member !== null)
75+
.map((member) => ({
76+
type: 'upsert',
77+
definition: {
78+
name: member,
79+
description: '',
80+
ingest: {
81+
classic: {},
82+
lifecycle: {
83+
inherit: {},
84+
},
85+
processing: {
86+
steps: [],
87+
},
88+
},
89+
},
90+
}));
91+
92+
return { cascadingChanges, changeStatus: 'upserted' };
4993
}
5094

5195
protected async doHandleDeleteChange(
@@ -115,17 +159,25 @@ export class GroupStream extends StreamActiveRecord<Streams.GroupStream.Definiti
115159
};
116160
}
117161

162+
const missingMembers: string[] = [];
118163
for (const member of this._definition.group.members) {
119164
const relatedStream = desiredState.get(member);
120165
if (!relatedStream || relatedStream.isDeleted()) {
121-
return {
122-
isValid: false,
123-
errors: [
124-
new Error(`Group stream ${this.name} has ${member} as a member which was not found`),
125-
],
126-
};
166+
missingMembers.push(member);
127167
}
128168
}
169+
if (missingMembers.length > 0) {
170+
return {
171+
isValid: false,
172+
errors: [
173+
new Error(
174+
`Group stream ${
175+
this.name
176+
} has the following members which were not found: ${missingMembers.join(', ')}`
177+
),
178+
],
179+
};
180+
}
129181

130182
const duplicates = this._definition.group.members.filter(
131183
(name, index) => this._definition.group.members.indexOf(name) !== index

‎x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/group_streams.ts‎

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
1919
const esClient = getService('es');
2020

2121
let apiClient: StreamsSupertestRepositoryClient;
22+
const existingDataStreamName = 'logs-existing-datastream';
23+
const unmanagedStreamName = 'logs-test-unmanaged';
2224

2325
describe('Group streams', () => {
2426
before(async () => {
@@ -29,7 +31,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
2931
after(async () => {
3032
await disableStreams(apiClient);
3133
await esClient.indices.deleteDataStream({
32-
name: 'logs-existing-datastream',
34+
name: [existingDataStreamName, unmanagedStreamName],
3335
});
3436
});
3537

@@ -310,6 +312,38 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
310312
.expect(400);
311313
});
312314

315+
it('promotes unmanaged Classic streams to managed Classic streams when added as a member', async () => {
316+
await esClient.indices.createDataStream({
317+
name: unmanagedStreamName,
318+
});
319+
320+
await apiClient
321+
.fetch('PUT /api/streams/{name} 2023-10-31', {
322+
params: {
323+
path: { name: 'test-group' },
324+
body: {
325+
stream: {
326+
description: 'A Group stream',
327+
group: {
328+
tags: [],
329+
members: [unmanagedStreamName],
330+
},
331+
},
332+
dashboards: [],
333+
rules: [],
334+
queries: [],
335+
},
336+
},
337+
})
338+
.expect(200);
339+
340+
const { found } = await esClient.get({
341+
index: '.kibana_streams',
342+
id: unmanagedStreamName,
343+
});
344+
expect(found).to.be(true);
345+
});
346+
313347
it('cannot create a Group stream with duplicated relationships', async () => {
314348
await apiClient
315349
.fetch('PUT /api/streams/{name} 2023-10-31', {
@@ -335,13 +369,13 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
335369

336370
it('cannot overwrite an existing data stream', async () => {
337371
await esClient.indices.createDataStream({
338-
name: 'logs-existing-datastream',
372+
name: existingDataStreamName,
339373
});
340374

341375
await apiClient
342376
.fetch('PUT /api/streams/{name} 2023-10-31', {
343377
params: {
344-
path: { name: 'logs-existing-datastream' },
378+
path: { name: existingDataStreamName },
345379
body: {
346380
stream: {
347381
description: 'A Group stream',

0 commit comments

Comments
 (0)