Skip to content

Commit 748eb62

Browse files
committed
Support terminal resizes
1 parent 5e7c581 commit 748eb62

File tree

12 files changed

+214
-21
lines changed

12 files changed

+214
-21
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import * as k8s from '@kubernetes/client-node';
2+
3+
const kc = new k8s.KubeConfig();
4+
kc.loadFromDefault();
5+
6+
const terminalSizeQueue = new k8s.TerminalSizeQueue();
7+
terminalSizeQueue.resize(getSize(process.stdout));
8+
process.stdout.on('resize', () => {
9+
terminalSizeQueue.resize(getSize(process.stdout));
10+
});
11+
12+
const attach = new k8s.Attach(kc);
13+
attach.attach(
14+
'default',
15+
'nginx-4217019353-9gl4s',
16+
'nginx',
17+
process.stdout,
18+
process.stderr,
19+
null /* stdin */,
20+
false /* tty */,
21+
terminalSizeQueue,
22+
);
23+
24+
function getSize(writeStream: NodeJS.WriteStream): k8s.TerminalSize {
25+
return {
26+
height: writeStream.rows!,
27+
width: writeStream.columns!,
28+
};
29+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import * as k8s from '@kubernetes/client-node';
2+
import * as stream from 'stream';
3+
4+
const command = process.argv[2];
5+
6+
const kc = new k8s.KubeConfig();
7+
kc.loadFromDefault();
8+
9+
const terminalSizeQueue = new k8s.TerminalSizeQueue();
10+
terminalSizeQueue.resize(getSize(process.stdout));
11+
process.stdout.on('resize', () => {
12+
terminalSizeQueue.resize(getSize(process.stdout));
13+
});
14+
15+
const exec = new k8s.Exec(kc);
16+
exec.exec(
17+
'tutor',
18+
'tutor-environment-operator-deployment-c888b5cd8-qp95f',
19+
'default',
20+
command,
21+
process.stdout as stream.Writable,
22+
process.stderr as stream.Writable,
23+
process.stdin as stream.Readable,
24+
true /* tty */,
25+
(status: k8s.V1Status) => {
26+
// tslint:disable-next-line:no-console
27+
console.log('Exited with status:');
28+
// tslint:disable-next-line:no-console
29+
console.log(JSON.stringify(status, null, 2));
30+
},
31+
terminalSizeQueue,
32+
);
33+
34+
function getSize(writeStream: NodeJS.WriteStream): k8s.TerminalSize {
35+
return {
36+
height: writeStream.rows!,
37+
width: writeStream.columns!,
38+
};
39+
}

src/attach.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import querystring = require('querystring');
33
import stream = require('stream');
44

55
import { KubeConfig } from './config';
6+
import { TerminalSizeQueue } from './terminal-size-queue';
67
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
78

89
export class Attach {
@@ -24,6 +25,7 @@ export class Attach {
2425
stderr: stream.Writable | any,
2526
stdin: stream.Readable | any,
2627
tty: boolean,
28+
terminalSizeQueue?: TerminalSizeQueue,
2729
): Promise<WebSocket> {
2830
const query = {
2931
container: containerName,
@@ -43,9 +45,12 @@ export class Attach {
4345
},
4446
);
4547
if (stdin != null) {
46-
WebSocketHandler.handleStandardInput(conn, stdin);
48+
WebSocketHandler.handleStandardInput(conn, stdin, WebSocketHandler.StdinStream);
4749
}
4850

51+
if (terminalSizeQueue != null) {
52+
WebSocketHandler.handleStandardInput(conn, terminalSizeQueue, WebSocketHandler.ResizeStream);
53+
}
4954
return conn;
5055
}
5156
}

src/attach_test.ts

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import { expect } from 'chai';
2+
import { EventEmitter } from 'events';
23
import WebSocket = require('isomorphic-ws');
34
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
45
import { anyFunction, anything, capture, instance, mock, verify, when } from 'ts-mockito';
6+
import { CallAwaiter, matchBuffer } from '../test';
57

68
import { Attach } from './attach';
79
import { KubeConfig } from './config';
10+
import { TerminalSize, TerminalSizeQueue } from './terminal-size-queue';
811
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
912

1013
describe('Attach', () => {
@@ -46,11 +49,14 @@ describe('Attach', () => {
4649

4750
it('should correctly attach to streams', async () => {
4851
const kc = new KubeConfig();
49-
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
50-
const attach = new Attach(kc, instance(fakeWebSocket));
52+
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
53+
const fakeWebSocket: WebSocket = mock(WebSocket);
54+
const callAwaiter: CallAwaiter = new CallAwaiter();
55+
const attach = new Attach(kc, instance(fakeWebSocketInterface));
5156
const osStream = new WritableStreamBuffer();
5257
const errStream = new WritableStreamBuffer();
5358
const isStream = new ReadableStreamBuffer();
59+
const terminalSizeQueue = new TerminalSizeQueue();
5460

5561
const namespace = 'somenamespace';
5662
const pod = 'somepod';
@@ -59,11 +65,24 @@ describe('Attach', () => {
5965
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/attach`;
6066
const args = `container=${container}&stderr=true&stdin=true&stdout=true&tty=false`;
6167

62-
const fakeConn: WebSocket = mock(WebSocket);
63-
when(fakeWebSocket.connect(`${path}?${args}`, null, anyFunction())).thenResolve(fakeConn);
64-
65-
await attach.attach(namespace, pod, container, osStream, errStream, isStream, false);
66-
const [, , outputFn] = capture(fakeWebSocket.connect).last();
68+
const fakeConn: WebSocket = instance(fakeWebSocket);
69+
when(fakeWebSocketInterface.connect(`${path}?${args}`, null, anyFunction())).thenResolve(
70+
fakeConn,
71+
);
72+
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));
73+
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));
74+
75+
await attach.attach(
76+
namespace,
77+
pod,
78+
container,
79+
osStream,
80+
errStream,
81+
isStream,
82+
false,
83+
terminalSizeQueue,
84+
);
85+
const [, , outputFn] = capture(fakeWebSocketInterface.connect).last();
6786

6887
/* tslint:disable:no-unused-expression */
6988
expect(outputFn).to.not.be.null;
@@ -91,11 +110,23 @@ describe('Attach', () => {
91110
}
92111

93112
const msg = 'This is test data';
113+
const inputPromise = callAwaiter.awaitCall('send');
94114
isStream.put(msg);
95-
verify(fakeConn.send(msg));
96-
115+
await inputPromise;
116+
verify(fakeWebSocket.send(matchBuffer(WebSocketHandler.StdinStream, msg))).called();
117+
118+
const terminalSize: TerminalSize = { height: 80, width: 120 };
119+
const resizePromise = callAwaiter.awaitCall('send');
120+
terminalSizeQueue.resize(terminalSize);
121+
await resizePromise;
122+
verify(
123+
fakeWebSocket.send(matchBuffer(WebSocketHandler.ResizeStream, JSON.stringify(terminalSize))),
124+
).called();
125+
126+
const closePromise = callAwaiter.awaitCall('close');
97127
isStream.stop();
98-
verify(fakeConn.close());
128+
await closePromise;
129+
verify(fakeWebSocket.close()).called();
99130
});
100131
});
101132
});

src/exec.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import stream = require('stream');
44

55
import { V1Status } from './api';
66
import { KubeConfig } from './config';
7+
import { TerminalSizeQueue } from './terminal-size-queue';
78
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
89

910
export class Exec {
@@ -40,6 +41,7 @@ export class Exec {
4041
stdin: stream.Readable | null,
4142
tty: boolean,
4243
statusCallback?: (status: V1Status) => void,
44+
terminalSizeQueue?: TerminalSizeQueue,
4345
): Promise<WebSocket> {
4446
const query = {
4547
stdout: stdout != null,
@@ -66,7 +68,10 @@ export class Exec {
6668
},
6769
);
6870
if (stdin != null) {
69-
WebSocketHandler.handleStandardInput(conn, stdin);
71+
WebSocketHandler.handleStandardInput(conn, stdin, WebSocketHandler.StdinStream);
72+
}
73+
if (terminalSizeQueue != null) {
74+
WebSocketHandler.handleStandardInput(conn, terminalSizeQueue, WebSocketHandler.ResizeStream);
7075
}
7176
return conn;
7277
}

src/exec_test.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { expect } from 'chai';
22
import WebSocket = require('isomorphic-ws');
33
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
4-
import { anyFunction, capture, instance, mock, verify, when } from 'ts-mockito';
4+
import { anyFunction, anything, capture, instance, mock, verify, when } from 'ts-mockito';
55

6+
import { CallAwaiter, matchBuffer } from '../test';
67
import { V1Status } from './api';
78
import { KubeConfig } from './config';
89
import { Exec } from './exec';
10+
import { TerminalSize, TerminalSizeQueue } from './terminal-size-queue';
911
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
1012

1113
describe('Exec', () => {
@@ -55,11 +57,14 @@ describe('Exec', () => {
5557

5658
it('should correctly attach to streams', async () => {
5759
const kc = new KubeConfig();
58-
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
59-
const exec = new Exec(kc, instance(fakeWebSocket));
60+
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
61+
const fakeWebSocket: WebSocket = mock(WebSocket);
62+
const callAwaiter: CallAwaiter = new CallAwaiter();
63+
const exec = new Exec(kc, instance(fakeWebSocketInterface));
6064
const osStream = new WritableStreamBuffer();
6165
const errStream = new WritableStreamBuffer();
6266
const isStream = new ReadableStreamBuffer();
67+
const terminalSizeQueue = new TerminalSizeQueue();
6368

6469
const namespace = 'somenamespace';
6570
const pod = 'somepod';
@@ -71,8 +76,12 @@ describe('Exec', () => {
7176

7277
let statusOut = {} as V1Status;
7378

74-
const fakeConn: WebSocket = mock(WebSocket);
75-
when(fakeWebSocket.connect(`${path}?${args}`, null, anyFunction())).thenResolve(fakeConn);
79+
const fakeConn: WebSocket = instance(fakeWebSocket);
80+
when(fakeWebSocketInterface.connect(`${path}?${args}`, null, anyFunction())).thenResolve(
81+
fakeConn,
82+
);
83+
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));
84+
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));
7685

7786
await exec.exec(
7887
namespace,
@@ -86,9 +95,10 @@ describe('Exec', () => {
8695
(status: V1Status) => {
8796
statusOut = status;
8897
},
98+
terminalSizeQueue,
8999
);
90100

91-
const [, , outputFn] = capture(fakeWebSocket.connect).last();
101+
const [, , outputFn] = capture(fakeWebSocketInterface.connect).last();
92102

93103
/* tslint:disable:no-unused-expression */
94104
expect(outputFn).to.not.be.null;
@@ -116,19 +126,30 @@ describe('Exec', () => {
116126
}
117127

118128
const msg = 'This is test data';
129+
const inputPromise = callAwaiter.awaitCall('send');
119130
isStream.put(msg);
120-
verify(fakeConn.send(msg));
131+
await inputPromise;
132+
verify(fakeWebSocket.send(matchBuffer(WebSocketHandler.StdinStream, msg))).called();
133+
134+
const terminalSize: TerminalSize = { height: 80, width: 120 };
135+
const resizePromise = callAwaiter.awaitCall('send');
136+
terminalSizeQueue.resize(terminalSize);
137+
await resizePromise;
138+
verify(
139+
fakeWebSocket.send(matchBuffer(WebSocketHandler.ResizeStream, JSON.stringify(terminalSize))),
140+
).called();
121141

122142
const statusIn = {
123143
code: 100,
124144
message: 'this is a test',
125145
} as V1Status;
126-
127146
outputFn(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(statusIn)));
128147
expect(statusOut).to.deep.equal(statusIn);
129148

149+
const closePromise = callAwaiter.awaitCall('close');
130150
isStream.stop();
131-
verify(fakeConn.close());
151+
await closePromise;
152+
verify(fakeWebSocket.close()).called();
132153
});
133154
});
134155
});

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ export * from './portforward';
88
export * from './types';
99
export * from './yaml';
1010
export * from './log';
11+
export * from './terminal-size-queue';

src/terminal-size-queue.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { Readable, ReadableOptions } from 'stream';
2+
3+
export interface TerminalSize {
4+
height: number;
5+
width: number;
6+
}
7+
8+
export class TerminalSizeQueue extends Readable {
9+
constructor(opts: ReadableOptions = {}) {
10+
super({
11+
...opts,
12+
// tslint:disable-next-line:no-empty
13+
read() {},
14+
});
15+
}
16+
17+
public resize(size: TerminalSize) {
18+
this.push(JSON.stringify(size));
19+
}
20+
}

src/web-socket-handler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export class WebSocketHandler implements WebSocketInterface {
1919
public static readonly StdoutStream = 1;
2020
public static readonly StderrStream = 2;
2121
public static readonly StatusStream = 3;
22+
public static readonly ResizeStream = 4;
2223

2324
public static handleStandardStreams(
2425
streamNum: number,

test/call-awaiter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { EventEmitter } from 'events';
2+
3+
export class CallAwaiter extends EventEmitter {
4+
public awaitCall(event: string) {
5+
return new Promise<any[]>((resolve) => {
6+
this.once(event, resolve);
7+
});
8+
}
9+
10+
public resolveCall(event: string) {
11+
return (...args: any[]) => this.emit(event, ...args);
12+
}
13+
}

0 commit comments

Comments
 (0)