Skip to content

Commit 83973b3

Browse files
authored
Log warning for RPC timeout (#425)
1 parent 244d47d commit 83973b3

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

src/main/java/com/uber/cadence/internal/worker/Poller.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,14 @@
2222
import com.uber.cadence.internal.metrics.MetricsType;
2323
import com.uber.m3.tally.Scope;
2424
import java.util.Objects;
25-
import java.util.concurrent.*;
25+
import java.util.concurrent.ArrayBlockingQueue;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.Semaphore;
28+
import java.util.concurrent.ThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
2630
import java.util.concurrent.atomic.AtomicReference;
2731
import org.apache.thrift.TException;
32+
import org.apache.thrift.transport.TTransportException;
2833
import org.slf4j.Logger;
2934
import org.slf4j.LoggerFactory;
3035

@@ -52,7 +57,17 @@ interface ThrowingRunnable {
5257
private Throttler pollRateThrottler;
5358

5459
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
55-
(t, e) -> log.error("Failure in thread " + t.getName(), e);
60+
(t, e) -> {
61+
if (e instanceof TTransportException) {
62+
TTransportException te = (TTransportException) e;
63+
if (te.getType() == TTransportException.TIMED_OUT) {
64+
log.warn("Failure in thread " + t.getName(), e);
65+
return;
66+
}
67+
}
68+
69+
log.error("Failure in thread " + t.getName(), e);
70+
};
5671

5772
public Poller(
5873
String identity,

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import com.uber.tchannel.api.TChannel;
101101
import com.uber.tchannel.api.TFuture;
102102
import com.uber.tchannel.api.errors.TChannelError;
103+
import com.uber.tchannel.errors.ErrorType;
103104
import com.uber.tchannel.messages.ThriftRequest;
104105
import com.uber.tchannel.messages.ThriftResponse;
105106
import java.net.InetAddress;
@@ -113,6 +114,7 @@
113114
import java.util.concurrent.ExecutionException;
114115
import org.apache.thrift.TException;
115116
import org.apache.thrift.async.AsyncMethodCallback;
117+
import org.apache.thrift.transport.TTransportException;
116118
import org.slf4j.Logger;
117119
import org.slf4j.LoggerFactory;
118120

@@ -540,7 +542,12 @@ private <T> CompletableFuture<ThriftResponse<T>> doRemoteCallAsync(ThriftRequest
540542

541543
private void throwOnRpcError(ThriftResponse<?> response) throws TException {
542544
if (response.isError()) {
543-
throw new TException("Rpc error:" + response.getError());
545+
if (response.getError().getErrorType() == ErrorType.Timeout) {
546+
throw new TTransportException(
547+
TTransportException.TIMED_OUT, response.getError().getMessage());
548+
} else {
549+
throw new TException("Rpc error:" + response.getError());
550+
}
544551
}
545552
}
546553

0 commit comments

Comments
 (0)