Skip to content

Commit 35ea309

Browse files
authored
Merge pull request #545 from alex268/master
Added endpoint info to all GRPC errors
2 parents 78d2d99 + 845dbc9 commit 35ea309

File tree

6 files changed

+54
-45
lines changed

6 files changed

+54
-45
lines changed

core/src/main/java/tech/ydb/core/grpc/GrpcStatuses.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ public final class GrpcStatuses {
1616

1717
private GrpcStatuses() { }
1818

19-
public static <T> Result<T> toResult(io.grpc.Status status) {
19+
public static <T> Result<T> toResult(io.grpc.Status status, String endpoint) {
2020
assert !status.isOk();
21-
return Result.fail(toStatus(status));
21+
return Result.fail(toStatus(status, endpoint));
2222
}
2323

24-
public static tech.ydb.core.Status toStatus(io.grpc.Status status) {
24+
public static tech.ydb.core.Status toStatus(io.grpc.Status status, String endpoint) {
2525
if (status.isOk()) {
2626
return Status.SUCCESS;
2727
}
28-
Issue message = Issue.of(getMessage(status), Issue.Severity.ERROR);
28+
Issue message = Issue.of(getMessage(status, endpoint), Issue.Severity.ERROR);
2929
StatusCode code = getStatusCode(status.getCode());
3030
Throwable cause = status.getCause();
3131

@@ -35,17 +35,18 @@ public static tech.ydb.core.Status toStatus(io.grpc.Status status) {
3535
return Status.of(code, cause, message, Issue.of(cause.toString(), Issue.Severity.ERROR));
3636
}
3737

38-
private static String getMessage(io.grpc.Status status) {
38+
private static String getMessage(io.grpc.Status status, String endpoint) {
3939
if (status.getCode() == io.grpc.Status.Code.CANCELLED) {
4040
logger.debug("gRPC cancellation: {}, {}", status.getCode(), status.getDescription());
4141
} else {
4242
logger.warn("gRPC issue: {}, {}", status.getCode(), status.getDescription());
4343
}
4444

45-
String message = "gRPC error: (" + status.getCode() + ')';
46-
return status.getDescription() == null
47-
? message
48-
: message + ' ' + status.getDescription();
45+
String message = "gRPC error: (" + status.getCode() + ") on " + endpoint;
46+
if (status.getDescription() != null) {
47+
message += ", " + status.getDescription();
48+
}
49+
return message;
4950
}
5051

5152
private static StatusCode getStatusCode(io.grpc.Status.Code code) {

core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import tech.ydb.core.grpc.GrpcReadStream;
2222
import tech.ydb.core.grpc.GrpcReadWriteStream;
2323
import tech.ydb.core.grpc.GrpcRequestSettings;
24-
import tech.ydb.core.grpc.GrpcStatuses;
2524
import tech.ydb.core.grpc.GrpcTransport;
2625
import tech.ydb.core.grpc.YdbHeaders;
2726
import tech.ydb.core.impl.auth.AuthCallOptions;
@@ -87,6 +86,7 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
8786

8887
try {
8988
GrpcChannel channel = getChannel(settings);
89+
String endpoint = channel.getEndpoint().getHostAndPort();
9090
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
9191
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);
9292

@@ -95,8 +95,8 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
9595
traceId, method.getFullMethodName(), channel.getEndpoint().getHostAndPort()
9696
);
9797
}
98-
99-
return new UnaryCall<>(traceId, call, handler).startCall(request, makeMetadataFromSettings(settings));
98+
Metadata metadata = makeMetadataFromSettings(settings);
99+
return new UnaryCall<>(traceId, endpoint, call, handler).startCall(request, metadata);
100100
} catch (UnexpectedResultException ex) {
101101
logger.warn("UnaryCall[{}] got unexpected status {}", traceId, ex.getStatus());
102102
return CompletableFuture.completedFuture(Result.fail(ex));
@@ -122,13 +122,14 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
122122
if (settings.getDeadlineAfter() != 0) {
123123
final long now = System.nanoTime();
124124
if (now >= settings.getDeadlineAfter()) {
125-
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings)));
125+
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
126126
}
127127
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
128128
}
129129

130130
try {
131131
GrpcChannel channel = getChannel(settings);
132+
String endpoint = channel.getEndpoint().getHostAndPort();
132133
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
133134
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);
134135

@@ -140,7 +141,7 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
140141

141142
Metadata metadata = makeMetadataFromSettings(settings);
142143
GrpcFlowControl flowCtrl = settings.getFlowControl();
143-
return new ReadStreamCall<>(traceId, call, flowCtrl, request, metadata, handler);
144+
return new ReadStreamCall<>(traceId, endpoint, call, flowCtrl, request, metadata, handler);
144145
} catch (UnexpectedResultException ex) {
145146
logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
146147
return new EmptyStream<>(ex.getStatus());
@@ -167,15 +168,16 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
167168
if (settings.getDeadlineAfter() != 0) {
168169
final long now = System.nanoTime();
169170
if (now >= settings.getDeadlineAfter()) {
170-
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings)));
171+
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
171172
}
172173
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
173174
}
174175

