Skip to content

Commit 65e4ead

Browse files
authored
Support forward function mesh and worker request to k8s (#76)
* Forward `/apis/cloud.streamnative.io/v1alpha1/namespaces/namespace/functionmeshes`
1 parent 833c899 commit 65e4ead

File tree

3 files changed

+258
-0
lines changed

3 files changed

+258
-0
lines changed

java-proxy/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<license.plugin.version>3.0</license.plugin.version>
3737
<mockito.version>1.10.19</mockito.version>
3838
<powermock.version>1.7.4</powermock.version>
39+
<jetty.version>9.4.35.v20201120</jetty.version>
3940
</properties>
4041

4142
<dependencies>
@@ -79,6 +80,16 @@
7980
<artifactId>powermock-module-junit4</artifactId>
8081
<version>${powermock.version}</version>
8182
</dependency>
83+
<dependency>
84+
<groupId>org.eclipse.jetty</groupId>
85+
<artifactId>jetty-proxy</artifactId>
86+
<version>${jetty.version}</version>
87+
</dependency>
88+
<dependency>
89+
<groupId>org.eclipse.jetty</groupId>
90+
<artifactId>jetty-client</artifactId>
91+
<version>${jetty.version}</version>
92+
</dependency>
8293
</dependencies>
8394

8495
<build>
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.streamnative.function.mesh.proxy;
20+
21+
import javax.net.ssl.SSLContext;
22+
import javax.servlet.ServletConfig;
23+
import javax.servlet.ServletException;
24+
import javax.servlet.http.HttpServletRequest;
25+
import java.io.File;
26+
import java.net.URI;
27+
import java.nio.charset.StandardCharsets;
28+
import java.security.cert.X509Certificate;
29+
import java.util.concurrent.Executor;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.commons.io.FileUtils;
32+
import org.apache.pulsar.common.util.SecurityUtility;
33+
import org.eclipse.jetty.client.HttpClient;
34+
import org.eclipse.jetty.client.ProtocolHandlers;
35+
import org.eclipse.jetty.client.RedirectProtocolHandler;
36+
import org.eclipse.jetty.client.api.Request;
37+
import org.eclipse.jetty.proxy.ProxyServlet;
38+
import org.eclipse.jetty.util.HttpCookieStore;
39+
import org.eclipse.jetty.util.ssl.SslContextFactory;
40+
import org.eclipse.jetty.util.thread.QueuedThreadPool;
41+
42+
/**
43+
* Function mesh proxy.
44+
*/
45+
@Slf4j
46+
public class FunctionMeshProxyHandler extends ProxyServlet {
47+
48+
private static final String FUNCTION_MESH_PATH_PREFIX = "/apis/cloud.streamnative.io/v1alpha1/namespaces";
49+
50+
private static final String FUNCTION_MESH_KEY = "functionmeshes";
51+
52+
private static final String KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST";
53+
54+
private static final String KUBERNETES_SERVICE_PORT = "443";
55+
56+
private static final String KUBERNETES_CA_CRT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
57+
58+
private static final String KUBERNETES_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token";
59+
60+
@Override
61+
protected HttpClient createHttpClient() throws ServletException {
62+
ServletConfig config = getServletConfig();
63+
64+
HttpClient httpClient = newHttpClient();
65+
httpClient.setFollowRedirects(true);
66+
httpClient.setCookieStore(new HttpCookieStore.Empty());
67+
68+
Executor executor;
69+
String value = config.getInitParameter("maxThreads");
70+
if (value == null || "-".equals(value)) {
71+
executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor");
72+
if (executor == null)
73+
throw new IllegalStateException("No server executor for proxy");
74+
} else {
75+
QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value));
76+
String servletName = config.getServletName();
77+
int dot = servletName.lastIndexOf('.');
78+
if (dot >= 0)
79+
servletName = servletName.substring(dot + 1);
80+
qtp.setName(servletName);
81+
executor = qtp;
82+
}
83+
84+
httpClient.setExecutor(executor);
85+
86+
value = config.getInitParameter("maxConnections");
87+
if (value == null)
88+
value = "256";
89+
httpClient.setMaxConnectionsPerDestination(Integer.parseInt(value));
90+
91+
value = config.getInitParameter("idleTimeout");
92+
if (value == null)
93+
value = "30000";
94+
httpClient.setIdleTimeout(Long.parseLong(value));
95+
96+
value = config.getInitParameter("requestBufferSize");
97+
if (value != null)
98+
httpClient.setRequestBufferSize(Integer.parseInt(value));
99+
100+
value = config.getInitParameter("responseBufferSize");
101+
if (value != null)
102+
httpClient.setResponseBufferSize(Integer.parseInt(value));
103+
104+
try {
105+
httpClient.start();
106+
107+
// Content must not be decoded, otherwise the client gets confused.
108+
httpClient.getContentDecoderFactories().clear();
109+
110+
// Pass traffic to the client, only intercept what's necessary.
111+
ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers();
112+
protocolHandlers.clear();
113+
protocolHandlers.put(new RedirectProtocolHandler(httpClient));
114+
115+
return httpClient;
116+
} catch (Exception x) {
117+
throw new ServletException(x);
118+
}
119+
120+
}
121+
122+
@Override
123+
protected HttpClient newHttpClient() {
124+
125+
try {
126+
X509Certificate[] trustCertificates = SecurityUtility
127+
.loadCertificatesFromPemFile(KUBERNETES_CA_CRT_PATH);
128+
129+
SSLContext sslCtx = SecurityUtility.createSslContext(
130+
false,
131+
trustCertificates
132+
);
133+
134+
135+
SslContextFactory contextFactory = new SslContextFactory.Client(true);
136+
contextFactory.setSslContext(sslCtx);
137+
138+
return new HttpClient(contextFactory);
139+
} catch (Exception e) {
140+
log.error("Init http client failed for proxy" + e.getMessage());
141+
}
142+
143+
// return an unauthenticated client, every request will fail.
144+
return new HttpClient();
145+
}
146+
147+
@Override
148+
protected String rewriteTarget(HttpServletRequest request) {
149+
StringBuilder url = new StringBuilder();
150+
boolean isFunctionMeshRestRequest = false;
151+
String requestUri = request.getRequestURI();
152+
if (requestUri.startsWith(FUNCTION_MESH_PATH_PREFIX)) {
153+
String [] requestUriPath = requestUri.split("/");
154+
if (requestUriPath.length >= 7 && requestUriPath[6].equals(FUNCTION_MESH_KEY)) {
155+
isFunctionMeshRestRequest = true;
156+
}
157+
}
158+
if (isFunctionMeshRestRequest) {
159+
String controllerHost = this.getEnvironment(KUBERNETES_SERVICE_HOST);
160+
url.append("https://").append(controllerHost).append(":").append(KUBERNETES_SERVICE_PORT).append(requestUri);
161+
String query = request.getQueryString();
162+
if (query != null) {
163+
url.append("?").append(query);
164+
}
165+
166+
URI rewrittenUrl = URI.create(url.toString()).normalize();
167+
168+
if (!validateDestination(rewrittenUrl.getHost(), rewrittenUrl.getPort())) {
169+
return null;
170+
}
171+
return rewrittenUrl.toString();
172+
}
173+
return null;
174+
}
175+
176+
protected String getEnvironment(String key) {
177+
return System.getenv(key);
178+
}
179+
180+
@Override
181+
protected void addProxyHeaders(HttpServletRequest clientRequest, Request proxyRequest) {
182+
super.addProxyHeaders(clientRequest, proxyRequest);
183+
try {
184+
File file = new File(KUBERNETES_TOKEN_PATH);
185+
String cloudControllerAuthToken = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
186+
proxyRequest.header("Authorization", "Bearer " + cloudControllerAuthToken);
187+
} catch (java.io.IOException e) {
188+
log.error("Init cloud controller ca cert failed, message: {}", e.getMessage());
189+
}
190+
}
191+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.streamnative.function.mesh.proxy;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.powermock.api.mockito.PowerMockito;
25+
import org.powermock.core.classloader.annotations.PowerMockIgnore;
26+
import org.powermock.modules.junit4.PowerMockRunner;
27+
28+
import javax.servlet.http.HttpServletRequest;
29+
30+
import static org.powermock.api.mockito.PowerMockito.spy;
31+
32+
@RunWith(PowerMockRunner.class)
33+
@PowerMockIgnore({"javax.management.*"})
34+
public class FunctionMeshProxyHandlerTest {
35+
36+
private FunctionMeshProxyHandler functionMeshProxyHandler = spy(new FunctionMeshProxyHandler());
37+
38+
@Test
39+
public void rewriteTargetTest() {
40+
41+
String path = "/api/pods";
42+
HttpServletRequest httpServletRequest = PowerMockito.mock(HttpServletRequest.class);
43+
PowerMockito.when(httpServletRequest.getRequestURI()).thenReturn(path);
44+
Assert.assertNull(functionMeshProxyHandler.rewriteTarget(httpServletRequest));
45+
46+
PowerMockito.when(functionMeshProxyHandler
47+
.getEnvironment("KUBERNETES_SERVICE_HOST")).thenReturn("localhost");
48+
path = "/apis/cloud.streamnative.io/v1alpha1/namespaces/default/functionmeshes";
49+
PowerMockito.when(httpServletRequest.getRequestURI()).thenReturn(path);
50+
PowerMockito.when(httpServletRequest.getQueryString()).thenReturn("limit=500");
51+
String rewriteTarget = functionMeshProxyHandler.rewriteTarget(httpServletRequest);
52+
String expectedValue = "https://localhost:443" +
53+
"/apis/cloud.streamnative.io/v1alpha1/namespaces/default/functionmeshes?limit=500";
54+
Assert.assertEquals(rewriteTarget, expectedValue);
55+
}
56+
}

0 commit comments

Comments
 (0)