Skip to content

Commit ed4b880

Browse files
authored
🌊 Streams: Enable failure store for wired streams (#234066)
Closes elastic/streams-program#304 by enabling failure store for all wired streams. This also means that documents that violate the rules of wired streams will go to the failure store (e.g. docs going directly to a child stream)
1 parent 6911e6c commit ed4b880

File tree

3 files changed

+16
-18
lines changed

3 files changed

+16
-18
lines changed

‎x-pack/platform/plugins/shared/streams/server/lib/streams/index_templates/generate_index_template.ts‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,18 @@ export function generateIndexTemplate(name: string) {
3535
},
3636
data_stream: {
3737
hidden: false,
38-
failure_store: false,
3938
},
4039
template: {
4140
settings: {
4241
index: {
4342
default_pipeline: getProcessingPipelineName(name),
4443
},
4544
},
45+
data_stream_options: {
46+
failure_store: {
47+
enabled: true,
48+
},
49+
},
4650
mappings: {
4751
properties: {
4852
'stream.name': {

‎x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/helpers/requests.ts‎

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import type { Readable } from 'stream';
99
import type { Client } from '@elastic/elasticsearch';
1010
import type { JsonObject } from '@kbn/utility-types';
1111
import expect from '@kbn/expect';
12-
import type { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
12+
import type { SearchTotalHits, Refresh } from '@elastic/elasticsearch/lib/api/types';
1313
import type { Streams } from '@kbn/streams-schema';
1414
import type { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
1515
import type { StreamsRouteRepository } from '@kbn/streams-plugin/server';
@@ -24,8 +24,13 @@ export async function disableStreams(client: StreamsSupertestRepositoryClient) {
2424
await client.fetch('POST /api/streams/_disable 2023-10-31').expect(200);
2525
}
2626

27-
export async function indexDocument(esClient: Client, index: string, document: JsonObject) {
28-
const response = await esClient.index({ index, document, refresh: 'wait_for' });
27+
export async function indexDocument(
28+
esClient: Client,
29+
index: string,
30+
document: JsonObject,
31+
refresh: Refresh = 'wait_for'
32+
) {
33+
const response = await esClient.index({ index, document, refresh });
2934
return response;
3035
}
3136

‎x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/root_stream.ts‎

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
8383
const roleScopedSupertest = getService('roleScopedSupertest');
8484
let apiClient: StreamsSupertestRepositoryClient;
8585
const esClient = getService('es');
86-
const config = getService('config');
87-
const isServerless = !!config.get('serverless');
8886

8987
describe('Root stream', () => {
9088
before(async () => {
@@ -190,18 +188,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
190188
'@timestamp': '2024-01-01T00:00:20.000Z',
191189
message: 'test',
192190
};
193-
let threw = false;
194-
try {
195-
await indexDocument(esClient, 'logs.gcpcloud', doc);
196-
} catch (e) {
197-
threw = true;
198-
if (isServerless) {
199-
expect(e.message).to.contain('stream.name is not set properly');
200-
} else {
201-
expect(e.message).to.contain('Direct writes to child streams are prohibited');
202-
}
203-
}
204-
expect(threw).to.be(true);
191+
const response = await indexDocument(esClient, 'logs.gcpcloud', doc, false);
192+
// @ts-expect-error failure_store is not in the types, but in the actual response
193+
expect(response.failure_store).to.be('used');
205194
});
206195
});
207196
}

0 commit comments

Comments
 (0)