Skip to content

Commit e5ec945

Browse files
authored
Merge pull request #240 from lizardruss/master
Support terminal resizes
2 parents 5e7c581 + f385aba commit e5ec945

11 files changed

+195
-22
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"@types/chai": "^4.1.6",
6868
"@types/mocha": "^5.2.5",
6969
"@types/mock-fs": "^3.6.30",
70+
"@types/stream-buffers": "^3.0.3",
7071
"chai": "^4.2.0",
7172
"jasmine": "^3.3.0",
7273
"mocha": "^5.2.0",

src/attach.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ import querystring = require('querystring');
33
import stream = require('stream');
44

55
import { KubeConfig } from './config';
6+
import { isResizable, ResizableStream, TerminalSizeQueue } from './terminal-size-queue';
67
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
78

89
export class Attach {
910
public 'handler': WebSocketInterface;
1011

12+
private terminalSizeQueue?: TerminalSizeQueue;
13+
1114
public constructor(config: KubeConfig, websocketInterface?: WebSocketInterface) {
1215
if (websocketInterface) {
1316
this.handler = websocketInterface;
@@ -43,9 +46,13 @@ export class Attach {
4346
},
4447
);
4548
if (stdin != null) {
46-
WebSocketHandler.handleStandardInput(conn, stdin);
49+
WebSocketHandler.handleStandardInput(conn, stdin, WebSocketHandler.StdinStream);
50+
}
51+
if (isResizable(stdout)) {
52+
this.terminalSizeQueue = new TerminalSizeQueue();
53+
WebSocketHandler.handleStandardInput(conn, this.terminalSizeQueue, WebSocketHandler.ResizeStream);
54+
this.terminalSizeQueue.handleResizes((stdout as any) as ResizableStream);
4755
}
48-
4956
return conn;
5057
}
5158
}

src/attach_test.ts

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ import WebSocket = require('isomorphic-ws');
33
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
44
import { anyFunction, anything, capture, instance, mock, verify, when } from 'ts-mockito';
55

6+
import { CallAwaiter, matchBuffer, ResizableWriteableStreamBuffer } from '../test';
67
import { Attach } from './attach';
78
import { KubeConfig } from './config';
9+
import { TerminalSize } from './terminal-size-queue';
810
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
911

