Skip to content

Commit 0483fc4

Browse files
authored
Merge pull request #664 from snyk/RUN-1388/throttle-homebase-requests
2 parents f97dcb7 + 3620788 commit 0483fc4

File tree

9 files changed

+88
-48
lines changed

9 files changed

+88
-48
lines changed

.circleci/config.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,8 @@ jobs:
812812
command: |
813813
npm run lint &&
814814
npm run build &&
815-
npm run test:unit
815+
npm run test:unit:tap &&
816+
npm run test:unit:jest
816817
name: Unit tests
817818
- run:
818819
command: |

.circleci/config/jobs/@jobs.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ unit_tests:
9292
command: |
9393
npm run lint &&
9494
npm run build &&
95-
npm run test:unit
95+
npm run test:unit:tap &&
96+
npm run test:unit:jest
9697
- run:
9798
name: Notify Slack on failure
9899
command: |

.tool-versions

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
nodejs lts-erbium

config.default.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"MAX_AGE_MS": 60000
1313
},
1414
"WORKLOADS_TO_SCAN_QUEUE_WORKER_COUNT": 10,
15-
"METADATA_TO_SEND_QUEUE_WORKER_COUNT": 10,
15+
"REQUEST_QUEUE_LENGTH": 2,
1616
"QUEUE_LENGTH_LOG_FREQUENCY_MINUTES": 15,
1717
"INTEGRATION_ID": "",
1818
"DEFAULT_KUBERNETES_UPSTREAM_URL": "https://kubernetes-upstream.snyk.io"

