Skip to content

Commit 45b7126

Browse files
When interrupted call interrupt
Signed-off-by: Maurice van Veen <[email protected]>
1 parent a6ef493 commit 45b7126

File tree

8 files changed

+24
-4
lines changed

8 files changed

+24
-4
lines changed

src/main/java/io/nats/client/impl/MessageQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ boolean push(NatsMessage msg, boolean internal) {
185185
this.length.incrementAndGet();
186186
return true;
187187
} catch (InterruptedException ie) {
188+
Thread.currentThread().interrupt();
188189
return false;
189190
} finally {
190191
editLock.unlock();

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,7 @@ public void run() {
653653
this.closeSocket(false, true);
654654
} catch (InterruptedException e) {
655655
processException(e);
656+
Thread.currentThread().interrupt();
656657
}
657658
} finally {
658659
statusLock.lock();
@@ -1516,7 +1517,11 @@ public Duration RTT() throws IOException {
15161517
catch (ExecutionException e) {
15171518
throw new IOException(e.getCause());
15181519
}
1519-
catch (InterruptedException | TimeoutException e) {
1520+
catch (TimeoutException e) {
1521+
throw new IOException(e);
1522+
}
1523+
catch (InterruptedException e) {
1524+
Thread.currentThread().interrupt();
15201525
throw new IOException(e);
15211526
}
15221527
}
@@ -2211,8 +2216,11 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
22112216

22122217
this.close(false, false); // close the connection after the last flush
22132218
tracker.complete(consumers.isEmpty());
2214-
} catch (TimeoutException | InterruptedException e) {
2219+
} catch (TimeoutException e) {
2220+
this.processException(e);
2221+
} catch (InterruptedException e) {
22152222
this.processException(e);
2223+
Thread.currentThread().interrupt();
22162224
} finally {
22172225
try {
22182226
this.close(false, false);// close the connection after the last flush

src/main/java/io/nats/client/impl/NatsConnectionReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,11 @@ else if (this.mode == Mode.GATHER_HEADERS) {
172172
if (running.get()) {
173173
this.connection.handleCommunicationIssue(io);
174174
}
175-
} catch (CancellationException | ExecutionException | InterruptedException ex) {
175+
} catch (CancellationException | ExecutionException ex) {
176176
// Exit
177+
} catch (InterruptedException ex) {
178+
// Exit
179+
Thread.currentThread().interrupt();
177180
} finally {
178181
this.running.set(false);
179182
// Clear the buffers, since they are only used inside this try/catch

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,11 @@ public void run() {
209209
if (running.get()) {
210210
this.connection.handleCommunicationIssue(io);
211211
}
212-
} catch (CancellationException | ExecutionException | InterruptedException ex) {
212+
} catch (CancellationException | ExecutionException ex) {
213213
// Exit
214+
} catch (InterruptedException ex) {
215+
// Exit
216+
Thread.currentThread().interrupt();
214217
} finally {
215218
this.running.set(false);
216219
}

src/main/java/io/nats/client/impl/NatsConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedExce
221221
this.cleanUpAfterDrain();
222222
} catch (InterruptedException e) {
223223
this.connection.processException(e);
224+
Thread.currentThread().interrupt();
224225
} finally {
225226
tracker.complete(this.isDrained());
226227
}

src/main/java/io/nats/client/impl/NatsDispatcher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public void run() {
126126
if (this.running.get()){
127127
this.connection.processException(exp);
128128
} //otherwise we did it
129+
Thread.currentThread().interrupt();
129130
}
130131
finally {
131132
this.running.set(false);

src/main/java/io/nats/client/impl/NatsDispatcherWithExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void run() {
6969
if (this.running.get()){
7070
this.connection.processException(exp);
7171
} //otherwise we did it
72+
Thread.currentThread().interrupt();
7273
}
7374
finally {
7475
this.running.set(false);

src/main/java/io/nats/client/impl/NatsJetStreamImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeo
232232
try {
233233
return responseRequired(conn.request(prependPrefix(subject), bytes, timeout));
234234
} catch (InterruptedException e) {
235+
Thread.currentThread().interrupt();
235236
throw new IOException(e);
236237
}
237238
}
@@ -240,6 +241,7 @@ Message makeInternalRequestResponseRequired(String subject, Headers headers, byt
240241
try {
241242
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo));
242243
} catch (InterruptedException e) {
244+
Thread.currentThread().interrupt();
243245
throw new IOException(e);
244246
}
245247
}

0 commit comments

Comments
 (0)