Skip to content

Commit 38a111b

Browse files
authored
Merge pull request #1880 from joeferner/fix-cp-promises
Fix cp promise returns by converting the exec callbacks into promises
2 parents a4e9c0d + 3eeb3e9 commit 38a111b

File tree

6 files changed

+192
-56
lines changed

6 files changed

+192
-56
lines changed

src/cp.ts

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,39 @@ export class Cp {
3535
command.push(srcPath);
3636
const writerStream = fs.createWriteStream(tmpFileName);
3737
const errStream = new WritableStreamBuffer();
38-
this.execInstance.exec(
39-
namespace,
40-
podName,
41-
containerName,
42-
command,
43-
writerStream,
44-
errStream,
45-
null,
46-
false,
47-
async ({ status }) => {
48-
writerStream.close();
49-
if (status === 'Failure' || errStream.size()) {
50-
throw new Error(`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`);
51-
}
52-
await tar.x({
53-
file: tmpFileName,
54-
cwd: tgtPath,
55-
});
56-
},
57-
);
38+
return new Promise<void>((resolve, reject) => {
39+
this.execInstance
40+
.exec(
41+
namespace,
42+
podName,
43+
containerName,
44+
command,
45+
writerStream,
46+
errStream,
47+
null,
48+
false,
49+
async ({ status }) => {
50+
try {
51+
writerStream.close();
52+
if (status === 'Failure' || errStream.size()) {
53+
return reject(
54+
new Error(
55+
`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`,
56+
),
57+
);
58+
}
59+
await tar.x({
60+
file: tmpFileName,
61+
cwd: tgtPath,
62+
});
63+
resolve();
64+
} catch (e) {
65+
reject(e);
66+
}
67+
},
68+
)
69+
.catch(reject);
70+
});
5871
}
5972

