2
2
3
3
import static com .google .common .base .Strings .isNullOrEmpty ;
4
4
5
+ import com .google .common .base .Preconditions ;
5
6
import com .google .protobuf .Any ;
6
7
import envoy .api .v2 .ClusterDiscoveryServiceGrpc .ClusterDiscoveryServiceImplBase ;
7
8
import envoy .api .v2 .Discovery .DiscoveryRequest ;
18
19
import io .grpc .Status ;
19
20
import io .grpc .stub .StreamObserver ;
20
21
import java .util .Collections ;
22
+ import java .util .List ;
21
23
import java .util .Map ;
22
24
import java .util .Set ;
23
25
import java .util .concurrent .ConcurrentHashMap ;
28
30
29
31
public class DiscoveryServer {
30
32
31
- private static final DiscoveryServerCallbacks DEFAULT_CALLBACKS = new DiscoveryServerCallbacks () { };
32
33
private static final Logger LOGGER = LoggerFactory .getLogger (DiscoveryServer .class );
33
34
34
35
static final String ANY_TYPE_URL = "" ;
35
36
36
- private final DiscoveryServerCallbacks callbacks ;
37
+ private final List < DiscoveryServerCallbacks > callbacks ;
37
38
private final ConfigWatcher configWatcher ;
38
39
private final AtomicLong streamCount = new AtomicLong ();
39
40
41
+ public DiscoveryServer (ConfigWatcher configWatcher ) {
42
+ this (Collections .emptyList (), configWatcher );
43
+ }
44
+
40
45
public DiscoveryServer (DiscoveryServerCallbacks callbacks , ConfigWatcher configWatcher ) {
41
- this .callbacks = callbacks ;
42
- this .configWatcher = configWatcher ;
46
+ this (Collections .singletonList (callbacks ), configWatcher );
43
47
}
44
48
45
- public DiscoveryServer (ConfigWatcher configWatcher ) {
46
- this (DEFAULT_CALLBACKS , configWatcher );
49
+ /**
50
+ * Creates the server.
51
+ * @param callbacks server callbacks
52
+ * @param configWatcher source of configuration updates
53
+ */
54
+ public DiscoveryServer (List <DiscoveryServerCallbacks > callbacks , ConfigWatcher configWatcher ) {
55
+ Preconditions .checkNotNull (callbacks , "callbacks cannot be null" );
56
+ Preconditions .checkNotNull (configWatcher , "configWatcher cannot be null" );
57
+
58
+ this .callbacks = callbacks ;
59
+ this .configWatcher = configWatcher ;
47
60
}
48
61
49
62
/**
@@ -137,7 +150,7 @@ private StreamObserver<DiscoveryRequest> createRequestHandler(
137
150
138
151
LOGGER .info ("[{}] open stream from {}" , streamId , defaultTypeUrl );
139
152
140
- callbacks .onStreamOpen (streamId , defaultTypeUrl );
153
+ callbacks .forEach ( cb -> cb . onStreamOpen (streamId , defaultTypeUrl ) );
141
154
142
155
return new StreamObserver <DiscoveryRequest >() {
143
156
@@ -173,7 +186,7 @@ public void onNext(DiscoveryRequest request) {
173
186
nonce ,
174
187
request .getVersionInfo ());
175
188
176
- callbacks .onStreamRequest (streamId , request );
189
+ callbacks .forEach ( cb -> cb . onStreamRequest (streamId , request ) );
177
190
178
191
for (String typeUrl : Resources .TYPE_URLS ) {
179
192
DiscoveryResponse response = latestResponse .get (typeUrl );
@@ -213,7 +226,7 @@ public void onError(Throwable t) {
213
226
}
214
227
215
228
try {
216
- callbacks .onStreamCloseWithError (streamId , defaultTypeUrl , t );
229
+ callbacks .forEach ( cb -> cb . onStreamCloseWithError (streamId , defaultTypeUrl , t ) );
217
230
responseObserver .onError (Status .fromThrowable (t ).asException ());
218
231
} finally {
219
232
cancel ();
@@ -225,7 +238,7 @@ public void onCompleted() {
225
238
LOGGER .info ("[{}] stream closed" , streamId );
226
239
227
240
try {
228
- callbacks .onStreamClose (streamId , defaultTypeUrl );
241
+ callbacks .forEach ( cb -> cb . onStreamClose (streamId , defaultTypeUrl ) );
229
242
responseObserver .onCompleted ();
230
243
} finally {
231
244
cancel ();
@@ -248,7 +261,7 @@ private void send(Response response, String typeUrl) {
248
261
249
262
LOGGER .info ("[{}] response {} with nonce {} version {}" , streamId , typeUrl , nonce , response .version ());
250
263
251
- callbacks .onStreamResponse (streamId , response .request (), discoveryResponse );
264
+ callbacks .forEach ( cb -> cb . onStreamResponse (streamId , response .request (), discoveryResponse ) );
252
265
253
266
// Store the latest response *before* we send the response. This ensures that by the time the request
254
267
// is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
0 commit comments