Skip to content

Commit 5b25973

Browse files
authored
Merge pull request #204 from proAlexandr/issue-187
Fix #187 (forward.portForward - Uncaught Error: not opened)
2 parents 2338259 + a2def0a commit 5b25973

File tree

2 files changed

+84
-24
lines changed

2 files changed

+84
-24
lines changed

src/portforward.ts

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ export class PortForward {
2828
output: stream.Writable,
2929
err: stream.Writable | null,
3030
input: stream.Readable,
31-
): Promise<WebSocket> {
31+
retryCount: number = 0,
32+
): Promise<WebSocket | (() => WebSocket | null)> {
3233
if (targetPorts.length === 0) {
3334
throw new Error('You must provide at least one port to forward to.');
3435
}
@@ -45,29 +46,37 @@ export class PortForward {
4546
needsToReadPortNumber[index * 2 + 1] = true;
4647
});
4748
const path = `/api/v1/namespaces/${namespace}/pods/${podName}/portforward?${queryStr}`;
48-
const conn = await this.handler.connect(
49-
path,
50-
null,
51-
(streamNum: number, buff: Buffer | string): boolean => {
52-
if (streamNum >= targetPorts.length * 2) {
53-
return !this.disconnectOnErr;
54-
}
55-
// First two bytes of each stream are the port number
56-
if (needsToReadPortNumber[streamNum]) {
57-
buff = buff.slice(2);
58-
needsToReadPortNumber[streamNum] = false;
59-
}
60-
if (streamNum % 2 === 1) {
61-
if (err) {
62-
err.write(buff);
49+
const createWebSocket = (): Promise<WebSocket> => {
50+
return this.handler.connect(
51+
path,
52+
null,
53+
(streamNum: number, buff: Buffer | string): boolean => {
54+
if (streamNum >= targetPorts.length * 2) {
55+
return !this.disconnectOnErr;
6356
}
64-
} else {
65-
output.write(buff);
66-
}
67-
return true;
68-
},
69-
);
70-
WebSocketHandler.handleStandardInput(conn, input, 0);
71-
return conn;
57+
// First two bytes of each stream are the port number
58+
if (needsToReadPortNumber[streamNum]) {
59+
buff = buff.slice(2);
60+
needsToReadPortNumber[streamNum] = false;
61+
}
62+
if (streamNum % 2 === 1) {
63+
if (err) {
64+
err.write(buff);
65+
}
66+
} else {
67+
output.write(buff);
68+
}
69+
return true;
70+
},
71+
);
72+
};
73+
74+
if (retryCount < 1) {
75+
const ws = await createWebSocket();
76+
WebSocketHandler.handleStandardInput(ws, input, 0);
77+
return ws;
78+
}
79+
80+
return WebSocketHandler.restartableHandleStandardInput(createWebSocket, input, 0, retryCount);
7281
}
7382
}

src/web-socket-handler.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,57 @@ export class WebSocketHandler implements WebSocketInterface {
7171
return true;
7272
}
7373

74+
public static restartableHandleStandardInput(
75+
createWS: () => Promise<WebSocket>,
76+
stdin: stream.Readable | any,
77+
streamNum: number = 0,
78+
retryCount: number = 3,
79+
): () => WebSocket | null {
80+
if (retryCount < 0) {
81+
throw new Error("retryCount can't be lower than 0.");
82+
}
83+
84+
let queue: Promise<void> = Promise.resolve();
85+
let ws: WebSocket | null;
86+
87+
async function processData(data): Promise<void> {
88+
const buff = Buffer.alloc(data.length + 1);
89+
90+
buff.writeInt8(streamNum, 0);
91+
if (data instanceof Buffer) {
92+
data.copy(buff, 1);
93+
} else {
94+
buff.write(data, 1);
95+
}
96+
97+
let i = 0;
98+
for (; i < retryCount; ++i) {
99+
if (ws !== null && ws.readyState === WebSocket.OPEN) {
100+
ws.send(buff);
101+
break;
102+
} else {
103+
ws = await createWS();
104+
}
105+
}
106+
107+
if (i >= retryCount) {
108+
throw new Error("can't send data to ws");
109+
}
110+
}
111+
112+
stdin.on('data', (data) => {
113+
queue = queue.then(() => processData(data));
114+
});
115+
116+
stdin.on('end', () => {
117+
if (ws) {
118+
ws.close();
119+
}
120+
});
121+
122+
return () => ws;
123+
}
124+
74125
// factory is really just for test injection
75126
public constructor(
76127
readonly config: KubeConfig,

0 commit comments

Comments
 (0)