6073
/**
@@ -78,20 +91,31 @@ export class Cp {
7891
await tar.c({ file: tmpFileName, cwd }, [srcPath]);
7992
const readStream = fs.createReadStream(tmpFileName);
8093
const errStream = new WritableStreamBuffer();
81-
this.execInstance.exec(
82-
namespace,
83-
podName,
84-
containerName,
85-
command,
86-
null,
87-
errStream,
88-
readStream,
89-
false,
90-
async ({ status }) => {
91-
if (status === 'Failure' || errStream.size()) {
92-
throw new Error(`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`);
93-
}
94-
},
95-
);
94+
return new Promise<void>((resolve, reject) => {
95+
this.execInstance
96+
.exec(
97+
namespace,
98+
podName,
99+
containerName,
100+
command,
101+
null,
102+
errStream,
103+
readStream,
104+
false,
105+
async ({ status }) => {
106+
await fs.promises.unlink(tmpFileName);
107+
if (status === 'Failure' || errStream.size()) {
108+
reject(
109+
new Error(
110+
`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`,
111+
),
112+
);
113+
} else {
114+
resolve();
115+
}
116+
},
117+
)
118+
.catch(reject);
119+
});
96120
}
97121
}

src/cp_test.ts

Lines changed: 118 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,50 @@
11
import { anything, anyFunction, instance, mock, verify, when } from 'ts-mockito';
22
import * as querystring from 'querystring';
3+
import { expect } from 'chai';
34
import WebSocket = require('isomorphic-ws');
4-
5+
import * as fs from 'node:fs';
6+
import * as path from 'node:path';
7+
import { tmpdir } from 'os';
8+
import * as tar from 'tar';
59
import { CallAwaiter } from '../test';
610
import { KubeConfig } from './config';
711
import { Exec } from './exec';
812
import { Cp } from './cp';
9-
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
13+
import { BinaryHandler, WebSocketHandler, WebSocketInterface } from './web-socket-handler';
14+
import { V1Status } from './api';
15+
import { randomUUID } from 'crypto';
16+
import { sleep } from './util';
1017

1118
describe('Cp', () => {
19+
let tmpDir = '';
20+
21+
beforeEach(() => {
22+
tmpDir = `${tmpdir()}/${randomUUID()}`;
23+
fs.mkdirSync(tmpDir);
24+
});
25+
26+
afterEach(() => {
27+
if (tmpDir) {
28+
fs.rmSync(tmpDir, { recursive: true, force: true });
29+
}
30+
});
31+
1232
describe('cpFromPod', () => {
1333
it('should run create tar command to a url', async () => {
1434
const kc = new KubeConfig();
15-
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
16-
const exec = new Exec(kc, instance(fakeWebSocket));
35+
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
36+
const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket);
37+
const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket);
38+
const callAwaiter: CallAwaiter = new CallAwaiter();
39+
const exec = new Exec(kc, instance(fakeWebSocketInterface));
1740
const cp = new Cp(kc, exec);
1841

1942
const namespace = 'somenamespace';
2043
const pod = 'somepod';
2144
const container = 'container';
2245
const srcPath = '/';
23-
const tgtPath = '/';
2446
const cmdArray = ['tar', 'zcf', '-', srcPath];
25-
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;
47+
const queryPath = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;
2648

2749
const query = {
2850
stdout: true,
@@ -34,9 +56,46 @@ describe('Cp', () => {
3456
};
3557
const queryStr = querystring.stringify(query);
3658

37-
await cp.cpFromPod(namespace, pod, container, srcPath, tgtPath);
38-
// tslint:disable-next-line:max-line-length
39-
verify(fakeWebSocket.connect(`${path}?${queryStr}`, null, anyFunction())).called();
59+
when(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).thenCall(
60+
callAwaiter.resolveCall('connect', fakeConn),
61+
);
62+
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));
63+
64+
let complete = false;
65+
let lastErr = undefined;
66+
const promise = cp
67+
.cpFromPod(namespace, pod, container, srcPath, tmpDir)
68+
.then(() => {
69+
complete = true;
70+
})
71+
.catch((err) => {
72+
lastErr = err;
73+
});
74+
expect(lastErr).to.be.undefined;
75+
expect(complete).to.be.false;
76+
77+
const binaryHandler: BinaryHandler = (await callAwaiter.awaitCall('connect'))[2];
78+
79+
// simulate a network hope with a sleep
80+
await sleep(1);
81+
const contents = fs.readFileSync('testdata/archive.tgz');
82+
binaryHandler(WebSocketHandler.StdoutStream, contents);
83+
84+
// simulate a network hope with a sleep
85+
await sleep(1);
86+
const status: V1Status = {
87+
status: 'Success',
88+
};
89+
binaryHandler(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(status)));
90+
91+
await promise;
92+
93+
expect(lastErr).to.be.undefined;
94+
expect(complete).to.be.true;
95+
96+
const found = fs.readFileSync(path.join(tmpDir, 'archive.txt')).toString('utf8');
97+
const expected = fs.readFileSync('testdata/archive.txt').toString('utf8');
98+
expect(found).to.eq(expected);
4099
});
41100
});
42101

@@ -52,10 +111,11 @@ describe('Cp', () => {
52111
const namespace = 'somenamespace';
53112
const pod = 'somepod';
54113
const container = 'container';
55-
const srcPath = 'testdata/archive.txt';
114+
const srcPath = 'archive.txt';
56115
const tgtPath = '/';
57116
const cmdArray = ['tar', 'xf', '-', '-C', tgtPath];
58-
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;
117+
const cwd = 'testdata/';
118+
const queryPath = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;
59119

60120
const query = {
61121
stdout: false,
@@ -68,14 +128,56 @@ describe('Cp', () => {
68128
const queryStr = querystring.stringify(query);
69129

70130
const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket);
71-
when(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).thenResolve(
72-
fakeConn,
131+
when(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).thenCall(
132+
callAwaiter.resolveCall('connect', fakeConn),
73133
);
74-
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));
134+
135+
const outFilename = path.join(tmpDir, 'send-data.tar');
136+
const out = fs.createWriteStream(outFilename);
137+
when(fakeWebSocket.send(anything())).thenCall((data) => {
138+
const streamNum = data.readInt8(0);
139+
if (streamNum === WebSocketHandler.StdinStream) {
140+
out.write(data.subarray(1));
141+
} else {
142+
console.log(streamNum);
143+
}
144+
});
145+
75146
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));
76147

77-
await cp.cpToPod(namespace, pod, container, srcPath, tgtPath);
78-
verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called();
148+
let complete = false;
149+
let lastErr = undefined;
150+
const promise = cp
151+
.cpToPod(namespace, pod, container, srcPath, tgtPath, cwd)
152+
.then(() => {
153+
complete = true;
154+
})
155+
.catch((err) => {
156+
lastErr = err;
157+
});
158+
expect(lastErr).to.be.undefined;
159+
expect(complete).to.be.false;
160+
161+
const binaryHandler: BinaryHandler = (await callAwaiter.awaitCall('connect'))[2];
162+
163+
// wait for all data to be written and close called
164+
await callAwaiter.awaitCall('close');
165+
out.close();
166+
await tar.x({ f: outFilename, cwd: tmpDir });
167+
168+
// simulate a network hope with a sleep
169+
await sleep(1);
170+
const status: V1Status = {
171+
status: 'Success',
172+
};
173+
binaryHandler(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(status)));
174+
175+
await promise;
176+
177+
expect(lastErr).to.be.undefined;
178+
expect(complete).to.be.true;
179+
180+
verify(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).called();
79181
});
80182
});
81183
});

src/util.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,7 @@ export const resolvablePromise = <T>(): ResolvablePromise<T> => {
194194
promise.reject = reject!;
195195
return promise;
196196
};
197+
198+
export const sleep = (ms: number): Promise<void> => {
199+
return new Promise<void>((resolve) => setTimeout(resolve));
200+
};

src/web-socket-handler.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ import { KubeConfig } from './config';
66

77
const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
88

9+
export type TextHandler = (text: string) => boolean;
10+
export type BinaryHandler = (stream: number, buff: Buffer) => boolean;
11+
912
export interface WebSocketInterface {
1013
connect(
1114
path: string,
12-
textHandler: ((text: string) => boolean) | null,
13-
binaryHandler: ((stream: number, buff: Buffer) => boolean) | null,
15+
textHandler: TextHandler | null,
16+
binaryHandler: BinaryHandler | null,
1417
): Promise<WebSocket.WebSocket>;
1518
}
1619

test/call-awaiter.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ import { EventEmitter } from 'events';
33
export class CallAwaiter extends EventEmitter {
44
public awaitCall(event: string) {
55
return new Promise<any[]>((resolve) => {
6-
this.once(event, resolve);
6+
this.once(event, (...args: any[]) => resolve(args));
77
});
88
}
99

10-
public resolveCall(event: string) {
11-
return (...args: any[]) => this.emit(event, ...args);
10+
public resolveCall(event: string, returnValue?: any) {
11+
return (...args: any[]) => {
12+
this.emit(event, ...args);
13+
return returnValue;
14+
}
1215
}
1316
}

testdata/archive.tgz

128 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)