@@ -44,10 +44,10 @@ public void subscribe(@NotNull String uri, @NotNull SubListener listener) throws
4444 if (response .payload .size () > 0 ) {
4545 for (byte [] payload : response .payload ) {
4646 Pubsub .Subscription sub = Pubsub .Subscription .parseFrom (payload );
47- subscriptions .add (new InternalSubListener (sub .getUri (), listener ));
47+ subscriptions .add (new InternalSubListener (sub .getUri (), listener , true ));
4848 }
4949 } else {
50- subscriptions .add (new InternalSubListener (uri , listener ));
50+ subscriptions .add (new InternalSubListener (uri , listener , true ));
5151 }
5252
5353 LOGGER .trace (String .format ("Subscribed successfully to %s!" , uri ));
@@ -182,6 +182,10 @@ protected void handle(@NotNull Packet packet) throws InvalidProtocolBufferExcept
182182 } else {
183183 LOGGER .warn (String .format ("Skipped Mercury response, seq: %d, uri: %s, code %d" , seq , header .getUri (), header .getStatusCode ()));
184184 }
185+
186+ synchronized (callbacks ) {
187+ callbacks .notifyAll ();
188+ }
185189 } else {
186190 LOGGER .warn (String .format ("Couldn't handle packet, seq: %d, uri: %s, code %d" , seq , header .getUri (), header .getStatusCode ()));
187191 }
@@ -193,7 +197,7 @@ protected void exception(@NotNull Exception ex) {
193197 }
194198
195199 public void interestedIn (@ NotNull String uri , @ NotNull SubListener listener ) {
196- subscriptions .add (new InternalSubListener (uri , listener ));
200+ subscriptions .add (new InternalSubListener (uri , listener , false ));
197201 }
198202
199203 public void notInterested (@ NotNull SubListener listener ) {
@@ -205,6 +209,35 @@ public void notInterested(@NotNull SubListener listener) {
205209 }
206210 }
207211
212+ @ Override
213+ public void close () {
214+ if (!subscriptions .isEmpty ()) {
215+ for (InternalSubListener listener : new ArrayList <>(subscriptions )) {
216+ try {
217+ if (listener .isSub ) unsubscribe (listener .uri );
218+ else notInterested (listener .listener );
219+ } catch (IOException | PubSubException ex ) {
220+ LOGGER .debug ("Failed unsubscribing." , ex );
221+ }
222+ }
223+ }
224+
225+ while (true ) {
226+ if (callbacks .isEmpty ()) {
227+ break ;
228+ } else {
229+ synchronized (callbacks ) {
230+ try {
231+ callbacks .wait ();
232+ } catch (InterruptedException ignored ) {
233+ }
234+ }
235+ }
236+ }
237+
238+ super .close ();
239+ }
240+
208241 public interface Callback {
209242 void response (@ NotNull Response response );
210243 }
@@ -238,10 +271,12 @@ private PubSubException(Response response) {
238271 private static class InternalSubListener {
239272 private final String uri ;
240273 private final SubListener listener ;
274+ private final boolean isSub ;
241275
242- InternalSubListener (@ NotNull String uri , @ NotNull SubListener listener ) {
276+ InternalSubListener (@ NotNull String uri , @ NotNull SubListener listener , boolean isSub ) {
243277 this .uri = uri ;
244278 this .listener = listener ;
279+ this .isSub = isSub ;
245280 }
246281
247282 boolean matches (String uri ) {
0 commit comments