Skip to content

Commit 4cf29d4

Browse files
authored
Interrupt fine tuning (#1214)
1 parent 7f7caef commit 4cf29d4

File tree

5 files changed

+30
-23
lines changed

5 files changed

+30
-23
lines changed

src/main/java/io/nats/client/impl/MessageQueue.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,7 @@ boolean push(NatsMessage msg, boolean internal) {
165165
if (!editLock.tryLock(offerLockMillis, TimeUnit.MILLISECONDS)) {
166166
throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
167167
}
168-
}
169-
catch (InterruptedException e) {
170-
Thread.currentThread().interrupt();
171-
throw new RuntimeException(e);
172-
}
173168

174-
try {
175169
if (!internal && this.discardWhenFull) {
176170
return this.queue.offer(msg);
177171
}
@@ -184,10 +178,12 @@ boolean push(NatsMessage msg, boolean internal) {
184178
this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
185179
this.length.incrementAndGet();
186180
return true;
187-
} catch (InterruptedException ie) {
181+
}
182+
catch (InterruptedException e) {
188183
Thread.currentThread().interrupt();
189184
return false;
190-
} finally {
185+
}
186+
finally {
191187
editLock.unlock();
192188
}
193189
}

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -320,19 +320,22 @@ void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedExcepti
320320
closeMe.close();
321321
}
322322
}
323-
catch (IOException ignore) {}
323+
catch (IOException ignore) {
324+
}
324325
});
325326
}
326327

327328
// stop i/o
328329
try {
329330
this.reader.stop(false).get(100, TimeUnit.MILLISECONDS);
330-
} catch (Exception ex) {
331+
}
332+
catch (Exception ex) {
331333
processException(ex);
332334
}
333335
try {
334336
this.writer.stop().get(100, TimeUnit.MILLISECONDS);
335-
} catch (Exception ex) {
337+
}
338+
catch (Exception ex) {
336339
processException(ex);
337340
}
338341

@@ -344,19 +347,13 @@ void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedExcepti
344347
closeSocketLock.unlock();
345348
}
346349

347-
try {
348-
// calling connect just starts like a new connection versus reconnect
349-
// but we have to manually resubscribe like reconnect once it is connected
350-
reconnectImpl();
351-
writer.setReconnectMode(false);
352-
}
353-
catch (InterruptedException e) {
354-
// if there is an exception close() will have been called already
355-
Thread.currentThread().interrupt();
356-
}
350+
// calling connect just starts like a new connection versus reconnect
351+
// but we have to manually resubscribe like reconnect once it is connected
352+
reconnectImpl();
353+
writer.setReconnectMode(false);
357354
}
358355

359-
void reconnect() throws InterruptedException {
356+
void reconnect() throws InterruptedException {
360357
if (!tryingToConnect.get()) {
361358
try {
362359
tryingToConnect.set(true);

src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ private List<Message> _fetch(int batchSize, long maxWaitMillis) {
176176
}
177177
}
178178
catch (InterruptedException e) {
179+
// nextMessageInternal failed. By not throwing
180+
// this gives them the messages already added to the list
179181
Thread.currentThread().interrupt();
180182
}
181183
return messages;
@@ -199,6 +201,8 @@ private List<Message> drainAlreadyBuffered(int batchSize) {
199201
}
200202
}
201203
catch (InterruptedException ignore) {
204+
// nextMessageInternal failed. By not throwing
205+
// this gives them the messages already added to the list
202206
Thread.currentThread().interrupt();
203207
}
204208
return messages;
@@ -275,7 +279,7 @@ public boolean hasNext() {
275279
return false;
276280
}
277281

278-
if (buffered.size() == 0) {
282+
if (buffered.isEmpty()) {
279283
msg = _nextUnmanaged(timeout, pullSubject);
280284
if (msg == null) {
281285
done = true;
@@ -292,6 +296,9 @@ public boolean hasNext() {
292296
catch (InterruptedException e) {
293297
msg = null;
294298
done = true;
299+
// _nextUnmanaged failed
300+
// there still could be messages in the buffer
301+
// no good choice here
295302
Thread.currentThread().interrupt();
296303
return false;
297304
}

src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void run() {
4747
}
4848
catch (InterruptedException e) {
4949
Thread.currentThread().interrupt();
50+
// This task is going to re-run anyway, so no point in throwing
5051
}
5152
}
5253
}

src/main/java/io/nats/service/Discovery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ private byte[] discoverOne(String action, String serviceName, String serviceId)
186186
}
187187
}
188188
catch (InterruptedException e) {
189+
// conn.request interrupted means data is not retrieved
189190
Thread.currentThread().interrupt();
191+
throw new RuntimeException(e);
190192
}
191193
return null;
192194
}
@@ -215,7 +217,11 @@ private void discoverMany(String action, String serviceName, Consumer<byte[]> da
215217
}
216218
}
217219
catch (InterruptedException e) {
220+
// sub.nextMessage was fetching one message
221+
// and data is not completely read
222+
// so it seems like this is an error condition
218223
Thread.currentThread().interrupt();
224+
throw new RuntimeException(e);
219225
}
220226
finally {
221227
try {

0 commit comments

Comments
 (0)