Skip to content

Commit fd5f4aa

Browse files
authored
Enable use of InProcessTransport outside of InProcessServer
This allows an InProcessTransport instance to be created via a new internal accessor class InternalInProcess. We effectively just expose a method to create an InProcessTransport with a existing ServerListener instance. This will be used for in-process channels to an under-development on-device server.
1 parent afc1f2e commit fd5f4aa

File tree

3 files changed

+260
-10
lines changed

3 files changed

+260
-10
lines changed

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.lang.Math.max;
2222

2323
import com.google.common.base.MoreObjects;
24+
import com.google.common.base.Optional;
2425
import com.google.common.util.concurrent.ListenableFuture;
2526
import com.google.common.util.concurrent.SettableFuture;
2627
import io.grpc.Attributes;
@@ -48,6 +49,7 @@
4849
import io.grpc.internal.ManagedClientTransport;
4950
import io.grpc.internal.NoopClientStream;
5051
import io.grpc.internal.ObjectPool;
52+
import io.grpc.internal.ServerListener;
5153
import io.grpc.internal.ServerStream;
5254
import io.grpc.internal.ServerStreamListener;
5355
import io.grpc.internal.ServerTransport;
@@ -79,6 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
7981
private final int clientMaxInboundMetadataSize;
8082
private final String authority;
8183
private final String userAgent;
84+
private final Optional<ServerListener> optionalServerListener;
8285
private int serverMaxInboundMetadataSize;
8386
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
8487
private ScheduledExecutorService serverScheduler;
@@ -111,9 +114,8 @@ protected void handleNotInUse() {
111114
}
112115
};
113116

