Skip to content

Commit 01bdd64

Browse files
committed
Add Port forwarding.
1 parent 5ca2d39 commit 01bdd64

File tree

10 files changed

+268
-13
lines changed

10 files changed

+268
-13
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import k8s = require('@kubernetes/client-node');
2+
import * as net from 'net';
3+
4+
const command = process.argv[2];
5+
6+
const kc = new k8s.KubeConfig();
7+
kc.loadFromDefault();
8+
9+
const forward = new k8s.PortForward(kc);
10+
11+
// This simple server just forwards traffic from itself to a service running in kubernetes
12+
// -> localhost:8080 -> port-forward-tunnel -> kubernetes-pod
13+
// This is basically equivalent to 'kubectl port-forward ...' but in TypeScript.
14+
const server = net.createServer(function(socket) {
15+
forward.portForward('default', 'simple-node-simple-node-d49cc9d69-8ls5q', [8000], socket, null, socket);
16+
});
17+
18+
server.listen(8080, '127.0.0.1');
19+

node-client/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
],
2727
"exclude": [
2828
"src/api.ts",
29-
"src/config_test.ts"
29+
"src/*_test.ts"
3030
],
3131
"extension": [
3232
".ts"
@@ -71,6 +71,8 @@
7171
"mock-fs": "^4.6.0",
7272
"nyc": "^12.0.2",
7373
"source-map-support": "^0.5.9",
74+
"stream-buffers": "^3.0.2",
75+
"ts-mockito": "^2.3.1",
7476
"ts-node": "^3.1.0",
7577
"tslint": "^5.10.0",
7678
"typescript": "^2.6.2"

node-client/src/attach.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ export class Attach {
2323
};
2424
const queryStr = querystring.stringify(query);
2525
const path = `/api/v1/namespaces/${namespace}/pods/${podName}/attach?${queryStr}`;
26-
const promise = this.handler.connect(path, () => { return; }, (streamNum: number, buff: Buffer) => {
26+
const promise = this.handler.connect(path, null, (streamNum: number, buff: Buffer): boolean => {
2727
WebSocketHandler.handleStandardStreams(streamNum, buff, stdout, stderr);
28+
return true;
2829
});
2930
const result = new Promise<void>((resolvePromise, reject) => {
3031
promise.then(() => resolvePromise(), (err) => reject(err));

node-client/src/exec.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ export class Exec {
2626
};
2727
const queryStr = querystring.stringify(query);
2828
const path = `/api/v1/namespaces/${namespace}/pods/${podName}/exec?${queryStr}`;
29-
const conn = await this.handler.connect(path, (text) => { return; }, (streamNum: number, buff: Buffer) => {
29+
const conn = await this.handler.connect(path, null, (streamNum: number, buff: Buffer): boolean => {
3030
WebSocketHandler.handleStandardStreams(streamNum, buff, stdout, stderr);
31+
return true;
3132
});
3233
if (stdin != null) {
3334
WebSocketHandler.handleStandardInput(conn, stdin);

node-client/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from './api';
33
export * from './attach';
44
export * from './watch';
55
export * from './exec';
6+
export * from './portforward';

node-client/src/portforward.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import querystring = require('querystring');
2+
import stream = require('stream');
3+
import ws = require('websocket');
4+
5+
import { KubeConfig } from './config';
6+
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
7+
8+
export class PortForward {
9+
private readonly handler: WebSocketInterface;
10+
private readonly disconnectOnErr: boolean;
11+
12+
// handler is a parameter really only for injecting for testing.
13+
constructor(config: KubeConfig, disconnectOnErr?: boolean, handler?: WebSocketInterface) {
14+
if (!handler) {
15+
this.handler = new WebSocketHandler(config);
16+
} else {
17+
this.handler = handler;
18+
}
19+
this.disconnectOnErr = true;
20+
}
21+
22+
// TODO: support multiple ports for real...
23+
public async portForward(namespace: string, podName: string, targetPorts: number[],
24+
output: stream.Writable, err: stream.Writable,
25+
input: stream.Readable): Promise<ws.connection> {
26+
if (targetPorts.length === 0) {
27+
throw new Error('You must provide at least one port to forward to.');
28+
}
29+
if (targetPorts.length > 1) {
30+
throw(new Error('ERROR: Only one port is currently supported for port-forward'));
31+
}
32+
const query = {
33+
ports: targetPorts[0],
34+
};
35+
const queryStr = querystring.stringify(query);
36+
const needsToReadPortNumber: boolean[] = [];
37+
targetPorts.forEach((value: number, index: number) => {
38+
needsToReadPortNumber[index * 2] = true;
39+
needsToReadPortNumber[index * 2 + 1] = true;
40+
});
41+
const path = `/api/v1/namespaces/${namespace}/pods/${podName}/portforward?${queryStr}`;
42+
const conn = await this.handler.connect(path, null, (streamNum: number, buff: Buffer | string): boolean => {
43+
if (streamNum >= targetPorts.length * 2) {
44+
if (this.disconnectOnErr) {
45+
return false;
46+
}
47+
}
48+
// First two bytes of each stream are the port number
49+
if (needsToReadPortNumber[streamNum]) {
50+
buff = buff.slice(2);
51+
needsToReadPortNumber[streamNum] = false;
52+
}
53+
if (streamNum % 2 === 1) {
54+
if (err) {
55+
err.write(buff);
56+
}
57+
} else {
58+
output.write(buff);
59+
}
60+
return true;
61+
});
62+
WebSocketHandler.handleStandardInput(conn, input, 0);
63+
return conn;
64+
}
65+
}

node-client/src/portforward_test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { expect } from 'chai';
2+
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
3+
import { anyFunction, capture, instance, mock, reset, verify } from 'ts-mockito';
4+
5+
import { KubeConfig } from './config';
6+
import { PortForward } from './portforward';
7+
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
8+
9+
describe('PortForward', () => {
10+
describe('basic', () => {
11+
it('should correctly port-forward to a url', async () => {
12+
const kc = new KubeConfig();
13+
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
14+
const portForward = new PortForward(kc, true, instance(fakeWebSocket));
15+
const osStream = new WritableStreamBuffer();
16+
const errStream = new WritableStreamBuffer();
17+
const isStream = new ReadableStreamBuffer();
18+
19+
const namespace = 'somenamespace';
20+
const pod = 'somepod';
21+
const port = 8080;
22+
23+
await portForward.portForward(
24+
namespace, pod, [port], osStream, errStream, isStream);
25+
26+
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/portforward?ports=${port}`;
27+
verify(fakeWebSocket.connect(path, null, anyFunction())).called();
28+
});
29+
30+
it('should correctly port-forward streams', async () => {
31+
const kc = new KubeConfig();
32+
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
33+
const portForward = new PortForward(kc, true, instance(fakeWebSocket));
34+
const osStream = new WritableStreamBuffer();
35+
const errStream = new WritableStreamBuffer();
36+
const isStream = new ReadableStreamBuffer();
37+
38+
await portForward.portForward(
39+
'ns', 'p', [8000], osStream, errStream, isStream);
40+
41+
const [ , , outputFn] = capture(fakeWebSocket.connect).last();
42+
43+
/* tslint:disable:no-unused-expression */
44+
expect(outputFn).to.not.be.null;
45+
// this is redundant but needed for the compiler, sigh...
46+
if (outputFn) {
47+
const buffer = Buffer.alloc(1024, 10);
48+
49+
outputFn(0, buffer);
50+
// first time, drop two bytes for the port number.
51+
expect(osStream.size()).to.equal(1022);
52+
53+
outputFn(0, buffer);
54+
expect(osStream.size()).to.equal(2046);
55+
56+
// error stream, drop two bytes for the port number.
57+
outputFn(1, buffer);
58+
expect(errStream.size()).to.equal(1022);
59+
60+
outputFn(1, buffer);
61+
expect(errStream.size()).to.equal(2046);
62+
63+
// unknown stream, shouldn't change anything.
64+
outputFn(2, buffer);
65+
expect(osStream.size()).to.equal(2046);
66+
expect(errStream.size()).to.equal(2046);
67+
}
68+
});
69+
});
70+
});

node-client/src/proto-client.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import http = require('http');
2+
import url = require('url');
3+
4+
import { KubeConfig } from './config';
5+
6+
export class ProtoClient {
7+
public readonly 'config': KubeConfig;
8+
9+
public async get(msgType: any, requestPath: string): Promise<any> {
10+
const server = this.config.getCurrentCluster().server;
11+
const u = new url.URL(server);
12+
const options = {
13+
path: requestPath,
14+
hostname: u.hostname,
15+
protocol: u.protocol,
16+
};
17+
this.config.applytoHTTPSOptions(options);
18+
const req = http.request(options);
19+
20+
const result = new Promise<any>((resolve, reject) => {
21+
let data = '';
22+
req.on('data', (chunk) => {
23+
data = data + chunk;
24+
});
25+
req.on('end', () => {
26+
const obj = msgType.deserializeBinary(data);
27+
resolve(obj);
28+
});
29+
req.on('error', (err) => {
30+
reject(err);
31+
});
32+
});
33+
req.end();
34+
return result;
35+
}
36+
}

node-client/src/web-socket-handler.ts

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import https = require('https');
22
import stream = require('stream');
3-
43
import ws = require('websocket');
4+
55
import { V1Status } from './api';
66
import { KubeConfig } from './config';
77

@@ -12,13 +12,20 @@ const protocols = [
1212
'channel.k8s.io',
1313
];
1414

15-
export class WebSocketHandler {
15+
export interface WebSocketInterface {
16+
connect(path: string,
17+
textHandler: ((text: string) => boolean) | null,
18+
binaryHandler: ((stream: number, buff: Buffer) => boolean) | null): Promise<ws.connection>;
19+
}
20+
21+
export class WebSocketHandler implements WebSocketInterface {
1622
public static readonly StdinStream = 0;
1723
public static readonly StdoutStream = 1;
1824
public static readonly StderrStream = 2;
1925
public static readonly StatusStream = 3;
2026

21-
public static handleStandardStreams(streamNum: number, buff: Buffer, stdout: any, stderr: any): V1Status | null {
27+
public static handleStandardStreams(streamNum: number, buff: Buffer,
28+
stdout: stream.Writable, stderr: stream.Writable): V1Status | null {
2229
if (buff.length < 1) {
2330
return null;
2431
}
@@ -41,10 +48,11 @@ export class WebSocketHandler {
4148
return null;
4249
}
4350

44-
public static handleStandardInput(conn: ws.connection, stdin: stream.Readable | any) {
51+
public static handleStandardInput(conn: ws.connection,
52+
stdin: stream.Readable | any, streamNum: number = 0): boolean {
4553
stdin.on('data', (data) => {
4654
const buff = Buffer.alloc(data.length + 1);
47-
buff.writeInt8(0, 0);
55+
buff.writeInt8(streamNum, 0);
4856
if (data instanceof Buffer) {
4957
data.copy(buff, 1);
5058
} else {
@@ -56,6 +64,8 @@ export class WebSocketHandler {
5664
stdin.on('end', () => {
5765
conn.close();
5866
});
67+
// Keep the stream open
68+
return true;
5969
}
6070

6171
public 'config': KubeConfig;
@@ -64,9 +74,17 @@ export class WebSocketHandler {
6474
this.config = config;
6575
}
6676

77+
/**
78+
* Connect to a web socket endpoint.
79+
* @param path The HTTP Path to connect to on the server.
80+
* @param textHandler Callback for text over the web socket.
81+
* Returns true if the connection should be kept alive, false to disconnect.
82+
* @param binaryHandler Callback for binary data over the web socket.
83+
* Returns true if the connection should be kept alive, false to disconnect.
84+
*/
6785
public connect(path: string,
68-
textHandler: (text: string) => void,
69-
binaryHandler: (stream: number, buff: Buffer) => void): Promise<ws.connection> {
86+
textHandler: ((text: string) => boolean) | null,
87+
binaryHandler: ((stream: number, buff: Buffer) => boolean) | null): Promise<ws.connection> {
7088

7189
const server = this.config.getCurrentCluster().server;
7290
const ssl = server.startsWith('https://');
@@ -78,19 +96,23 @@ export class WebSocketHandler {
7896
// TODO: This doesn't set insecureSSL if skipTLSVerify is set...
7997
this.config.applytoHTTPSOptions(opts);
8098

81-
const client = new ws.client({ tlsOptions: opts } );
99+
const client = new ws.client({ tlsOptions: opts });
82100

83101
return new Promise((resolve, reject) => {
84102
client.on('connect', (connection) => {
85103
connection.on('message', (message: ws.IMessage) => {
86104
if (message.type === 'utf8' && message.utf8Data) {
87105
if (textHandler) {
88-
textHandler(message.utf8Data);
106+
if (!textHandler(message.utf8Data)) {
107+
connection.close();
108+
}
89109
}
90110
} else if (message.type === 'binary' && message.binaryData) {
91111
if (binaryHandler) {
92112
const streamNum = message.binaryData.readInt8(0);
93-
binaryHandler(streamNum, message.binaryData.slice(1));
113+
if (!binaryHandler(streamNum, message.binaryData.slice(1))) {
114+
connection.close();
115+
}
94116
}
95117
}
96118
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { expect } from 'chai';
2+
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
3+
import { anyFunction, capture, instance, mock, reset, verify } from 'ts-mockito';
4+
5+
import { KubeConfig } from './config';
6+
import { PortForward } from './portforward';
7+
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
8+
9+
describe('WebSocket', () => {
10+
it('should handle output streams', () => {
11+
const osStream = new WritableStreamBuffer();
12+
const errStream = new WritableStreamBuffer();
13+
14+
const fill1 = 1;
15+
const fill2 = 2;
16+
17+
const buff1 = Buffer.alloc(1024, fill1);
18+
const buff2 = Buffer.alloc(512, fill2);
19+
20+
WebSocketHandler.handleStandardStreams(WebSocketHandler.StdoutStream, buff1, osStream, errStream);
21+
22+
expect(osStream.size()).to.equal(1024);
23+
expect(errStream.size()).to.equal(0);
24+
25+
WebSocketHandler.handleStandardStreams(WebSocketHandler.StderrStream, buff2, osStream, errStream);
26+
27+
expect(osStream.size()).to.equal(1024);
28+
expect(errStream.size()).to.equal(512);
29+
30+
for (let i = 0; i < 1024; i++) {
31+
expect(osStream.getContents()[0]).to.equal(fill1);
32+
}
33+
34+
for (let i = 0; i < 512; i++) {
35+
expect(errStream.getContents()[0]).to.equal(fill2);
36+
}
37+
});
38+
});

0 commit comments

Comments
 (0)