Skip to content

Commit e70f70b

Browse files
feat: add basic proxy
stack-info: PR: #9689, branch: igorbernstein2/stack/2
1 parent 0000bdf commit e70f70b

File tree

13 files changed

+1235
-1
lines changed

13 files changed

+1235
-1
lines changed

bigtable/bigtable-proxy/pom.xml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,47 @@
4444
</dependencyManagement>
4545

4646
<dependencies>
47+
<!-- gRPC -->
48+
<dependency>
49+
<groupId>io.grpc</groupId>
50+
<artifactId>grpc-api</artifactId>
51+
</dependency>
52+
<dependency>
53+
<groupId>io.grpc</groupId>
54+
<artifactId>grpc-core</artifactId>
55+
</dependency>
56+
<dependency>
57+
<groupId>io.grpc</groupId>
58+
<artifactId>grpc-netty-shaded</artifactId>
59+
</dependency>
60+
61+
62+
<!-- service defs -->
63+
<dependency>
64+
<groupId>com.google.api.grpc</groupId>
65+
<artifactId>grpc-google-cloud-bigtable-v2</artifactId>
66+
</dependency>
67+
<dependency>
68+
<groupId>com.google.api.grpc</groupId>
69+
<artifactId>proto-google-cloud-bigtable-v2</artifactId>
70+
</dependency>
71+
<dependency>
72+
<groupId>com.google.api.grpc</groupId>
73+
<artifactId>grpc-google-cloud-bigtable-admin-v2</artifactId>
74+
</dependency>
75+
<dependency>
76+
<groupId>com.google.api.grpc</groupId>
77+
<artifactId>proto-google-cloud-bigtable-admin-v2</artifactId>
78+
</dependency>
79+
<dependency>
80+
<groupId>com.google.api.grpc</groupId>
81+
<artifactId>grpc-google-common-protos</artifactId>
82+
</dependency>
83+
<dependency>
84+
<groupId>com.google.api.grpc</groupId>
85+
<artifactId>proto-google-common-protos</artifactId>
86+
</dependency>
87+
4788
<!-- Logging -->
4889
<dependency>
4990
<groupId>org.slf4j</groupId>
@@ -80,6 +121,11 @@
80121
</dependency>
81122

82123
<!-- Test -->
124+
<dependency>
125+
<groupId>io.grpc</groupId>
126+
<artifactId>grpc-testing</artifactId>
127+
<scope>test</scope>
128+
</dependency>
83129
<dependency>
84130
<groupId>junit</groupId>
85131
<artifactId>junit</artifactId>

bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/Main.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.bigtable.examples.proxy;
1818

