Skip to content

Commit c0cefb4

Browse files
author
Silas Boyd-Wickizer
committed
Handle JSON parse errors in watch implementation
This change "handles" JSON parse parse errors in the watch implementation by ignoring them: the JSONStream implementation ignores parsing errors. The request might result in invalid JSON if, for example, the HTTP connection is abruptly terminated (and a partial line is piped to the transform stream). The code this commit replaces would raise an uncaught exception exception on parse errors. An alternative approach to this commit would invoke the `done` callback with parse errors, but that could result in multiple invocations of the `done` callback.
1 parent 375e92f commit c0cefb4

File tree

4 files changed

+84
-40
lines changed

4 files changed

+84
-40
lines changed

package-lock.json

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@
5353
"@types/request": "^2.47.1",
5454
"@types/underscore": "^1.8.9",
5555
"@types/ws": "^6.0.1",
56-
"byline": "^5.0.0",
5756
"isomorphic-ws": "^4.0.1",
5857
"js-yaml": "^3.12.0",
58+
"json-stream": "^1.0.0",
5959
"jsonpath-plus": "^0.19.0",
6060
"request": "^2.88.0",
6161
"shelljs": "^0.8.2",

src/watch.ts

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { LineStream } from 'byline';
1+
import JSONStream from 'json-stream';
22
import request = require('request');
33
import { KubeConfig } from './config';
44

@@ -55,21 +55,8 @@ export class Watch {
5555
};
5656
this.config.applyToRequest(requestOptions);
5757

58-
const stream = new LineStream();
59-
stream.on('data', (data) => {
60-
let obj: WatchUpdate;
61-
if (data instanceof Buffer) {
62-
obj = JSON.parse(data.toString()) as WatchUpdate;
63-
} else {
64-
obj = JSON.parse(data) as WatchUpdate;
65-
}
66-
if (typeof obj === 'object' && obj.object) {
67-
callback(obj.type, obj.object);
68-
} else {
69-
throw new Error(`unexpected ${typeof obj}: ${JSON.stringify(obj)}`);
70-
}
71-
});
72-
58+
const stream = new JSONStream();
59+
stream.on('data', (data) => callback(data.type, data.object));
7360
const req = this.requestImpl.webRequest(requestOptions, (error, response, body) => {
7461
if (error) {
7562
done(error);

src/watch_test.ts

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,32 @@ import { KubeConfig } from './config';
77
import { Cluster, Context, User } from './config_types';
88
import { DefaultRequest, Watch } from './watch';
99

10+
const server = 'foo.company.com';
11+
12+
const fakeConfig: {
13+
clusters: Cluster[];
14+
contexts: Context[];
15+
users: User[];
16+
} = {
17+
clusters: [
18+
{
19+
name: 'cluster',
20+
server,
21+
} as Cluster,
22+
],
23+
contexts: [
24+
{
25+
cluster: 'cluster',
26+
user: 'user',
27+
} as Context,
28+
],
29+
users: [
30+
{
31+
name: 'user',
32+
} as User,
33+
],
34+
};
35+
1036
describe('Watch', () => {
1137
it('should construct correctly', () => {
1238
const kc = new KubeConfig();
@@ -15,24 +41,7 @@ describe('Watch', () => {
1541

1642
it('should watch correctly', () => {
1743
const kc = new KubeConfig();
18-
const server = 'foo.company.com';
19-
kc.clusters = [
20-
{
21-
name: 'cluster',
22-
server,
23-
} as Cluster,
24-
] as Cluster[];
25-
kc.contexts = [
26-
{
27-
cluster: 'cluster',
28-
user: 'user',
29-
} as Context,
30-
] as Context[];
31-
kc.users = [
32-
{
33-
name: 'user',
34-
} as User,
35-
];
44+
Object.assign(kc, fakeConfig);
3645
const fakeRequestor = mock(DefaultRequest);
3746
const watch = new Watch(kc, instance(fakeRequestor));
3847

@@ -102,4 +111,52 @@ describe('Watch', () => {
102111
doneCallback(errIn, null, null);
103112
expect(doneErr).to.deep.equal(errIn);
104113
});
114+
115+
it('should ignore JSON parse errors', () => {
116+
const kc = new KubeConfig();
117+
Object.assign(kc, fakeConfig);
118+
const fakeRequestor = mock(DefaultRequest);
119+
const watch = new Watch(kc, instance(fakeRequestor));
120+
121+
const obj = {
122+
type: 'MODIFIED',
123+
object: {
124+
baz: 'blah',
125+
},
126+
};
127+
128+
const fakeRequest = {
129+
pipe: (stream) => {
130+
stream.write(JSON.stringify(obj) + '\n');
131+
stream.write('{"truncated json\n');
132+
},
133+
};
134+
135+
when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);
136+
137+
const path = '/some/path/to/object';
138+
139+
const receivedTypes: string[] = [];
140+
const receivedObjects: string[] = [];
141+
142+
watch.watch(
143+
path,
144+
{},
145+
(recievedType: string, recievedObject: string) => {
146+
receivedTypes.push(recievedType);
147+
receivedObjects.push(recievedObject);
148+
},
149+
() => {
150+
/* ignore */
151+
},
152+
);
153+
154+
verify(fakeRequestor.webRequest(anything(), anyFunction()));
155+
156+
const [opts, doneCallback] = capture(fakeRequestor.webRequest).last();
157+
const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri;
158+
159+
expect(receivedTypes).to.deep.equal([obj.type]);
160+
expect(receivedObjects).to.deep.equal([obj.object]);
161+
});
105162
});

0 commit comments

Comments
 (0)