175176
try {
176177
GrpcChannel channel = getChannel(settings);
178+
String endpoint = channel.getEndpoint().getHostAndPort();
177179
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
178-
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);
180+
ChannelStatusHandler hdlr = new ChannelStatusHandler(channel, settings);
179181

180182
if (logger.isTraceEnabled()) {
181183
logger.trace("ReadWriteStreamCall[{}] with method {} and endpoint {} created",
@@ -185,7 +187,7 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
185187

186188
Metadata metadata = makeMetadataFromSettings(settings);
187189
GrpcFlowControl flowCtrl = settings.getFlowControl();
188-
return new ReadWriteStreamCall<>(traceId, call, flowCtrl, metadata, getAuthCallOptions(), handler);
190+
return new ReadWriteStreamCall<>(traceId, endpoint, call, flowCtrl, metadata, getAuthCallOptions(), hdlr);
189191
} catch (UnexpectedResultException ex) {
190192
logger.warn("ReadWriteStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
191193
return new EmptyStream<>(ex.getStatus());
@@ -203,10 +205,10 @@ private static <T> Result<T> deadlineExpiredResult(MethodDescriptor<?, T> method
203205
return Result.fail(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED, Issue.of(message, Issue.Severity.ERROR)));
204206
}
205207

206-
private static io.grpc.Status deadlineExpiredStatus(MethodDescriptor<?, ?> method, GrpcRequestSettings settings) {
208+
private static Status deadlineExpiredStatus(MethodDescriptor<?, ?> method, GrpcRequestSettings settings) {
207209
String message = "deadline expired before calling method " + method.getFullMethodName() + " with traceId " +
208210
settings.getTraceId();
209-
return io.grpc.Status.DEADLINE_EXCEEDED.withDescription(message);
211+
return Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED, Issue.of(message, Issue.Severity.ERROR));
210212
}
211213

212214
private Metadata makeMetadataFromSettings(GrpcRequestSettings settings) {

core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
3030
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);
3131

3232
private final String traceId;
33+
private final String endpoint;
3334
private final ClientCall<ReqT, RespT> call;
3435
private final Lock callLock = new ReentrantLock();
3536
private final GrpcStatusHandler statusConsumer;
@@ -41,9 +42,10 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
4142

4243
private Observer<RespT> consumer;
4344

44-
public ReadStreamCall(String traceId, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl, ReqT req,
45-
Metadata headers, GrpcStatusHandler statusHandler) {
45+
public ReadStreamCall(String traceId, String endpoint, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl,
46+
ReqT req, Metadata headers, GrpcStatusHandler statusHandler) {
4647
this.traceId = traceId;
48+
this.endpoint = endpoint;
4749
this.call = call;
4850
this.request = req;
4951
this.headers = headers;
@@ -142,7 +144,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
142144
if (status.isOk()) {
143145
statusFuture.complete(Status.SUCCESS);
144146
} else {
145-
statusFuture.complete(GrpcStatuses.toStatus(status));
147+
statusFuture.complete(GrpcStatuses.toStatus(status, endpoint));
146148
}
147149

148150
statusConsumer.postComplete();

core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
3535
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);
3636

3737
private final String traceId;
38+
private final String endpoint;
3839
private final ClientCall<W, R> call;
3940
private final Lock callLock = new ReentrantLock();
4041
private final GrpcStatusHandler statusConsumer;
@@ -47,9 +48,10 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
4748
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
4849
private Observer<R> consumer = null;
4950

50-
public ReadWriteStreamCall(String traceId, ClientCall<W, R> call, GrpcFlowControl flowCtrl,
51+
public ReadWriteStreamCall(String traceId, String endpoint, ClientCall<W, R> call, GrpcFlowControl flowCtrl,
5152
Metadata headers, AuthCallOptions options, GrpcStatusHandler statusHandler) {
5253
this.traceId = traceId;
54+
this.endpoint = endpoint;
5355
this.call = call;
5456
this.headers = headers;
5557
this.statusConsumer = statusHandler;
@@ -204,7 +206,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
204206
if (status.isOk()) {
205207
statusFuture.complete(Status.SUCCESS);
206208
} else {
207-
statusFuture.complete(GrpcStatuses.toStatus(status));
209+
statusFuture.complete(GrpcStatuses.toStatus(status, endpoint));
208210
}
209211

210212
statusConsumer.postComplete();

core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ public class UnaryCall<ReqT, RespT> extends ClientCall.Listener<RespT> {
3636
.withIssues(Issue.of("More than one value received for gRPC unary call", Issue.Severity.ERROR));
3737

3838
private final String traceId;
39+
private final String endpoint;
3940
private final ClientCall<ReqT, RespT> call;
4041
private final GrpcStatusHandler statusConsumer;
4142

4243
private final CompletableFuture<Result<RespT>> future = new CompletableFuture<>();
4344
private final AtomicReference<RespT> value = new AtomicReference<>();
4445

45-
public UnaryCall(String traceId, ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer) {
46+
public UnaryCall(String traceId, String endpoint, ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer) {
4647
this.traceId = traceId;
48+
this.endpoint = endpoint;
4749
this.call = call;
4850
this.statusConsumer = statusConsumer;
4951
}
@@ -103,7 +105,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
103105
future.complete(Result.success(snapshotValue));
104106
}
105107
} else {
106-
future.complete(GrpcStatuses.toResult(status));
108+
future.complete(GrpcStatuses.toResult(status, endpoint));
107109
}
108110

109111
statusConsumer.postComplete();

core/src/test/java/tech/ydb/core/grpc/GrpcStatusesTest.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ static class MyException extends RuntimeException {
3030

3131
@Test
3232
public void error() {
33-
Result<?> result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL.withDescription("error description"));
33+
Result<?> result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL.withDescription("error description"), "test");
3434

3535
assertFalse(result.isSuccess());
3636
assertEquals(StatusCode.CLIENT_GRPC_ERROR, result.getStatus().getCode());
3737
assertArrayEquals(new Issue[] {
38-
Issue.of("gRPC error: (INTERNAL) error description", Issue.Severity.ERROR)
38+
Issue.of("gRPC error: (INTERNAL) on test, error description", Issue.Severity.ERROR)
3939
}, result.getStatus().getIssues());
4040

4141
try {
@@ -50,12 +50,12 @@ public void error() {
5050
public void errorWithCause() {
5151
Result<?> result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL
5252
.withDescription("error description")
53-
.withCause(new MyException("exception message")));
53+
.withCause(new MyException("exception message")), "test");
5454

5555
assertFalse(result.isSuccess());
5656
assertEquals(StatusCode.CLIENT_GRPC_ERROR, result.getStatus().getCode());
5757
assertArrayEquals(new Issue[] {
58-
Issue.of("gRPC error: (INTERNAL) error description", Issue.Severity.ERROR),
58+
Issue.of("gRPC error: (INTERNAL) on test, error description", Issue.Severity.ERROR),
5959
Issue.of(MyException.class.getName() + ": exception message", Issue.Severity.ERROR)
6060
}, result.getStatus().getIssues());
6161

@@ -71,57 +71,57 @@ public void errorWithCause() {
7171

7272
@Test
7373
public void statusOk() {
74-
Status status = GrpcStatuses.toStatus(io.grpc.Status.OK);
74+
Status status = GrpcStatuses.toStatus(io.grpc.Status.OK, "test");
7575
assertEquals(Status.SUCCESS, status);
7676
}
7777

7878
@Test
7979
public void statusDeadlineExceeded() {
80-
Status status = GrpcStatuses.toStatus(io.grpc.Status.DEADLINE_EXCEEDED);
81-
Issue issue = Issue.of("gRPC error: (DEADLINE_EXCEEDED)", Issue.Severity.ERROR);
80+
Status status = GrpcStatuses.toStatus(io.grpc.Status.DEADLINE_EXCEEDED, "test");
81+
Issue issue = Issue.of("gRPC error: (DEADLINE_EXCEEDED) on test", Issue.Severity.ERROR);
8282
assertEquals(Status.of(StatusCode.CLIENT_DEADLINE_EXCEEDED).withIssues(issue), status);
8383
}
8484

8585
@Test
8686
public void statusUnavailable() {
87-
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAVAILABLE);
88-
Issue issue = Issue.of("gRPC error: (UNAVAILABLE)", Issue.Severity.ERROR);
87+
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAVAILABLE, "test");
88+
Issue issue = Issue.of("gRPC error: (UNAVAILABLE) on test", Issue.Severity.ERROR);
8989
assertEquals(Status.of(StatusCode.TRANSPORT_UNAVAILABLE).withIssues(issue), status);
9090
}
9191

9292
@Test
9393
public void statusUnauthenticated() {
94-
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAUTHENTICATED);
95-
Issue issue = Issue.of("gRPC error: (UNAUTHENTICATED)", Issue.Severity.ERROR);
94+
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAUTHENTICATED, "test");
95+
Issue issue = Issue.of("gRPC error: (UNAUTHENTICATED) on test", Issue.Severity.ERROR);
9696
assertEquals(Status.of(StatusCode.CLIENT_UNAUTHENTICATED).withIssues(issue), status);
9797
}
9898

9999
@Test
100100
public void statusCancelled() {
101-
Status status = GrpcStatuses.toStatus(io.grpc.Status.CANCELLED);
102-
Issue issue = Issue.of("gRPC error: (CANCELLED)", Issue.Severity.ERROR);
101+
Status status = GrpcStatuses.toStatus(io.grpc.Status.CANCELLED, "test");
102+
Issue issue = Issue.of("gRPC error: (CANCELLED) on test", Issue.Severity.ERROR);
103103
assertEquals(Status.of(StatusCode.CLIENT_CANCELLED).withIssues(issue), status);
104104
}
105105

106106
@Test
107107
public void statusUnimplemented() {
108-
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNIMPLEMENTED);
109-
Issue issue = Issue.of("gRPC error: (UNIMPLEMENTED)", Issue.Severity.ERROR);
108+
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNIMPLEMENTED, "test");
109+
Issue issue = Issue.of("gRPC error: (UNIMPLEMENTED) on test", Issue.Severity.ERROR);
110110
assertEquals(Status.of(StatusCode.CLIENT_CALL_UNIMPLEMENTED).withIssues(issue), status);
111111
}
112112

113113
@Test
114114
public void statusResourceExhausted() {
115-
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED);
116-
Issue issue = Issue.of("gRPC error: (RESOURCE_EXHAUSTED)", Issue.Severity.ERROR);
115+
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED, "test");
116+
Issue issue = Issue.of("gRPC error: (RESOURCE_EXHAUSTED) on test", Issue.Severity.ERROR);
117117
assertEquals(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED).withIssues(issue), status);
118118
}
119119

120120
@Test
121121
public void statusThrowable() {
122122
Throwable th = new RuntimeException("Hello");
123-
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED.withCause(th));
124-
Issue issue1 = Issue.of("gRPC error: (RESOURCE_EXHAUSTED)", Issue.Severity.ERROR);
123+
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED.withCause(th), "test");
124+
Issue issue1 = Issue.of("gRPC error: (RESOURCE_EXHAUSTED) on test", Issue.Severity.ERROR);
125125
Issue issue2 = Issue.of("java.lang.RuntimeException: Hello", Issue.Severity.ERROR);
126126
assertEquals(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED).withIssues(issue1, issue2).withCause(th), status);
127127
}

0 commit comments

Comments
 (0)