Skip to content

Commit dc681e8

Browse files
authored
Merge pull request #1144 from brendandburns/portforward
Add port forward to kubectl
2 parents acc62f3 + 3aac88e commit dc681e8

File tree

8 files changed

+175
-14
lines changed

8 files changed

+175
-14
lines changed

examples/src/main/java/io/kubernetes/client/examples/KubectlExample.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
import static io.kubernetes.client.extended.kubectl.Kubectl.exec;
1616
import static io.kubernetes.client.extended.kubectl.Kubectl.label;
1717
import static io.kubernetes.client.extended.kubectl.Kubectl.log;
18+
import static io.kubernetes.client.extended.kubectl.Kubectl.portforward;
1819
import static io.kubernetes.client.extended.kubectl.Kubectl.scale;
1920
import static io.kubernetes.client.extended.kubectl.Kubectl.version;
2021

2122
import com.google.common.io.ByteStreams;
2223
import io.kubernetes.client.common.KubernetesObject;
2324
import io.kubernetes.client.extended.kubectl.KubectlExec;
25+
import io.kubernetes.client.extended.kubectl.KubectlPortForward;
2426
import io.kubernetes.client.extended.kubectl.exception.KubectlException;
2527
import io.kubernetes.client.openapi.ApiClient;
2628
import io.kubernetes.client.openapi.models.V1Deployment;
@@ -84,10 +86,21 @@ public static void main(String[] args)
8486
String name = null;
8587

