Skip to content

Commit 7092e79

Browse files
authored
[Fleet] sync pipelines from package policy stream vars (#217834)
## Summary Closes #206245 To test: - enable feature flag `xpack.fleet.enableExperimental: ['enableSyncIntegrationsOnRemote']` - Add `Custom Filestream Logs` integration and set `Ingest Pipeline` var e.g. `filestream-pipeline1` - Create Ingest Pipeline with the same name - Wait a few minutes for the sync task to run, check that the ingest pipeline appears in the sync index - Set up a second cluster to enable CCR by following this guide: https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/fleet/dev_docs/local_setup/remote_clusters_ccr.md - Verify that the ingest pipeline is synced to the remote cluster - Use a remote ES output as data output on the agent policy with the filestream policy - Enroll an agent to the agent policy in a multipass VM - Verify that data is being ingested in the remote cluster to the filestream dataset and the field in the pipeline is set <img width="1105" alt="image" src="https://github.com/user-attachments/assets/751b6f23-6ea7-4fbf-b6cc-fd441ca848e1" /> <img width="1591" alt="image" src="https://github.com/user-attachments/assets/9ef159f3-f399-4d4e-8e57-ecdf52bf4f6f" /> ``` GET fleet-synced-integrations/_search { "_index": "fleet-synced-integrations", "_id": "fleet-synced-integrations", "_score": 1, "_source": { "remote_es_hosts": [ { "name": "remote1", "hosts": [ "http://192.168.64.1:9200" ], "sync_integrations": true } ], "integrations": [ ], "custom_assets": { "ingest_pipeline:filestream-pipeline1": { "type": "ingest_pipeline", "name": "filestream-pipeline1", "package_name": "filestream", "package_version": "1.1.0", "is_deleted": false, "pipeline": { "processors": [ { "set": { "field": "test_field", "value": "value" } } ], "version": 2 } } } } GET fleet-synced-integrations-ccr-main/_search # same content should be in the ccr index ``` <img width="2545" alt="image" src="https://github.com/user-attachments/assets/127e5e6e-cf7e-4549-817d-88df10f4f523" /> ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios
1 parent 49ffc02 commit 7092e79

File tree

7 files changed

+319
-16
lines changed

7 files changed

+319
-16
lines changed

x-pack/platform/plugins/shared/fleet/dev_docs/local_setup/remote_clusters_ccr.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Verify that node is healthy
4444
```
4545
yarn start --server.port=5701 --elasticsearch.hosts=http://localhost:9500 --dev.basePathProxyTarget=5703
4646
```
47-
- Login into http://localhost/5601/<YOUR_PATH>
47+
- Login into http://localhost/5701/<YOUR_PATH>
4848

4949
### Start Cluster A (main)
5050

@@ -67,7 +67,7 @@ curl -k -u elastic:changeme http://localhost:9200
6767
yarn start
6868
```
6969

70-
- Login into http://localhost/5701/<YOUR_PATH>
70+
- Login into http://localhost/5601/<YOUR_PATH>
7171

7272
To avoid issues, it might be needed to login to one of the kibana instances with an incognito session.
7373

@@ -100,7 +100,7 @@ Save the responses as they will be required in Cluster A (see next section).
100100

101101
### Add Remote Cluster
102102

103-
- Add a Remote Cluster to Cluster B. Navigate to *Stack Management > Remote Clusters* ([link](http://localhost:5601/app/management/data/remote_clusters)) and follow the steps to add cluster 'local':
103+
- Add a Remote Cluster to Cluster B. Navigate to *Stack Management > Remote Clusters* ([link](http://localhost:5701/app/management/data/remote_clusters)) and follow the steps to add cluster 'local':
104104

105105
- Click "Add a remote cluster"
106106
- Choose a name, put `localhost:9300` for "Seed nodes", and save (check "Yes, I have setup trust")
@@ -135,9 +135,9 @@ This configuration is required to kick off the integration sync. The local host
135135
hosts: ["http://<local_ip>:9500"]
136136
sync_integrations: true
137137
kibana_url: "http://localhost:5701"
138+
kibana_api_key: key
138139
secrets:
139140
service_token: token
140-
kibana_api_key: key
141141
```
142142

143143
## Verify sync

x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/compare_synced_integrations.test.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,16 @@ describe('fetchAndCompareSyncedIntegrations', () => {
444444
],
445445
},
446446
};
447+
const customPipelineFromVar = {
448+
type: 'ingest_pipeline',
449+
name: 'filestream-pipeline1',
450+
package_name: 'filestream',
451+
package_version: '1.1.0',
452+
is_deleted: false,
453+
pipeline: {
454+
processors: [{}],
455+
},
456+
};
447457