1012
describe('Attach', () => {
@@ -46,9 +48,11 @@ describe('Attach', () => {
4648

4749
it('should correctly attach to streams', async () => {
4850
const kc = new KubeConfig();
49-
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
50-
const attach = new Attach(kc, instance(fakeWebSocket));
51-
const osStream = new WritableStreamBuffer();
51+
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
52+
const fakeWebSocket: WebSocket = mock(WebSocket);
53+
const callAwaiter: CallAwaiter = new CallAwaiter();
54+
const attach = new Attach(kc, instance(fakeWebSocketInterface));
55+
const osStream = new ResizableWriteableStreamBuffer();
5256
const errStream = new WritableStreamBuffer();
5357
const isStream = new ReadableStreamBuffer();
5458

@@ -59,11 +63,15 @@ describe('Attach', () => {
5963
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/attach`;
6064
const args = `container=${container}&stderr=true&stdin=true&stdout=true&tty=false`;
6165

62-
const fakeConn: WebSocket = mock(WebSocket);
63-
when(fakeWebSocket.connect(`${path}?${args}`, null, anyFunction())).thenResolve(fakeConn);
66+
const fakeConn: WebSocket = instance(fakeWebSocket);
67+
when(fakeWebSocketInterface.connect(`${path}?${args}`, null, anyFunction())).thenResolve(
68+
fakeConn,
69+
);
70+
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));
71+
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));
6472

6573
await attach.attach(namespace, pod, container, osStream, errStream, isStream, false);
66-
const [, , outputFn] = capture(fakeWebSocket.connect).last();
74+
const [, , outputFn] = capture(fakeWebSocketInterface.connect).last();
6775

6876
/* tslint:disable:no-unused-expression */
6977
expect(outputFn).to.not.be.null;
@@ -90,12 +98,34 @@ describe('Attach', () => {
9098
expect(buff[i]).to.equal(20);
9199
}
92100

101+
const initialTerminalSize: TerminalSize = { height: 0, width: 0 };
102+
await callAwaiter.awaitCall('send');
103+
verify(
104+
fakeWebSocket.send(
105+
matchBuffer(WebSocketHandler.ResizeStream, JSON.stringify(initialTerminalSize)),
106+
),
107+
).called();
108+
93109
const msg = 'This is test data';
110+
const inputPromise = callAwaiter.awaitCall('send');
94111
isStream.put(msg);
95-
verify(fakeConn.send(msg));
96-
112+
await inputPromise;
113+
verify(fakeWebSocket.send(matchBuffer(WebSocketHandler.StdinStream, msg))).called();
114+
115+
const terminalSize: TerminalSize = { height: 80, width: 120 };
116+
const resizePromise = callAwaiter.awaitCall('send');
117+
osStream.rows = terminalSize.height;
118+
osStream.columns = terminalSize.width;
119+
osStream.emit('resize');
120+
await resizePromise;
121+
verify(
122+
fakeWebSocket.send(matchBuffer(WebSocketHandler.ResizeStream, JSON.stringify(terminalSize))),
123+
).called();
124+
125+
const closePromise = callAwaiter.awaitCall('close');
97126
isStream.stop();
98-
verify(fakeConn.close());
127+
await closePromise;
128+
verify(fakeWebSocket.close()).called();
99129
});
100130
});
101131
});

src/exec.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import stream = require('stream');
44

55
import { V1Status } from './api';
66
import { KubeConfig } from './config';
7+
import { isResizable, ResizableStream, TerminalSizeQueue } from './terminal-size-queue';
78
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
89

910
export class Exec {
1011
public 'handler': WebSocketInterface;
1112

13+
private terminalSizeQueue?: TerminalSizeQueue;
14+
1215
public constructor(config: KubeConfig, wsInterface?: WebSocketInterface) {
1316
if (wsInterface) {
1417
this.handler = wsInterface;
@@ -66,7 +69,12 @@ export class Exec {
6669
},
6770
);
6871
if (stdin != null) {
69-
WebSocketHandler.handleStandardInput(conn, stdin);
72+
WebSocketHandler.handleStandardInput(conn, stdin, WebSocketHandler.StdinStream);
73+
}
74+
if (isResizable(stdout)) {
75+
this.terminalSizeQueue = new TerminalSizeQueue();
76+
WebSocketHandler.handleStandardInput(conn, this.terminalSizeQueue, WebSocketHandler.ResizeStream);
77+
this.terminalSizeQueue.handleResizes((stdout as any) as ResizableStream);
7078
}
7179
return conn;
7280
}

src/exec_test.ts

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { expect } from 'chai';
22
import WebSocket = require('isomorphic-ws');
33
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
4-
import { anyFunction, capture, instance, mock, verify, when } from 'ts-mockito';
4+
import { anyFunction, anything, capture, instance, mock, verify, when } from 'ts-mockito';
55

6+
import { CallAwaiter, matchBuffer, ResizableWriteableStreamBuffer } from '../test';
67
import { V1Status } from './api';
78
import { KubeConfig } from './config';
89
import { Exec } from './exec';
10+
import { TerminalSize } from './terminal-size-queue';
911
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
1012

1113
describe('Exec', () => {
@@ -55,9 +57,11 @@ describe('Exec', () => {
5557

5658
it('should correctly attach to streams', async () => {
5759
const kc = new KubeConfig();
58-
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
59-
const exec = new Exec(kc, instance(fakeWebSocket));
60-
const osStream = new WritableStreamBuffer();
60+
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
61+
const fakeWebSocket: WebSocket = mock(WebSocket);
62+
const callAwaiter: CallAwaiter = new CallAwaiter();
63+
const exec = new Exec(kc, instance(fakeWebSocketInterface));
64+
const osStream = new ResizableWriteableStreamBuffer();
6165
const errStream = new WritableStreamBuffer();
6266
const isStream = new ReadableStreamBuffer();
6367

@@ -71,8 +75,12 @@ describe('Exec', () => {
7175

7276
let statusOut = {} as V1Status;
7377

74-
const fakeConn: WebSocket = mock(WebSocket);
75-
when(fakeWebSocket.connect(`${path}?${args}`, null, anyFunction())).thenResolve(fakeConn);
78+
const fakeConn: WebSocket = instance(fakeWebSocket);
79+
when(fakeWebSocketInterface.connect(`${path}?${args}`, null, anyFunction())).thenResolve(
80+
fakeConn,
81+
);
82+
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));
83+
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));
7684

7785
await exec.exec(
7886
namespace,
@@ -88,7 +96,7 @@ describe('Exec', () => {
8896
},
8997
);
9098

91-
const [, , outputFn] = capture(fakeWebSocket.connect).last();
99+
const [, , outputFn] = capture(fakeWebSocketInterface.connect).last();
92100

93101
/* tslint:disable:no-unused-expression */
94102
expect(outputFn).to.not.be.null;
@@ -115,20 +123,41 @@ describe('Exec', () => {
115123
expect(buff[i]).to.equal(20);
116124
}
117125

126+
const initialTerminalSize: TerminalSize = { height: 0, width: 0 };
127+
await callAwaiter.awaitCall('send');
128+
verify(
129+
fakeWebSocket.send(
130+
matchBuffer(WebSocketHandler.ResizeStream, JSON.stringify(initialTerminalSize)),
131+
),
132+
).called();
133+
118134
const msg = 'This is test data';
135+
const inputPromise = callAwaiter.awaitCall('send');
119136
isStream.put(msg);
120-
verify(fakeConn.send(msg));
137+
await inputPromise;
138+
verify(fakeWebSocket.send(matchBuffer(WebSocketHandler.StdinStream, msg))).called();
139+
140+
const terminalSize: TerminalSize = { height: 80, width: 120 };
141+
const resizePromise = callAwaiter.awaitCall('send');
142+
osStream.rows = terminalSize.height;
143+
osStream.columns = terminalSize.width;
144+
osStream.emit('resize');
145+
await resizePromise;
146+
verify(
147+
fakeWebSocket.send(matchBuffer(WebSocketHandler.ResizeStream, JSON.stringify(terminalSize))),
148+
).called();
121149

122150
const statusIn = {
123151
code: 100,
124152
message: 'this is a test',
125153
} as V1Status;
126-
127154
outputFn(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(statusIn)));
128155
expect(statusOut).to.deep.equal(statusIn);
129156

157+
const closePromise = callAwaiter.awaitCall('close');
130158
isStream.stop();
131-
verify(fakeConn.close());
159+
await closePromise;
160+
verify(fakeWebSocket.close()).called();
132161
});
133162
});
134163
});

src/terminal-size-queue.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { Readable, ReadableOptions } from 'stream';
2+
3+
export interface ResizableStream {
4+
columns: number;
5+
rows: number;
6+
on(event: 'resize', cb: () => void);
7+
}
8+
9+
export interface TerminalSize {
10+
height: number;
11+
width: number;
12+
}
13+
14+
export class TerminalSizeQueue extends Readable {
15+
constructor(opts: ReadableOptions = {}) {
16+
super({
17+
...opts,
18+
// tslint:disable-next-line:no-empty
19+
read() {},
20+
});
21+
}
22+
23+
public handleResizes(writeStream: ResizableStream) {
24+
// Set initial size
25+
this.resize(getTerminalSize(writeStream));
26+
27+
// Handle future size updates
28+
writeStream.on('resize', () => this.resize(getTerminalSize(writeStream)));
29+
}
30+
31+
private resize(size: TerminalSize) {
32+
this.push(JSON.stringify(size));
33+
}
34+
}
35+
36+
export function isResizable(stream: any) {
37+
if (stream == null) {
38+
return false;
39+
}
40+
41+
const hasRows = 'rows' in stream;
42+
const hasColumns = 'columns' in stream;
43+
const hasOn = typeof stream.on === 'function';
44+
return hasRows && hasColumns && hasOn;
45+
}
46+
47+
function getTerminalSize(writeStream: ResizableStream): TerminalSize {
48+
return { height: writeStream.rows!, width: writeStream.columns! };
49+
}

src/web-socket-handler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export class WebSocketHandler implements WebSocketInterface {
1919
public static readonly StdoutStream = 1;
2020
public static readonly StderrStream = 2;
2121
public static readonly StatusStream = 3;
22+
public static readonly ResizeStream = 4;
2223

2324
public static handleStandardStreams(
2425
streamNum: number,

test/call-awaiter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { EventEmitter } from 'events';
2+
3+
export class CallAwaiter extends EventEmitter {
4+
public awaitCall(event: string) {
5+
return new Promise<any[]>((resolve) => {
6+
this.once(event, resolve);
7+
});
8+
}
9+
10+
public resolveCall(event: string) {
11+
return (...args: any[]) => this.emit(event, ...args);
12+
}
13+
}

test/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export * from './call-awaiter';
2+
export * from './match-buffer';
3+
export * from './resizable-writeable-stream-buffer';

test/match-buffer.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { Matcher } from 'ts-mockito/lib/matcher/type/Matcher';
2+
3+
export function matchBuffer(channel: number, contents: string): Matcher {
4+
return new StringBufferMatcher(channel, contents);
5+
}
6+
7+
class StringBufferMatcher extends Matcher {
8+
constructor(private channel: number, private contents: string) {
9+
super();
10+
}
11+
12+
public match(value: any): boolean {
13+
if (value instanceof Buffer) {
14+
const buffer = value as Buffer;
15+
const channel: number = buffer.readInt8(0);
16+
const contents: string = buffer.toString('utf8', 1);
17+
return this.channel === channel && this.contents === contents;
18+
}
19+
20+
return false;
21+
}
22+
23+
public toString(): string {
24+
return `buffer did not contain "${this.contents}"`;
25+
}
26+
}

0 commit comments

Comments
 (0)