8688
switch (verb) {
89+
case "portforward":
90+
name = args[1];
91+
KubectlPortForward forward = portforward(client).name(name).namespace(ns);
92+
for (int i = 2; i < args.length; i++) {
93+
String port = args[i];
94+
String[] ports = port.split(":");
95+
System.out.println("Forwarding " + ns + "/" + name + " " + ports[0] + "->" + ports[1]);
96+
forward.ports(Integer.parseInt(ports[0]), Integer.parseInt(ports[1]));
97+
}
98+
forward.execute();
99+
System.exit(0);
87100
case "log":
88101
name = args[1];
89102
ByteStreams.copy(
90-
log().name(name).namespace(ns).container(cli.getOptionValue("c", "")).execute(),
103+
log(client).name(name).namespace(ns).container(cli.getOptionValue("c", "")).execute(),
91104
System.out);
92105
System.exit(0);
93106
case "scale":

examples/src/main/java/io/kubernetes/client/examples/PortForwardExample.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ public static void main(String[] args) throws IOException, ApiException, Interru
4141

4242
PortForward forward = new PortForward();
4343
List<Integer> ports = new ArrayList<>();
44-
ports.add(8080);
45-
ports.add(80);
44+
int localPort = 8080;
45+
int targetPort = 8080;
46+
ports.add(targetPort);
4647
final PortForward.PortForwardResult result =
47-
forward.forward("default", "nginx-d5dc44cf7-x7475", ports);
48-
49-
ServerSocket ss = new ServerSocket(8080);
48+
forward.forward("default", "camera-viz-7949dbf7c6-lpxkd", ports);
49+
System.out.println("Forwarding!");
50+
ServerSocket ss = new ServerSocket(localPort);
5051

5152
final Socket s = ss.accept();
5253
System.out.println("Connected!");
@@ -55,7 +56,7 @@ public static void main(String[] args) throws IOException, ApiException, Interru
5556
new Runnable() {
5657
public void run() {
5758
try {
58-
ByteStreams.copy(result.getInputStream(80), s.getOutputStream());
59+
ByteStreams.copy(result.getInputStream(targetPort), s.getOutputStream());
5960
} catch (IOException ex) {
6061
ex.printStackTrace();
6162
} catch (Exception ex) {
@@ -69,7 +70,7 @@ public void run() {
6970
new Runnable() {
7071
public void run() {
7172
try {
72-
ByteStreams.copy(s.getInputStream(), result.getOutboundStream(80));
73+
ByteStreams.copy(s.getInputStream(), result.getOutboundStream(targetPort));
7374
} catch (IOException ex) {
7475
ex.printStackTrace();
7576
} catch (Exception ex) {

extended/src/main/java/io/kubernetes/client/extended/kubectl/Kubectl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,14 @@ public static KubectlLog log() {
155155
return log(Configuration.getDefaultApiClient());
156156
}
157157

158+
public static KubectlPortForward portforward() {
159+
return portforward(Configuration.getDefaultApiClient());
160+
}
161+
162+
public static KubectlPortForward portforward(ApiClient apiClient) {
163+
return new KubectlPortForward(apiClient);
164+
}
165+
158166
/**
159167
* Executable executes a kubectl helper.
160168
*

extended/src/main/java/io/kubernetes/client/extended/kubectl/KubectlExec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ public Integer execute() throws KubectlException {
6666
}
6767
}
6868

69-
private static void copyAsync(InputStream in, OutputStream out) {
70-
new Thread(
69+
protected static Thread copyAsync(InputStream in, OutputStream out) {
70+
Thread t =
71+
new Thread(
7172
new Runnable() {
7273
public void run() {
7374
try {
@@ -76,7 +77,8 @@ public void run() {
7677
ex.printStackTrace();
7778
}
7879
}
79-
})
80-
.start();
80+
});
81+
t.start();
82+
return t;
8183
}
8284
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.kubectl;
14+
15+
import static io.kubernetes.client.extended.kubectl.KubectlExec.copyAsync;
16+
17+
import io.kubernetes.client.PortForward;
18+
import io.kubernetes.client.extended.kubectl.exception.KubectlException;
19+
import io.kubernetes.client.openapi.ApiClient;
20+
import io.kubernetes.client.openapi.ApiException;
21+
import io.kubernetes.client.openapi.models.V1Pod;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.OutputStream;
25+
import java.net.ServerSocket;
26+
import java.net.Socket;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
public class KubectlPortForward
31+
extends Kubectl.ResourceAndContainerBuilder<V1Pod, KubectlPortForward>
32+
implements Kubectl.Executable<Boolean> {
33+
List<Integer> localPorts;
34+
List<Integer> targetPorts;
35+
boolean running;
36+
37+
KubectlPortForward(ApiClient client) {
38+
super(client, V1Pod.class);
39+
40+
localPorts = new ArrayList<>();
41+
targetPorts = new ArrayList<>();
42+
}
43+
44+
/**
45+
* Add a port pair
46+
*
47+
* @param localPort The port to listen to on the local machine
48+
* @param targetPort The port to connect to on the target Pod
49+
*/
50+
public KubectlPortForward ports(int localPort, int targetPort) {
51+
localPorts.add(localPort);
52+
targetPorts.add(targetPort);
53+
return this;
54+
}
55+
56+
@Override
57+
public Boolean execute() throws KubectlException {
58+
running = true;
59+
try {
60+
executeInternal();
61+
return true;
62+
} catch (ApiException | IOException | InterruptedException ex) {
63+
throw new KubectlException(ex);
64+
}
65+
}
66+
67+
public void shutdown() {
68+
running = false;
69+
}
70+
71+
private void executeInternal()
72+
throws ApiException, KubectlException, IOException, InterruptedException {
73+
PortForward pf = new PortForward(apiClient);
74+
PortForward.PortForwardResult result = pf.forward(namespace, name, targetPorts);
75+
if (result == null) {
76+
throw new KubectlException("PortForward failed!");
77+
}
78+
// TODO: Convert this to NIO to reduce the number of threads?
79+
List<Thread> threads = new ArrayList<>();
80+
for (int i = 0; i < localPorts.size(); i++) {
81+
int targetPort = targetPorts.get(i);
82+
threads.add(
83+
portForward(
84+
new ServerSocket(localPorts.get(i)),
85+
result.getInputStream(targetPort),
86+
result.getOutboundStream(targetPort)));
87+
}
88+
for (Thread t : threads) {
89+
t.join();
90+
}
91+
}
92+
93+
private Thread portForward(ServerSocket server, InputStream in, OutputStream out) {
94+
Thread t =
95+
new Thread(
96+
new Runnable() {
97+
@Override
98+
public void run() {
99+
while (running) {
100+
try {
101+
Socket sock = server.accept();
102+
Thread t1 = copyAsync(sock.getInputStream(), out);
103+
Thread t2 = copyAsync(in, sock.getOutputStream());
104+
105+
t1.join();
106+
t2.join();
107+
} catch (InterruptedException | IOException ex) {
108+
ex.printStackTrace();
109+
}
110+
}
111+
}
112+
});
113+
t.start();
114+
return t;
115+
}
116+
}

util/src/main/java/io/kubernetes/client/PortForward.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,15 @@ public PortForwardResult forward(String namespace, String name, List<Integer> po
114114
queryParams.add(new Pair("ports", port.toString()));
115115
}
116116
WebSockets.stream(path, "GET", queryParams, apiClient, handler);
117-
117+
try {
118+
handler.waitForInitialized();
119+
} catch (InterruptedException ex) {
120+
throw new ApiException(ex);
121+
}
122+
Throwable err = handler.getError();
123+
if (err != null) {
124+
throw new ApiException(err);
125+
}
118126
// Wait for streams to start.
119127
result.init();
120128

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ private static enum State {
5555
CLOSED
5656
};
5757

58+
public synchronized void waitForInitialized() throws InterruptedException {
59+
if (state != State.UNINITIALIZED) {
60+
return;
61+
}
62+
this.wait();
63+
}
64+
5865
@Override
5966
public synchronized void open(String protocol, WebSocket socket) {
6067
if (state != State.UNINITIALIZED) throw new IllegalStateException();
@@ -137,6 +144,7 @@ public synchronized void close() {
137144
}
138145
}
139146
}
147+
notifyAll();
140148
}
141149

142150
/**

util/src/test/java/io/kubernetes/client/PortForwardTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static io.kubernetes.client.ExecTest.makeStream;
2323
import static org.junit.Assert.assertEquals;
2424
import static org.junit.Assert.assertNotNull;
25+
import static org.junit.Assert.assertThrows;
2526

2627
import com.github.tomakehurst.wiremock.junit.WireMockRule;
2728
import io.kubernetes.client.PortForward.PortForwardResult;
@@ -77,7 +78,11 @@ public void testUrl() throws IOException, ApiException, InterruptedException {
7778
int portNumber = 8080;
7879
List<Integer> ports = new ArrayList<>();
7980
ports.add(portNumber);
80-
forward.forward(pod, ports);
81+
assertThrows(
82+
ApiException.class,
83+
() -> {
84+
forward.forward(pod, ports);
85+
});
8186

8287
// TODO: Kill this sleep, the trouble is that the test tries to validate before the
8388
// connection

0 commit comments

Comments
 (0)