Skip to content

Commit a3a9f50

Browse files
committed
feat: sysdig integration
1 parent 92d324c commit a3a9f50

File tree

7 files changed

+289
-19
lines changed

7 files changed

+289
-19
lines changed

src/common/config.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ config.EXCLUDED_NAMESPACES = loadExcludedNamespaces();
4545
config.WORKERS_COUNT = Number(config.WORKERS_COUNT) || 10;
4646
config.SKOPEO_COMPRESSION_LEVEL = Number(config.SKOPEO_COMPRESSION_LEVEL) || 6;
4747

48+
// return Sysdig endpoint information
49+
if (config.SYSDIG_ENDPOINT && config.SYSDIG_TOKEN) {
50+
config.SYSDIG_ENDPOINT = config.SYSDIG_ENDPOINT.trim();
51+
config.SYSDIG_TOKEN = config.SYSDIG_TOKEN.trim();
52+
}
53+
4854
/**
4955
* Important: we delete the following env vars because we don't want to proxy requests to the Kubernetes API server.
5056
* The Kubernetes client library would honor the NO/HTTP/HTTPS_PROXY env vars.

src/data-scraper/index.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { logger } from '../common/logger';
2+
import { config } from '../common/config';
3+
import { sendRuntimeData } from '../transmitter';
4+
import { constructRuntimeData } from '../transmitter/payload';
5+
import { retryRequest } from '../transmitter';
6+
import { IRuntimeImagesResponse } from '../transmitter/types';
7+
import { NeedleOptions } from 'needle';
8+
import { Agent as HttpsAgent } from 'https';
9+
10+
function isSuccessStatusCode(statusCode: number | undefined): boolean {
11+
return statusCode !== undefined && statusCode >= 200 && statusCode < 300;
12+
}
13+
14+
export async function scrapeData(): Promise<void> {
15+
const base: string = config.SYSDIG_ENDPOINT;
16+
const header: string = `Bearer ${config.SYSDIG_TOKEN}`;
17+
18+
const url: string = base + '/v1/runtimeimages';
19+
// limit: min 1, max 500, default 250
20+
const limit: number = 10;
21+
const reqOptions: NeedleOptions = {
22+
agent: new HttpsAgent({
23+
keepAlive: true,
24+
// We agreed with Sysdig to skip TLS certificates validation for HTTPS connection.
25+
rejectUnauthorized: false,
26+
}),
27+
headers: {
28+
authorization: header,
29+
},
30+
};
31+
32+
let cursor: string = '';
33+
while (true) {
34+
try {
35+
logger.info({ cursor }, 'attempting to get runtime images');
36+
37+
const { response, attempt } = await retryRequest(
38+
'get',
39+
`${url}?limit=${limit}&cursor=${cursor}`,
40+
{},
41+
reqOptions,
42+
);
43+
if (!isSuccessStatusCode(response.statusCode)) {
44+
throw new Error(`${response.statusCode} ${response.statusMessage}`);
45+
}
46+
47+
logger.info(
48+
{
49+
attempt,
50+
cursor,
51+
},
52+
'runtime images received successfully',
53+
);
54+
55+
const responseBody: IRuntimeImagesResponse = response.body;
56+
const runtimeDataPayload = constructRuntimeData(responseBody.data);
57+
logger.info({}, 'sending runtime data upstream');
58+
await sendRuntimeData(runtimeDataPayload);
59+
60+
cursor = responseBody.page.next || '';
61+
if (!cursor) {
62+
break;
63+
}
64+
} catch (error) {
65+
logger.error(
66+
{
67+
error,
68+
cursor,
69+
},
70+
'could not get runtime images',
71+
);
72+
break;
73+
}
74+
}
75+
}

src/index.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { beginWatchingWorkloads } from './supervisor/watchers';
1010
import { loadAndSendWorkloadEventsPolicy } from './common/policy';
1111
import { sendClusterMetadata } from './transmitter';
1212
import { setSnykMonitorAgentId } from './supervisor/agent';
13+
import { scrapeData } from './data-scraper';
1314

1415
process.on('uncaughtException', (err) => {
1516
if (state.shutdownInProgress) {
@@ -68,4 +69,20 @@ setImmediate(async function setUpAndMonitor(): Promise<void> {
6869
await sendClusterMetadata();
6970
await loadAndSendWorkloadEventsPolicy();
7071
await monitor();
72+
73+
const interval: number = 4 * 60 * 60 * 1000; // 4 hours in milliseconds
74+
if (config.SYSDIG_ENDPOINT && config.SYSDIG_TOKEN) {
75+
setInterval(async () => {
76+
try {
77+
await scrapeData();
78+
} catch (error) {
79+
logger.error(
80+
{ error },
81+
'an error occurred while scraping runtime data',
82+
);
83+
}
84+
}, interval).unref();
85+
} else {
86+
logger.info({}, 'Sysdig integration not detected');
87+
}
7188
});

src/transmitter/index.ts

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { parse } from 'url';
21
import { queue } from 'async';
32
import needle from 'needle';
43
import sleep from 'sleep-promise';
@@ -17,6 +16,7 @@ import {
1716
IDependencyGraphPayload,
1817
IWorkloadEventsPolicyPayload,
1918
IClusterMetadataPayload,
19+
IRuntimeDataPayload,
2020
} from './types';
2121
import { getProxyAgent } from './proxy';
2222

@@ -27,24 +27,29 @@ interface KubernetesUpstreamRequest {
2727
| IDependencyGraphPayload
2828
| ScanResultsPayload
2929
| IWorkloadMetadataPayload
30-
| IDeleteWorkloadPayload;
30+
| IDeleteWorkloadPayload
31+
| IClusterMetadataPayload
32+
| IRuntimeDataPayload;
3133
}
3234

3335
const upstreamUrl =
3436
config.INTEGRATION_API || config.DEFAULT_KUBERNETES_UPSTREAM_URL;
3537

36-
let agent = new HttpAgent({
38+
let httpAgent = new HttpAgent({
3739
keepAlive: true,
3840
});
3941

40-
if (parse(upstreamUrl).protocol?.startsWith('https')) {
41-
agent = new HttpsAgent({
42-
keepAlive: true,
43-
});
42+
let httpsAgent = new HttpsAgent({
43+
keepAlive: true,
44+
});
45+
46+
function getAgent(u: string): HttpAgent {
47+
const url = new URL(u);
48+
return url.protocol === 'https:' ? httpsAgent : httpAgent;
4449
}
4550

4651
// Async queue wraps around the call to retryRequest in order to limit
47-
// the number of requests in flight to Homebase at any one time.
52+
// the number of requests in flight to kubernetes upstream at any one time.
4853
const reqQueue = queue(async function (req: KubernetesUpstreamRequest) {
4954
return await retryRequest(req.method, req.url, req.payload);
5055
}, config.REQUEST_QUEUE_LENGTH);
@@ -60,7 +65,7 @@ export async function sendDepGraph(
6065
const request: KubernetesUpstreamRequest = {
6166
method: 'post',
6267
url: `${upstreamUrl}/api/v1/dependency-graph`,
63-
payload: payload,
68+
payload,
6469
};
6570

6671
const { response, attempt } = await reqQueue.pushAsync(request);
@@ -91,7 +96,7 @@ export async function sendScanResults(
9196
const request: KubernetesUpstreamRequest = {
9297
method: 'post',
9398
url: `${upstreamUrl}/api/v1/scan-results`,
94-
payload: payload,
99+
payload,
95100
};
96101

97102
const { response, attempt } = await reqQueue.pushAsync(request);
@@ -127,7 +132,7 @@ export async function sendWorkloadMetadata(
127132
const request: KubernetesUpstreamRequest = {
128133
method: 'post',
129134
url: `${upstreamUrl}/api/v1/workload`,
130-
payload: payload,
135+
payload,
131136
};
132137

133138
const { response, attempt } = await reqQueue.pushAsync(request);
@@ -198,7 +203,7 @@ export async function deleteWorkload(
198203
const request: KubernetesUpstreamRequest = {
199204
method: 'delete',
200205
url: `${upstreamUrl}/api/v1/workload`,
201-
payload: payload,
206+
payload,
202207
};
203208

204209
const { response, attempt } = await reqQueue.pushAsync(request);
@@ -229,10 +234,11 @@ function isSuccessStatusCode(statusCode: number | undefined): boolean {
229234
return statusCode !== undefined && statusCode > 100 && statusCode < 400;
230235
}
231236

232-
async function retryRequest(
237+
export async function retryRequest(
233238
verb: NeedleHttpVerbs,
234239
url: string,
235240
payload: object,
241+
reqOptions: NeedleOptions = {},
236242
): Promise<IResponseWithAttempts> {
237243
const retry = {
238244
attempts: 3,
@@ -241,7 +247,8 @@ async function retryRequest(
241247
const options: NeedleOptions = {
242248
json: true,
243249
compressed: true,
244-
agent,
250+
agent: getAgent(url),
251+
...reqOptions,
245252
};
246253

247254
if (config.HTTP_PROXY || config.HTTPS_PROXY) {
@@ -317,11 +324,13 @@ export async function sendClusterMetadata(): Promise<void> {
317324
'attempting to send cluster metadata',
318325
);
319326

320-
const { response, attempt } = await retryRequest(
321-
'post',
322-
`${upstreamUrl}/api/v1/cluster`,
327+
const request: KubernetesUpstreamRequest = {
328+
method: 'post',
329+
url: `${upstreamUrl}/api/v1/cluster`,
323330
payload,
324-
);
331+
};
332+
333+
const { response, attempt } = await reqQueue.pushAsync(request);
325334
if (!isSuccessStatusCode(response.statusCode)) {
326335
throw new Error(`${response.statusCode} ${response.statusMessage}`);
327336
}
@@ -347,3 +356,46 @@ export async function sendClusterMetadata(): Promise<void> {
347356
);
348357
}
349358
}
359+
360+
export async function sendRuntimeData(
361+
payload: IRuntimeDataPayload,
362+
): Promise<void> {
363+
const logContext = {
364+
userLocator: payload.target.userLocator,
365+
cluster: payload.target.cluster,
366+
agentId: payload.target.agentId,
367+
identity: payload.identity,
368+
};
369+
370+
try {
371+
logger.info(logContext, 'attempting to send runtime data');
372+
373+
const request: KubernetesUpstreamRequest = {
374+
method: 'post',
375+
url: `${upstreamUrl}/api/v1/runtime-results`,
376+
payload,
377+
};
378+
379+
const { response, attempt } = await reqQueue.pushAsync(request);
380+
381+
if (!isSuccessStatusCode(response.statusCode)) {
382+
throw new Error(`${response.statusCode} ${response.statusMessage}`);
383+
}
384+
385+
logger.info(
386+
{
387+
attempt,
388+
...logContext,
389+
},
390+
'runtime data sent upstream successfully',
391+
);
392+
} catch (error) {
393+
logger.error(
394+
{
395+
error,
396+
...logContext,
397+
},
398+
'could not send runtime data',
399+
);
400+
}
401+
}

src/transmitter/payload.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@ import {
1313
IDependencyGraphPayload,
1414
IWorkloadEventsPolicyPayload,
1515
Telemetry,
16+
IRuntimeDataPayload,
17+
IRuntimeDataFact,
18+
IRuntimeImage,
1619
} from './types';
1720
import { state } from '../state';
21+
import { isExcludedNamespace } from '../supervisor/watchers/internal-namespaces';
22+
import { logger } from '../common/logger';
1823

1924
export function constructDepGraph(
2025
scannedImages: IScanResult[],
@@ -134,3 +139,49 @@ export function constructWorkloadEventsPolicy(
134139
agentId: config.AGENT_ID,
135140
};
136141
}
142+
143+
const workloadKindMap = {
144+
deployment: 'Deployment',
145+
replicaset: 'ReplicaSet',
146+
statefulset: 'StatefulSet',
147+
daemonset: 'DaemonSet',
148+
job: 'Job',
149+
cronjob: 'CronJob',
150+
replicationcontroller: 'ReplicationController',
151+
deploymentconfig: 'DeploymentConfig',
152+
pod: 'Pod',
153+
};
154+
export function constructRuntimeData(
155+
runtimeResults: IRuntimeImage[],
156+
): IRuntimeDataPayload {
157+
const filteredRuntimeResults = runtimeResults.reduce((acc, runtimeResult) => {
158+
if (!isExcludedNamespace(runtimeResult.namespace)) {
159+
const mappedWorkloadKind =
160+
workloadKindMap[runtimeResult.workloadKind.toLowerCase()];
161+
if (mappedWorkloadKind) {
162+
runtimeResult.workloadKind = mappedWorkloadKind;
163+
acc.push(runtimeResult);
164+
} else {
165+
logger.error({ runtimeResult }, 'invalid Sysdig workload kind');
166+
}
167+
}
168+
return acc;
169+
}, [] as IRuntimeImage[]);
170+
171+
const dataFact: IRuntimeDataFact = {
172+
type: 'loadedPackages',
173+
data: filteredRuntimeResults,
174+
};
175+
176+
return {
177+
identity: {
178+
type: 'sysdig',
179+
},
180+
target: {
181+
agentId: config.AGENT_ID,
182+
userLocator: config.INTEGRATION_ID,
183+
cluster: currentClusterName,
184+
},
185+
facts: [dataFact],
186+
};
187+
}

src/transmitter/proxy.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { URL } from 'url';
21
import { httpOverHttp, httpsOverHttp } from 'tunnel';
32
import { Agent, globalAgent as HttpAgent } from 'http';
43
import { globalAgent as HttpsAgent } from 'https';

0 commit comments

Comments
 (0)