19+
import com.google.cloud.bigtable.examples.proxy.commands.Serve;
1920
import org.slf4j.Logger;
2021
import org.slf4j.LoggerFactory;
2122
import org.slf4j.bridge.SLF4JBridgeHandler;
@@ -26,7 +27,7 @@
2627
* Main entry point for proxy commands under {@link
2728
* com.google.cloud.bigtable.examples.proxy.commands}.
2829
*/
29-
@Command(subcommands = {})
30+
@Command(subcommands = {Serve.class})
3031
public final class Main {
3132
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
3233

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigtable.examples.proxy.commands;
18+
19+
import com.google.auto.value.AutoValue;
20+
import com.google.common.base.Preconditions;
21+
import picocli.CommandLine.ITypeConverter;
22+
23+
@AutoValue
24+
abstract class Endpoint {
25+
abstract String getName();
26+
27+
abstract int getPort();
28+
29+
@Override
30+
public String toString() {
31+
return String.format("%s:%d", getName(), getPort());
32+
}
33+
34+
static Endpoint create(String name, int port) {
35+
return new AutoValue_Endpoint(name, port);
36+
}
37+
38+
static class ArgConverter implements ITypeConverter<Endpoint> {
39+
@Override
40+
public Endpoint convert(String s) throws Exception {
41+
int i = s.lastIndexOf(":");
42+
Preconditions.checkArgument(i > 0, "endpoint must of the form `name:port`");
43+
44+
String name = s.substring(0, i);
45+
int port = Integer.parseInt(s.substring(i + 1));
46+
return Endpoint.create(name, port);
47+
}
48+
}
49+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigtable.examples.proxy.commands;
18+
19+
import com.google.bigtable.admin.v2.BigtableInstanceAdminGrpc;
20+
import com.google.bigtable.admin.v2.BigtableTableAdminGrpc;
21+
import com.google.bigtable.v2.BigtableGrpc;
22+
import com.google.cloud.bigtable.examples.proxy.core.ProxyHandler;
23+
import com.google.cloud.bigtable.examples.proxy.core.Registry;
24+
import com.google.common.collect.ImmutableMap;
25+
import com.google.longrunning.OperationsGrpc;
26+
import io.grpc.InsecureServerCredentials;
27+
import io.grpc.ManagedChannel;
28+
import io.grpc.ManagedChannelBuilder;
29+
import io.grpc.Server;
30+
import io.grpc.ServerCallHandler;
31+
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
32+
import java.io.IOException;
33+
import java.net.InetSocketAddress;
34+
import java.util.Map;
35+
import java.util.concurrent.Callable;
36+
import java.util.concurrent.TimeUnit;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
import picocli.CommandLine.Command;
40+
import picocli.CommandLine.Help.Visibility;
41+
import picocli.CommandLine.Option;
42+
43+
@Command(name = "serve", mixinStandardHelpOptions = true, description = "Start the proxy server")
44+
public class Serve implements Callable<Void> {
45+
private static final Logger LOGGER = LoggerFactory.getLogger(Serve.class);
46+
47+
@Option(
48+
names = "--listen-port",
49+
required = true,
50+
description = "Local port to accept connections on")
51+
int listenPort;
52+
53+
@Option(names = "--useragent", showDefaultValue = Visibility.ALWAYS)
54+
String userAgent = "bigtable-java-proxy";
55+
56+
@Option(
57+
names = "--bigtable-data-endpoint",
58+
converter = Endpoint.ArgConverter.class,
59+
showDefaultValue = Visibility.ALWAYS)
60+
Endpoint dataEndpoint = Endpoint.create("bigtable.googleapis.com", 443);
61+
62+
@Option(
63+
names = "--bigtable-admin-endpoint",
64+
converter = Endpoint.ArgConverter.class,
65+
showDefaultValue = Visibility.ALWAYS)
66+
Endpoint adminEndpoint = Endpoint.create("bigtableadmin.googleapis.com", 443);
67+
68+
ManagedChannel adminChannel = null;
69+
ManagedChannel dataChannel = null;
70+
Server server;
71+
72+
@Override
73+
public Void call() throws Exception {
74+
start();
75+
server.awaitTermination();
76+
cleanup();
77+
return null;
78+
}
79+
80+
void start() throws IOException {
81+
if (dataChannel == null) {
82+
dataChannel =
83+
ManagedChannelBuilder.forAddress(dataEndpoint.getName(), dataEndpoint.getPort())
84+
.userAgent(userAgent)
85+
.maxInboundMessageSize(256 * 1024 * 1024)
86+
.disableRetry()
87+
.keepAliveTime(30, TimeUnit.SECONDS)
88+
.keepAliveTimeout(10, TimeUnit.SECONDS)
89+
.build();
90+
}
91+
if (adminChannel == null) {
92+
adminChannel =
93+
ManagedChannelBuilder.forAddress(adminEndpoint.getName(), adminEndpoint.getPort())
94+
.userAgent(userAgent)
95+
.disableRetry()
96+
.build();
97+
}
98+
99+
Map<String, ServerCallHandler<byte[], byte[]>> serviceMap =
100+
ImmutableMap.of(
101+
BigtableGrpc.SERVICE_NAME,
102+
new ProxyHandler<>(dataChannel),
103+
BigtableInstanceAdminGrpc.SERVICE_NAME,
104+
new ProxyHandler<>(adminChannel),
105+
BigtableTableAdminGrpc.SERVICE_NAME,
106+
new ProxyHandler<>(adminChannel),
107+
OperationsGrpc.SERVICE_NAME,
108+
new ProxyHandler<>(adminChannel));
109+
110+
server =
111+
NettyServerBuilder.forAddress(
112+
new InetSocketAddress("localhost", listenPort), InsecureServerCredentials.create())
113+
.fallbackHandlerRegistry(new Registry(serviceMap))
114+
.maxInboundMessageSize(256 * 1024 * 1024)
115+
.build();
116+
117+
server.start();
118+
LOGGER.info("Listening on port {}", server.getPort());
119+
}
120+
121+
void cleanup() throws InterruptedException {
122+
dataChannel.shutdown();
123+
adminChannel.shutdown();
124+
}
125+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigtable.examples.proxy.core;
18+
19+
import com.google.common.io.ByteStreams;
20+
import io.grpc.MethodDescriptor;
21+
import java.io.ByteArrayInputStream;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
25+
class ByteMarshaller implements MethodDescriptor.Marshaller<byte[]> {
26+
27+
@Override
28+
public byte[] parse(InputStream stream) {
29+
try {
30+
return ByteStreams.toByteArray(stream);
31+
} catch (IOException ex) {
32+
throw new RuntimeException(ex);
33+
}
34+
}
35+
36+
@Override
37+
public InputStream stream(byte[] value) {
38+
return new ByteArrayInputStream(value);
39+
}
40+
}

0 commit comments

Comments
 (0)