114-
public InProcessTransport(
115-
String name, int maxInboundMetadataSize, String authority, String userAgent,
116-
Attributes eagAttrs) {
117+
private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
118+
String userAgent, Attributes eagAttrs, Optional<ServerListener> optionalServerListener) {
117119
this.name = name;
118120
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
119121
this.authority = authority;
@@ -125,21 +127,45 @@ public InProcessTransport(
125127
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
126128
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
127129
.build();
130+
this.optionalServerListener = optionalServerListener;
128131
logId = InternalLogId.allocate(getClass(), name);
129132
}
130133

134+
public InProcessTransport(
135+
String name, int maxInboundMetadataSize, String authority, String userAgent,
136+
Attributes eagAttrs) {
137+
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
138+
Optional.<ServerListener>absent());
139+
}
140+
141+
InProcessTransport(
142+
String name, int maxInboundMetadataSize, String authority, String userAgent,
143+
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
144+
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
145+
ServerListener serverListener) {
146+
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener));
147+
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
148+
this.serverSchedulerPool = serverSchedulerPool;
149+
this.serverStreamTracerFactories = serverStreamTracerFactories;
150+
}
151+
131152
@CheckReturnValue
132153
@Override
133154
public synchronized Runnable start(ManagedClientTransport.Listener listener) {
134155
this.clientTransportListener = listener;
135-
InProcessServer server = InProcessServer.findServer(name);
136-
if (server != null) {
137-
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
138-
serverSchedulerPool = server.getScheduledExecutorServicePool();
156+
if (optionalServerListener.isPresent()) {
139157
serverScheduler = serverSchedulerPool.getObject();
140-
serverStreamTracerFactories = server.getStreamTracerFactories();
141-
// Must be semi-initialized; past this point, can begin receiving requests
142-
serverTransportListener = server.register(this);
158+
serverTransportListener = optionalServerListener.get().transportCreated(this);
159+
} else {
160+
InProcessServer server = InProcessServer.findServer(name);
161+
if (server != null) {
162+
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
163+
serverSchedulerPool = server.getScheduledExecutorServicePool();
164+
serverScheduler = serverSchedulerPool.getObject();
165+
serverStreamTracerFactories = server.getStreamTracerFactories();
166+
// Must be semi-initialized; past this point, can begin receiving requests
167+
serverTransportListener = server.register(this);
168+
}
143169
}
144170
if (serverTransportListener == null) {
145171
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name);
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.inprocess;
18+
19+
import io.grpc.Attributes;
20+
import io.grpc.Internal;
21+
import io.grpc.ServerStreamTracer;
22+
import io.grpc.internal.ConnectionClientTransport;
23+
import io.grpc.internal.ObjectPool;
24+
import io.grpc.internal.ServerListener;
25+
import java.util.List;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
28+
/**
29+
* Internal {@link InProcessTransport} accessor.
30+
*
31+
* <p>This is intended for use by io.grpc.internal, and the specifically
32+
* supported transport packages.
33+
*/
34+
@Internal
35+
public final class InternalInProcess {
36+
37+
private InternalInProcess() {}
38+
39+
/**
40+
* Creates a new InProcessTransport.
41+
*
42+
* <p>When started, the transport will be registered with the given
43+
* {@link ServerListener}.
44+
*/
45+
@Internal
46+
public static ConnectionClientTransport createInProcessTransport(
47+
String name,
48+
int maxInboundMetadataSize,
49+
String authority,
50+
String userAgent,
51+
Attributes eagAttrs,
52+
ObjectPool<ScheduledExecutorService> serverSchedulerPool,
53+
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
54+
ServerListener serverListener) {
55+
return new InProcessTransport(
56+
name,
57+
maxInboundMetadataSize,
58+
authority,
59+
userAgent,
60+
eagAttrs,
61+
serverSchedulerPool,
62+
serverStreamTracerFactories,
63+
serverListener);
64+
}
65+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.inprocess;
18+
19+
import com.google.common.collect.ImmutableList;
20+
import io.grpc.InternalChannelz.SocketStats;
21+
import io.grpc.InternalInstrumented;
22+
import io.grpc.ServerStreamTracer;
23+
import io.grpc.internal.AbstractTransportTest;
24+
import io.grpc.internal.GrpcUtil;
25+
import io.grpc.internal.InternalServer;
26+
import io.grpc.internal.ManagedClientTransport;
27+
import io.grpc.internal.ObjectPool;
28+
import io.grpc.internal.ServerListener;
29+
import io.grpc.internal.ServerTransport;
30+
import io.grpc.internal.ServerTransportListener;
31+
import io.grpc.internal.SharedResourcePool;
32+
import java.io.IOException;
33+
import java.net.SocketAddress;
34+
import java.util.List;
35+
import java.util.concurrent.ScheduledExecutorService;
36+
import javax.annotation.Nullable;
37+
import org.junit.Ignore;
38+
import org.junit.Test;
39+
import org.junit.runner.RunWith;
40+
import org.junit.runners.JUnit4;
41+
42+
/** Unit tests for {@link InProcessTransport} when used with a separate {@link InternalServer}. */
43+
@RunWith(JUnit4.class)
44+
public final class StandaloneInProcessTransportTest extends AbstractTransportTest {
45+
private static final String TRANSPORT_NAME = "perfect-for-testing";
46+
private static final String AUTHORITY = "a-testing-authority";
47+
private static final String USER_AGENT = "a-testing-user-agent";
48+
49+
private final ObjectPool<ScheduledExecutorService> schedulerPool =
50+
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
51+
52+
private TestServer currentServer;
53+
54+
@Override
55+
protected List<? extends InternalServer> newServer(
56+
List<ServerStreamTracer.Factory> streamTracerFactories) {
57+
return ImmutableList.of(new TestServer(streamTracerFactories));
58+
}
59+
60+
@Override
61+
protected List<? extends InternalServer> newServer(
62+
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
63+
return newServer(streamTracerFactories);
64+
}
65+
66+
@Override
67+
protected String testAuthority(InternalServer server) {
68+
return AUTHORITY;
69+
}
70+
71+
@Override
72+
protected ManagedClientTransport newClientTransport(InternalServer server) {
73+
TestServer testServer = (TestServer) server;
74+
return InternalInProcess.createInProcessTransport(
75+
TRANSPORT_NAME,
76+
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
77+
testAuthority(server),
78+
USER_AGENT,
79+
eagAttrs(),
80+
schedulerPool,
81+
testServer.streamTracerFactories,
82+
testServer.serverListener);
83+
}
84+
85+
@Override
86+
protected boolean sizesReported() {
87+
// TODO(zhangkun83): InProcessTransport doesn't record metrics for now
88+
// (https://github.com/grpc/grpc-java/issues/2284)
89+
return false;
90+
}
91+
92+
@Test
93+
@Ignore
94+
@Override
95+
public void socketStats() throws Exception {
96+
// test does not apply to in-process
97+
}
98+
99+
/** An internalserver just for this test. */
100+
private final class TestServer implements InternalServer {
101+
102+
final List<ServerStreamTracer.Factory> streamTracerFactories;
103+
ServerListener serverListener;
104+
105+
TestServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
106+
this.streamTracerFactories = streamTracerFactories;
107+
}
108+
109+
@Override
110+
public void start(ServerListener serverListener) throws IOException {
111+
if (currentServer != null) {
112+
throw new IOException("Server already present");
113+
}
114+
currentServer = this;
115+
this.serverListener = new ServerListenerWrapper(serverListener);
116+
}
117+
118+
@Override
119+
public void shutdown() {
120+
currentServer = null;
121+
serverListener.serverShutdown();
122+
}
123+
124+
@Override
125+
public SocketAddress getListenSocketAddress() {
126+
return new SocketAddress() {};
127+
}
128+
129+
@Override
130+
@Nullable
131+
public InternalInstrumented<SocketStats> getListenSocketStats() {
132+
return null;
133+
}
134+
}
135+
136+
/** Wraps the server listener to ensure we don't accept new transports after shutdown. */
137+
private static final class ServerListenerWrapper implements ServerListener {
138+
private final ServerListener delegateListener;
139+
private boolean shutdown;
140+
141+
ServerListenerWrapper(ServerListener delegateListener) {
142+
this.delegateListener = delegateListener;
143+
}
144+
145+
@Override
146+
public ServerTransportListener transportCreated(ServerTransport transport) {
147+
if (shutdown) {
148+
return null;
149+
}
150+
return delegateListener.transportCreated(transport);
151+
}
152+
153+
@Override
154+
public void serverShutdown() {
155+
shutdown = true;
156+
delegateListener.serverShutdown();
157+
}
158+
}
159+
}

0 commit comments

Comments
 (0)