Skip to content

Commit 5fadb77

Browse files
author
Dave Syer
committed
Fix resource lifecycle issues in KubectlPortForward
Without this change only one instance of PortForward is ever created and so when its resources are closed they cannot be re-used. It is better to create a new instance for each local socket, and then second and subsequent client connections will succeed.
1 parent f196696 commit 5fadb77

File tree

1 file changed

+28
-28
lines changed

1 file changed

+28
-28
lines changed

extended/src/main/java/io/kubernetes/client/extended/kubectl/KubectlPortForward.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.net.ServerSocket;
2525
import java.net.Socket;
2626
import java.util.ArrayList;
27+
import java.util.Arrays;
2728
import java.util.List;
2829
import java.util.Optional;
2930
import java.util.function.Consumer;
@@ -78,47 +79,46 @@ public void shutdown() {
7879
private void executeInternal()
7980
throws ApiException, KubectlException, IOException, InterruptedException {
8081
PortForward pf = new PortForward(apiClient);
81-
PortForward.PortForwardResult result = pf.forward(namespace, name, targetPorts);
82-
if (result == null) {
83-
throw new KubectlException("PortForward failed!");
84-
}
8582
// TODO: Convert this to NIO to reduce the number of threads?
8683
List<Thread> threads = new ArrayList<>();
8784
for (int i = 0; i < localPorts.size(); i++) {
8885
int targetPort = targetPorts.get(i);
8986
threads.add(
90-
portForward(
91-
new ServerSocket(localPorts.get(i)),
92-
result.getInputStream(targetPort),
93-
result.getOutboundStream(targetPort)));
87+
portForward(pf,
88+
new ServerSocket(localPorts.get(i)), targetPort));
9489
}
9590
for (Thread t : threads) {
9691
t.join();
9792
}
9893
}
9994

100-
private Thread portForward(ServerSocket server, InputStream in, OutputStream out) {
101-
Thread t =
102-
new Thread(
103-
new Runnable() {
104-
@Override
105-
public void run() {
106-
while (running) {
107-
try {
108-
Socket sock = server.accept();
109-
Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError);
110-
Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError);
111-
112-
t1.join();
113-
t2.join();
114-
} catch (InterruptedException | IOException ex) {
115-
Optional.ofNullable(onUnhandledError)
116-
.orElse(Throwable::printStackTrace)
117-
.accept(ex);
118-
}
95+
private Thread portForward(PortForward pf, ServerSocket server, int targetPort) {
96+
Thread t = new Thread(
97+
new Runnable() {
98+
@Override
99+
public void run() {
100+
while (running) {
101+
try (Socket sock = server.accept()) {
102+
PortForward.PortForwardResult result = pf.forward(namespace, name, Arrays.asList(targetPort));
103+
if (result == null) {
104+
throw new KubectlException("PortForward failed!");
119105
}
106+
InputStream in = result.getInputStream(targetPort);
107+
OutputStream out = result.getOutboundStream(targetPort);
108+
Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError);
109+
Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError);
110+
111+
t1.join();
112+
in.close();
113+
t2.join();
114+
} catch (Exception ex) {
115+
Optional.ofNullable(onUnhandledError)
116+
.orElse(Throwable::printStackTrace)
117+
.accept(ex);
120118
}
121-
});
119+
}
120+
}
121+
});
122122
t.start();
123123
return t;
124124
}

0 commit comments

Comments
 (0)