@@ -145,7 +145,22 @@ public Watch createWatch(
145
145
}
146
146
147
147
// Otherwise, the watch may be responded immediately
148
- respond (watch , snapshot , group );
148
+ boolean responded = respond (watch , snapshot , group );
149
+
150
+ if (!responded ) {
151
+ watchCount ++;
152
+
153
+ LOGGER .info ("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}" ,
154
+ watchCount ,
155
+ request .getTypeUrl (),
156
+ String .join (", " , request .getResourceNamesList ()),
157
+ group ,
158
+ request .getVersionInfo ());
159
+
160
+ status .setWatch (watchCount , watch );
161
+
162
+ watch .setStop (() -> status .removeWatch (watchCount ));
163
+ }
149
164
150
165
return watch ;
151
166
@@ -245,7 +260,7 @@ private Response createResponse(DiscoveryRequest request, Map<String, ? extends
245
260
return Response .create (request , filtered , version );
246
261
}
247
262
248
- private void respond (Watch watch , Snapshot snapshot , T group ) {
263
+ private boolean respond (Watch watch , Snapshot snapshot , T group ) {
249
264
Map <String , ? extends Message > snapshotResources = snapshot .resources (watch .request ().getTypeUrl ());
250
265
251
266
if (!watch .request ().getResourceNamesList ().isEmpty () && watch .ads ()) {
@@ -262,7 +277,7 @@ private void respond(Watch watch, Snapshot snapshot, T group) {
262
277
String .join (", " , watch .request ().getResourceNamesList ()),
263
278
String .join (", " , missingNames ));
264
279
265
- return ;
280
+ return false ;
266
281
}
267
282
}
268
283
@@ -281,6 +296,7 @@ private void respond(Watch watch, Snapshot snapshot, T group) {
281
296
282
297
try {
283
298
watch .respond (response );
299
+ return true ;
284
300
} catch (WatchCancelledException e ) {
285
301
LOGGER .error (
286
302
"failed to respond for {} from node {} at version {} with version {} because watch was already cancelled" ,
@@ -289,5 +305,7 @@ private void respond(Watch watch, Snapshot snapshot, T group) {
289
305
watch .request ().getVersionInfo (),
290
306
version );
291
307
}
308
+
309
+ return false ;
292
310
}
293
311
}
0 commit comments