448458
afterEach(() => {
449459
jest.resetAllMocks();
@@ -464,6 +474,7 @@ describe('fetchAndCompareSyncedIntegrations', () => {
464474
custom_assets: {
465475
'component_template:logs-system.auth@custom': customComponentTemplate,
466476
'ingest_pipeline:logs-system.auth@custom': customPipeline,
477+
'ingest_pipeline:filestream-pipeline1': customPipelineFromVar,
467478
},
468479
},
469480
},
@@ -554,12 +565,19 @@ describe('fetchAndCompareSyncedIntegrations', () => {
554565
sync_status: 'synchronizing',
555566
type: 'ingest_pipeline',
556567
},
568+
'ingest_pipeline:filestream-pipeline1': {
569+
name: 'filestream-pipeline1',
570+
package_name: 'filestream',
571+
package_version: '1.1.0',
572+
sync_status: 'synchronizing',
573+
type: 'ingest_pipeline',
574+
},
557575
},
558576
});
559577
});
560578

561579
it('should return status = completed if custom assets are equal', async () => {
562-
(getPipelineMock as jest.MockedFunction<any>).mockResolvedValue({
580+
(getPipelineMock as jest.MockedFunction<any>).mockResolvedValueOnce({
563581
'logs-system.auth@custom': {
564582
processors: [
565583
{
@@ -571,6 +589,11 @@ describe('fetchAndCompareSyncedIntegrations', () => {
571589
],
572590
},
573591
});
592+
(getPipelineMock as jest.MockedFunction<any>).mockResolvedValueOnce({
593+
'filestream-pipeline1': {
594+
processors: [{}],
595+
},
596+
});
574597
(getComponentTemplateMock as jest.MockedFunction<any>).mockResolvedValue({
575598
component_templates: [
576599
{
@@ -642,6 +665,13 @@ describe('fetchAndCompareSyncedIntegrations', () => {
642665
sync_status: 'completed',
643666
type: 'ingest_pipeline',
644667
},
668+
'ingest_pipeline:filestream-pipeline1': {
669+
name: 'filestream-pipeline1',
670+
package_name: 'filestream',
671+
package_version: '1.1.0',
672+
sync_status: 'completed',
673+
type: 'ingest_pipeline',
674+
},
645675
},
646676
});
647677
});
@@ -677,6 +707,17 @@ describe('fetchAndCompareSyncedIntegrations', () => {
677707
version: 2,
678708
},
679709
},
710+
'ingest_pipeline:filestream-pipeline1': {
711+
type: 'ingest_pipeline',
712+
name: 'filestream-pipeline1',
713+
package_name: 'filestream',
714+
package_version: '1.1.0',
715+
is_deleted: false,
716+
pipeline: {
717+
processors: [{}],
718+
},
719+
version: 2,
720+
},
680721
},
681722
},
682723
},
@@ -760,6 +801,13 @@ describe('fetchAndCompareSyncedIntegrations', () => {
760801
sync_status: 'synchronizing',
761802
type: 'ingest_pipeline',
762803
},
804+
'ingest_pipeline:filestream-pipeline1': {
805+
name: 'filestream-pipeline1',
806+
package_name: 'filestream',
807+
package_version: '1.1.0',
808+
sync_status: 'synchronizing',
809+
type: 'ingest_pipeline',
810+
},
763811
},
764812
});
765813
});
@@ -784,6 +832,7 @@ describe('fetchAndCompareSyncedIntegrations', () => {
784832
},
785833
],
786834
});
835+
(getPipelineMock as jest.MockedFunction<any>).mockResolvedValue({});
787836