package-lock.json

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
"main": "dist/index.js",
55
"scripts": {
66
"pretest": "./scripts/docker/build-image.sh",
7-
"test": "npm run lint && npm run build && npm run test:unit && npm run test:integration:kind:helm",
8-
"test:unit": "NODE_ENV=test tap test/unit --timeout=300",
7+
"test": "npm run lint && npm run build && npm run test:unit:tap && npm run test:unit:jest && npm run test:integration:kind:helm",
8+
"test:unit:tap": "NODE_ENV=test tap test/unit/*.test.ts test/unit/**/*.test.ts --timeout=300",
9+
"test:unit:jest": "jest --logHeapUsage --ci test/unit",
910
"test:system": "jest --logHeapUsage --ci --maxWorkers=1 test/system",
1011
"test:integration:kind:yaml": "DEPLOYMENT_TYPE=YAML TEST_PLATFORM=kind CREATE_CLUSTER=true jest --logHeapUsage --ci --maxWorkers=1 test/integration/kubernetes.spec.ts",
1112
"test:integration:kind:helm": "DEPLOYMENT_TYPE=Helm TEST_PLATFORM=kind CREATE_CLUSTER=true jest --logHeapUsage --ci --maxWorkers=1 test/integration/kubernetes.spec.ts",
@@ -29,7 +30,6 @@
2930
"dependencies": {
3031
"@kubernetes/client-node": "^0.14.0",
3132
"@snyk/dep-graph": "^1.21.0",
32-
"@types/async": "^3.2.3",
3333
"@types/child-process-promise": "^2.2.1",
3434
"@types/lru-cache": "^5.1.0",
3535
"@types/needle": "^2.0.4",
@@ -56,6 +56,7 @@
5656
"@types/jest": "^26.0.20",
5757
"@typescript-eslint/eslint-plugin": "^2.22.0",
5858
"@typescript-eslint/parser": "^2.22.0",
59+
"@types/async": "^3.2.5",
5960
"eslint": "^6.8.0",
6061
"eslint-config-prettier": "^6.10.0",
6162
"jest": "^26.6.3",

src/supervisor/watchers/handlers/pod.ts

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,14 @@ async function queueWorkerWorkloadScan(task, callback): Promise<void> {
3939

4040
const workloadsToScanQueue = async.queue(queueWorkerWorkloadScan, config.WORKLOADS_TO_SCAN_QUEUE_WORKER_COUNT);
4141

42-
async function queueWorkerMetadataSender(task, callback): Promise<void> {
43-
const {workloadMetadataPayload} = task;
44-
await sendWorkloadMetadata(workloadMetadataPayload);
45-
}
46-
47-
const metadataToSendQueue = async.queue(queueWorkerMetadataSender, config.METADATA_TO_SEND_QUEUE_WORKER_COUNT);
48-
4942
workloadsToScanQueue.error(function(err, task) {
5043
logger.error({err, task}, 'error processing a workload in the pod handler 1');
5144
});
5245

53-
metadataToSendQueue.error(function(err, task) {
54-
logger.error({err, task}, 'error processing a workload metadata send task');
55-
});
56-
5746
setInterval(() => {
5847
try {
5948
const queueDataToReport: {[key: string]: any} = {};
6049
queueDataToReport.workloadsToScanLength = workloadsToScanQueue.length();
61-
queueDataToReport.metadataToSendLength = metadataToSendQueue.length();
6250
logger.info(queueDataToReport, 'queue sizes report');
6351
} catch (err) {
6452
logger.warn({err}, 'failed logging queue sizes');
@@ -114,7 +102,7 @@ export async function podWatchHandler(pod: V1Pod): Promise<void> {
114102
const workloadRevision = workloadMember.revision ? workloadMember.revision.toString() : ''; // this is actually the observed generation
115103
if (state.workloadsAlreadyScanned.get(workloadKey) !== workloadRevision) { // either not exists or different
116104
state.workloadsAlreadyScanned.set(workloadKey, workloadRevision); // empty string takes zero bytes and is !== undefined
117-
metadataToSendQueue.push({workloadMetadataPayload});
105+
await sendWorkloadMetadata(workloadMetadataPayload);
118106
}
119107

120108
handleReadyPod(workloadMetadata);

src/transmitter/index.ts

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { queue } from 'async';
12
import * as needle from 'needle';
23
import { NeedleResponse, NeedleHttpVerbs, NeedleOptions } from 'needle';
34
import * as sleep from 'sleep-promise';
@@ -14,15 +15,37 @@ import {
1415
} from './types';
1516
import { getProxyAgent } from './proxy';
1617

18+
interface HomebaseRequest {
19+
method: NeedleHttpVerbs;
20+
url: string;
21+
payload:
22+
IDependencyGraphPayload |
23+
ScanResultsPayload |
24+
IWorkloadMetadataPayload |
25+
IDeleteWorkloadPayload;
26+
}
27+
1728
const upstreamUrl = config.INTEGRATION_API || config.DEFAULT_KUBERNETES_UPSTREAM_URL;
1829

30+
// Async queue wraps around the call to retryRequest in order to limit
31+
// the number of requests in flight to Homebase at any one time.
32+
const reqQueue = queue(async function(req: HomebaseRequest) {
33+
return await retryRequest(req.method, req.url, req.payload);
34+
}, config.REQUEST_QUEUE_LENGTH);
35+
1936
export async function sendDepGraph(...payloads: IDependencyGraphPayload[]): Promise<void> {
2037
for (const payload of payloads) {
2138
// Intentionally removing dependencyGraph as it would be too big to log
2239
// eslint-disable-next-line @typescript-eslint/no-unused-vars
2340
const { dependencyGraph, ...payloadWithoutDepGraph } = payload;
2441
try {
25-
const {response, attempt} = await retryRequest('post', `${upstreamUrl}/api/v1/dependency-graph`, payload);
42+
const request: HomebaseRequest = {
43+
method: 'post',
44+
url: `${upstreamUrl}/api/v1/dependency-graph`,
45+
payload: payload,
46+
};
47+
48+
const { response, attempt } = await reqQueue.pushAsync(request);
2649
if (!isSuccessStatusCode(response.statusCode)) {
2750
throw new Error(`${response.statusCode} ${response.statusMessage}`);
2851
} else {
@@ -40,7 +63,13 @@ export async function sendScanResults(payloads: ScanResultsPayload[]): Promise<b
4063
// eslint-disable-next-line @typescript-eslint/no-unused-vars
4164
const { scanResults, ...payloadWithoutScanResults } = payload;
4265
try {
43-
const {response, attempt} = await retryRequest('post', `${upstreamUrl}/api/v1/scan-results`, payload);
66+
const request: HomebaseRequest = {
67+
method: 'post',
68+
url: `${upstreamUrl}/api/v1/scan-results`,
69+
payload: payload,
70+
};
71+
72+
const { response, attempt } = await reqQueue.pushAsync(request);
4473
if (!isSuccessStatusCode(response.statusCode)) {
4574
throw new Error(`${response.statusCode} ${response.statusMessage}`);
4675
} else {
@@ -59,7 +88,13 @@ export async function sendWorkloadMetadata(payload: IWorkloadMetadataPayload): P
5988
try {
6089
logger.info({workloadLocator: payload.workloadLocator}, 'attempting to send workload metadata upstream');
6190

62-
const {response, attempt} = await retryRequest('post', `${upstreamUrl}/api/v1/workload`, payload);
91+
const request: HomebaseRequest = {
92+
method: 'post',
93+
url: `${upstreamUrl}/api/v1/workload`,
94+
payload: payload,
95+
};
96+
97+
const { response, attempt } = await reqQueue.pushAsync(request);
6398
if (!isSuccessStatusCode(response.statusCode)) {
6499
throw new Error(`${response.statusCode} ${response.statusMessage}`);
65100
} else {
@@ -96,7 +131,13 @@ export async function sendWorkloadAutoImportPolicy(payload: WorkloadAutoImportPo
96131

97132
export async function deleteWorkload(payload: IDeleteWorkloadPayload): Promise<void> {
98133
try {
99-
const {response, attempt} = await retryRequest('delete', `${upstreamUrl}/api/v1/workload`, payload);
134+
const request: HomebaseRequest = {
135+
method: 'delete',
136+
url: `${upstreamUrl}/api/v1/workload`,
137+
payload: payload,
138+
};
139+
140+
const { response, attempt } = await reqQueue.pushAsync(request);
100141
if (response.statusCode === 404) {
101142
// TODO: maybe we're still building it?
102143
const msg = 'attempted to delete a workload the Upstream service could not find';
Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
1-
import * as tap from 'tap';
2-
import * as sinon from 'sinon';
3-
import * as sleep from 'sleep-promise';
1+
import * as async from 'async';
42
import * as fs from 'fs';
3+
import * as sleep from 'sleep-promise';
54
import * as YAML from 'yaml';
6-
import async = require('async');
75

86
import { V1PodSpec, V1Pod } from '@kubernetes/client-node';
97
import { IWorkload } from '../../../src/transmitter/types';
108
import * as metadataExtractor from '../../../src/supervisor/metadata-extractor';
119

1210
let pushCallCount = 0;
13-
sinon.stub(async, 'queue').returns({ error: () => { }, push: () => pushCallCount++ } as any);
11+
const asyncQueueSpy = jest.spyOn(async, 'queue').mockReturnValue({ error: () => {}, push: () => pushCallCount++ } as any);
1412

1513
import * as pod from '../../../src/supervisor/watchers/handlers/pod';
1614

17-
tap.test('image and workload image cache', async (t) => {
15+
describe('image and workload image cache', () => {
1816
const podSpecFixture = fs.readFileSync('./test/fixtures/pod-spec.json', 'utf8');
1917
const podSpec: V1PodSpec = YAML.parse(podSpecFixture);
2018
const workloadMetadata: IWorkload[] = [
@@ -36,25 +34,33 @@ tap.test('image and workload image cache', async (t) => {
3634
},
3735
];
3836

39-
sinon.stub(metadataExtractor, 'buildMetadataForWorkload').resolves(workloadMetadata);
4037

41-
t.teardown(() => {
42-
(async['queue'] as any).restore();
43-
(metadataExtractor['buildMetadataForWorkload'] as any).restore();
38+
const buildMetadataSpy = jest.spyOn(metadataExtractor, 'buildMetadataForWorkload').mockResolvedValue(workloadMetadata);
39+
40+
afterAll(() => {
41+
asyncQueueSpy.mockRestore();
42+
buildMetadataSpy.mockRestore();
4443
});
4544

4645
const podFixture = fs.readFileSync('./test/fixtures/sidecar-containers/pod.yaml', 'utf8');
4746
const podObject: V1Pod = YAML.parse(podFixture);
48-
await pod.podWatchHandler(podObject);
49-
await sleep(500);
50-
t.equals(pushCallCount, 2, 'pushed data to send');
51-
52-
await pod.podWatchHandler(podObject);
53-
await sleep(500);
54-
t.equals(pushCallCount, 2, 'cached info, no data pushed to send');
55-
56-
workloadMetadata[0].imageId = 'newImageName';
57-
await pod.podWatchHandler(podObject);
58-
await sleep(1000);
59-
t.equals(pushCallCount, 3, 'new image parsed, workload is cached');
47+
48+
it('pushed data to send', async () => {
49+
await pod.podWatchHandler(podObject);
50+
await sleep(500);
51+
expect(pushCallCount).toEqual(1);
52+
});
53+
54+
it('cached info, no data pushed to send', async () => {
55+
await pod.podWatchHandler(podObject);
56+
await sleep(500);
57+
expect(pushCallCount).toEqual(1);
58+
});
59+
60+
it('new image parsed, workload is cached', async () => {
61+
workloadMetadata[0].imageId = 'newImageName';
62+
await pod.podWatchHandler(podObject);
63+
await sleep(1000);
64+
expect(pushCallCount).toEqual(2);
65+
});
6066
});

0 commit comments

Comments
 (0)