788837
const res = await fetchAndCompareSyncedIntegrations(
789838
esClientMock,
@@ -807,6 +856,13 @@ describe('fetchAndCompareSyncedIntegrations', () => {
807856
sync_status: 'synchronizing',
808857
type: 'ingest_pipeline',
809858
},
859+
'ingest_pipeline:filestream-pipeline1': {
860+
name: 'filestream-pipeline1',
861+
package_name: 'filestream',
862+
package_version: '1.1.0',
863+
sync_status: 'synchronizing',
864+
type: 'ingest_pipeline',
865+
},
810866
},
811867
integrations: [
812868
{

x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/compare_synced_integrations.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ const fetchAndCompareCustomAssets = async (
180180

181181
try {
182182
const installedPipelines = await getPipeline(esClient, CUSTOM_ASSETS_PREFIX, abortController);
183+
184+
for (const [_, ccrCustomAsset] of Object.entries(ccrCustomAssets)) {
185+
if (ccrCustomAsset.type === 'ingest_pipeline' && !ccrCustomAsset.name.includes('@custom')) {
186+
const response = await getPipeline(esClient, ccrCustomAsset.name, abortController);
187+
if (response[ccrCustomAsset.name]) {
188+
installedPipelines[ccrCustomAsset.name] = response[ccrCustomAsset.name];
189+
}
190+
}
191+
}
192+
183193
const installedComponentTemplates = await getComponentTemplate(
184194
esClient,
185195
CUSTOM_ASSETS_PREFIX,

x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,22 @@
44
* 2.0; you may not use this file except in compliance with the Elastic License
55
* 2.0.
66
*/
7+
import { packagePolicyService } from '../../services';
78

8-
import { findIntegration, getCustomAssets, installCustomAsset } from './custom_assets';
9+
import {
10+
findIntegration,
11+
getCustomAssets,
12+
getPipelinesFromVars,
13+
installCustomAsset,
14+
} from './custom_assets';
15+
16+
jest.mock('../../services', () => ({
17+
packagePolicyService: {
18+
list: jest.fn(),
19+
},
20+
}));
21+
22+
const mockPackagePolicyService = packagePolicyService as jest.Mocked<typeof packagePolicyService>;
923

1024
describe('custom assets', () => {
1125
const integrations = [
@@ -54,7 +68,11 @@ describe('custom assets', () => {
5468
describe('getCustomAssets', () => {
5569
let esClientMock: any;
5670

57-
beforeEach(() => {});
71+
beforeEach(() => {
72+
mockPackagePolicyService.list.mockResolvedValue({
73+
items: [],
74+
} as any);
75+
});
5876

5977
it('should return custom assets', async () => {
6078
esClientMock = {
@@ -102,6 +120,7 @@ describe('custom assets', () => {
102120

103121
const customAssets = await getCustomAssets(
104122
esClientMock,
123+
{} as any,
105124
integrations,
106125
new AbortController(),
107126
undefined
@@ -171,6 +190,7 @@ describe('custom assets', () => {
171190

172191
const customAssets = await getCustomAssets(
173192
esClientMock,
193+
{} as any,
174194
integrations,
175195
new AbortController(),
176196
previousSyncIntegrationsData
@@ -218,6 +238,7 @@ describe('custom assets', () => {
218238

219239
const customAssets = await getCustomAssets(
220240
esClientMock,
241+
{} as any,
221242
integrations,
222243
new AbortController(),
223244
previousSyncIntegrationsData
@@ -227,6 +248,126 @@ describe('custom assets', () => {
227248
});
228249
});
229250

251+
describe('getPipelinesFromVars', () => {
252+
let esClientMock: any;
253+
254+
beforeEach(() => {
255+
mockPackagePolicyService.list.mockResolvedValue({
256+
items: [
257+
{
258+
package: {
259+
name: 'filestream',
260+
version: '1.1.0',
261+
},
262+
inputs: [
263+
{
264+
streams: [
265+
{
266+
vars: {
267+
pipeline: {
268+
value: 'filestream-pipeline1',
269+
},
270+
},
271+
},
272+
],
273+
},
274+
],
275+
},
276+
{
277+
package: {
278+
name: 'system',
279+
version: '1.1.0',
280+
},
281+
inputs: [],
282+
},
283+
{
284+
package: {
285+
name: 'package1',
286+
version: '1.1.0',
287+
},
288+
inputs: [
289+
{
290+
streams: [],
291+
},
292+
],
293+
},
294+
{
295+
package: {
296+
name: 'package2',
297+
version: '1.1.0',
298+
},
299+
inputs: [
300+
{
301+
streams: [
302+
{
303+
vars: {
304+
'data_stream.dataset': {
305+
value: 'package2.generic',
306+
type: 'text',
307+
},
308+
},
309+
},
310+
],
311+
},
312+
],
313+
},
314+
{
315+
package: {
316+
name: 'package3',
317+
version: '1.1.0',
318+
},
319+
inputs: [
320+
{
321+
streams: [
322+
{
323+
vars: {
324+
pipeline: {
325+
value: 'package3-other',
326+
},
327+
},
328+
},
329+
],
330+
},
331+
],
332+
},
333+
],
334+
} as any);
335+
336+
esClientMock = {
337+
ingest: {
338+
getPipeline: jest.fn().mockImplementation((request) => {
339+
if (request?.id === 'filestream-pipeline1') {
340+
return Promise.resolve({
341+
[request?.id as string]: {
342+
processors: [],
343+
},
344+
});
345+
} else {
346+
return Promise.resolve({});
347+
}
348+
}),
349+
},
350+
};
351+
});
352+
353+
it('should return pipelines from vars', async () => {
354+
const pipelines = await getPipelinesFromVars(esClientMock, {} as any, new AbortController());
355+
356+
expect(pipelines).toEqual([
357+
{
358+
is_deleted: false,
359+
name: 'filestream-pipeline1',
360+
package_name: 'filestream',
361+
package_version: '1.1.0',
362+
pipeline: {
363+
processors: [],
364+
},
365+
type: 'ingest_pipeline',
366+
},
367+
]);
368+
});
369+
});
370+
230371
describe('installCustomAsset', () => {
231372
let esClientMock: any;
232373

0 commit comments

Comments
 (0)