From 845dbc93aba6395844acda9287d94a602c06cd5f Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 20 Oct 2025 12:45:57 +0100 Subject: [PATCH] Added endpoint info to all GRPC errors --- .../java/tech/ydb/core/grpc/GrpcStatuses.java | 19 +++++----- .../tech/ydb/core/impl/BaseGrpcTransport.java | 22 ++++++----- .../ydb/core/impl/call/ReadStreamCall.java | 8 ++-- .../core/impl/call/ReadWriteStreamCall.java | 6 ++- .../tech/ydb/core/impl/call/UnaryCall.java | 6 ++- .../tech/ydb/core/grpc/GrpcStatusesTest.java | 38 +++++++++---------- 6 files changed, 54 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcStatuses.java b/core/src/main/java/tech/ydb/core/grpc/GrpcStatuses.java index b479adfc0..4dbef235d 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcStatuses.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcStatuses.java @@ -16,16 +16,16 @@ public final class GrpcStatuses { private GrpcStatuses() { } - public static Result toResult(io.grpc.Status status) { + public static Result toResult(io.grpc.Status status, String endpoint) { assert !status.isOk(); - return Result.fail(toStatus(status)); + return Result.fail(toStatus(status, endpoint)); } - public static tech.ydb.core.Status toStatus(io.grpc.Status status) { + public static tech.ydb.core.Status toStatus(io.grpc.Status status, String endpoint) { if (status.isOk()) { return Status.SUCCESS; } - Issue message = Issue.of(getMessage(status), Issue.Severity.ERROR); + Issue message = Issue.of(getMessage(status, endpoint), Issue.Severity.ERROR); StatusCode code = getStatusCode(status.getCode()); Throwable cause = status.getCause(); @@ -35,17 +35,18 @@ public static tech.ydb.core.Status toStatus(io.grpc.Status status) { return Status.of(code, cause, message, Issue.of(cause.toString(), Issue.Severity.ERROR)); } - private static String getMessage(io.grpc.Status status) { + private static String getMessage(io.grpc.Status status, String endpoint) { if (status.getCode() == io.grpc.Status.Code.CANCELLED) { logger.debug("gRPC cancellation: {}, {}", status.getCode(), status.getDescription()); } else { logger.warn("gRPC issue: {}, {}", status.getCode(), status.getDescription()); } - String message = "gRPC error: (" + status.getCode() + ')'; - return status.getDescription() == null - ? message - : message + ' ' + status.getDescription(); + String message = "gRPC error: (" + status.getCode() + ") on " + endpoint; + if (status.getDescription() != null) { + message += ", " + status.getDescription(); + } + return message; } private static StatusCode getStatusCode(io.grpc.Status.Code code) { diff --git a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java index ac77b9f26..c1d3c047c 100644 --- a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java @@ -21,7 +21,6 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.grpc.GrpcStatuses; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.grpc.YdbHeaders; import tech.ydb.core.impl.auth.AuthCallOptions; @@ -87,6 +86,7 @@ public CompletableFuture> unaryCall( try { GrpcChannel channel = getChannel(settings); + String endpoint = channel.getEndpoint().getHostAndPort(); ClientCall call = channel.getReadyChannel().newCall(method, options); ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); @@ -95,8 +95,8 @@ public CompletableFuture> unaryCall( traceId, method.getFullMethodName(), channel.getEndpoint().getHostAndPort() ); } - - return new UnaryCall<>(traceId, call, handler).startCall(request, makeMetadataFromSettings(settings)); + Metadata metadata = makeMetadataFromSettings(settings); + return new UnaryCall<>(traceId, endpoint, call, handler).startCall(request, metadata); } catch (UnexpectedResultException ex) { logger.warn("UnaryCall[{}] got unexpected status {}", traceId, ex.getStatus()); return CompletableFuture.completedFuture(Result.fail(ex)); @@ -122,13 +122,14 @@ public GrpcReadStream readStreamCall( if (settings.getDeadlineAfter() != 0) { final long now = System.nanoTime(); if (now >= settings.getDeadlineAfter()) { - return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings))); + return new EmptyStream<>(deadlineExpiredStatus(method, settings)); } options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS); } try { GrpcChannel channel = getChannel(settings); + String endpoint = channel.getEndpoint().getHostAndPort(); ClientCall call = channel.getReadyChannel().newCall(method, options); ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); @@ -140,7 +141,7 @@ public GrpcReadStream readStreamCall( Metadata metadata = makeMetadataFromSettings(settings); GrpcFlowControl flowCtrl = settings.getFlowControl(); - return new ReadStreamCall<>(traceId, call, flowCtrl, request, metadata, handler); + return new ReadStreamCall<>(traceId, endpoint, call, flowCtrl, request, metadata, handler); } catch (UnexpectedResultException ex) { logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus()); return new EmptyStream<>(ex.getStatus()); @@ -167,15 +168,16 @@ public GrpcReadWriteStream readWriteStreamCall( if (settings.getDeadlineAfter() != 0) { final long now = System.nanoTime(); if (now >= settings.getDeadlineAfter()) { - return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings))); + return new EmptyStream<>(deadlineExpiredStatus(method, settings)); } options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS); } try { GrpcChannel channel = getChannel(settings); + String endpoint = channel.getEndpoint().getHostAndPort(); ClientCall call = channel.getReadyChannel().newCall(method, options); - ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); + ChannelStatusHandler hdlr = new ChannelStatusHandler(channel, settings); if (logger.isTraceEnabled()) { logger.trace("ReadWriteStreamCall[{}] with method {} and endpoint {} created", @@ -185,7 +187,7 @@ public GrpcReadWriteStream readWriteStreamCall( Metadata metadata = makeMetadataFromSettings(settings); GrpcFlowControl flowCtrl = settings.getFlowControl(); - return new ReadWriteStreamCall<>(traceId, call, flowCtrl, metadata, getAuthCallOptions(), handler); + return new ReadWriteStreamCall<>(traceId, endpoint, call, flowCtrl, metadata, getAuthCallOptions(), hdlr); } catch (UnexpectedResultException ex) { logger.warn("ReadWriteStreamCall[{}] got unexpected status {}", traceId, ex.getStatus()); return new EmptyStream<>(ex.getStatus()); @@ -203,10 +205,10 @@ private static Result deadlineExpiredResult(MethodDescriptor method return Result.fail(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED, Issue.of(message, Issue.Severity.ERROR))); } - private static io.grpc.Status deadlineExpiredStatus(MethodDescriptor method, GrpcRequestSettings settings) { + private static Status deadlineExpiredStatus(MethodDescriptor method, GrpcRequestSettings settings) { String message = "deadline expired before calling method " + method.getFullMethodName() + " with traceId " + settings.getTraceId(); - return io.grpc.Status.DEADLINE_EXCEEDED.withDescription(message); + return Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED, Issue.of(message, Issue.Severity.ERROR)); } private Metadata makeMetadataFromSettings(GrpcRequestSettings settings) { diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 82a475430..1d1ee65b8 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -30,6 +30,7 @@ public class ReadStreamCall extends ClientCall.Listener impl private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); private final String traceId; + private final String endpoint; private final ClientCall call; private final Lock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; @@ -41,9 +42,10 @@ public class ReadStreamCall extends ClientCall.Listener impl private Observer consumer; - public ReadStreamCall(String traceId, ClientCall call, GrpcFlowControl flowCtrl, ReqT req, - Metadata headers, GrpcStatusHandler statusHandler) { + public ReadStreamCall(String traceId, String endpoint, ClientCall call, GrpcFlowControl flowCtrl, + ReqT req, Metadata headers, GrpcStatusHandler statusHandler) { this.traceId = traceId; + this.endpoint = endpoint; this.call = call; this.request = req; this.headers = headers; @@ -142,7 +144,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { if (status.isOk()) { statusFuture.complete(Status.SUCCESS); } else { - statusFuture.complete(GrpcStatuses.toStatus(status)); + statusFuture.complete(GrpcStatuses.toStatus(status, endpoint)); } statusConsumer.postComplete(); diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java index 882ad83cf..013522b01 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java @@ -35,6 +35,7 @@ public class ReadWriteStreamCall extends ClientCall.Listener implements private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); private final String traceId; + private final String endpoint; private final ClientCall call; private final Lock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; @@ -47,9 +48,10 @@ public class ReadWriteStreamCall extends ClientCall.Listener implements private final CompletableFuture statusFuture = new CompletableFuture<>(); private Observer consumer = null; - public ReadWriteStreamCall(String traceId, ClientCall call, GrpcFlowControl flowCtrl, + public ReadWriteStreamCall(String traceId, String endpoint, ClientCall call, GrpcFlowControl flowCtrl, Metadata headers, AuthCallOptions options, GrpcStatusHandler statusHandler) { this.traceId = traceId; + this.endpoint = endpoint; this.call = call; this.headers = headers; this.statusConsumer = statusHandler; @@ -204,7 +206,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { if (status.isOk()) { statusFuture.complete(Status.SUCCESS); } else { - statusFuture.complete(GrpcStatuses.toStatus(status)); + statusFuture.complete(GrpcStatuses.toStatus(status, endpoint)); } statusConsumer.postComplete(); diff --git a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java index 9c4943982..7f74196f6 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java @@ -36,14 +36,16 @@ public class UnaryCall extends ClientCall.Listener { .withIssues(Issue.of("More than one value received for gRPC unary call", Issue.Severity.ERROR)); private final String traceId; + private final String endpoint; private final ClientCall call; private final GrpcStatusHandler statusConsumer; private final CompletableFuture> future = new CompletableFuture<>(); private final AtomicReference value = new AtomicReference<>(); - public UnaryCall(String traceId, ClientCall call, GrpcStatusHandler statusConsumer) { + public UnaryCall(String traceId, String endpoint, ClientCall call, GrpcStatusHandler statusConsumer) { this.traceId = traceId; + this.endpoint = endpoint; this.call = call; this.statusConsumer = statusConsumer; } @@ -103,7 +105,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { future.complete(Result.success(snapshotValue)); } } else { - future.complete(GrpcStatuses.toResult(status)); + future.complete(GrpcStatuses.toResult(status, endpoint)); } statusConsumer.postComplete(); diff --git a/core/src/test/java/tech/ydb/core/grpc/GrpcStatusesTest.java b/core/src/test/java/tech/ydb/core/grpc/GrpcStatusesTest.java index eea6b0c4e..cea07f59c 100644 --- a/core/src/test/java/tech/ydb/core/grpc/GrpcStatusesTest.java +++ b/core/src/test/java/tech/ydb/core/grpc/GrpcStatusesTest.java @@ -30,12 +30,12 @@ static class MyException extends RuntimeException { @Test public void error() { - Result result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL.withDescription("error description")); + Result result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL.withDescription("error description"), "test"); assertFalse(result.isSuccess()); assertEquals(StatusCode.CLIENT_GRPC_ERROR, result.getStatus().getCode()); assertArrayEquals(new Issue[] { - Issue.of("gRPC error: (INTERNAL) error description", Issue.Severity.ERROR) + Issue.of("gRPC error: (INTERNAL) on test, error description", Issue.Severity.ERROR) }, result.getStatus().getIssues()); try { @@ -50,12 +50,12 @@ public void error() { public void errorWithCause() { Result result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL .withDescription("error description") - .withCause(new MyException("exception message"))); + .withCause(new MyException("exception message")), "test"); assertFalse(result.isSuccess()); assertEquals(StatusCode.CLIENT_GRPC_ERROR, result.getStatus().getCode()); assertArrayEquals(new Issue[] { - Issue.of("gRPC error: (INTERNAL) error description", Issue.Severity.ERROR), + Issue.of("gRPC error: (INTERNAL) on test, error description", Issue.Severity.ERROR), Issue.of(MyException.class.getName() + ": exception message", Issue.Severity.ERROR) }, result.getStatus().getIssues()); @@ -71,57 +71,57 @@ public void errorWithCause() { @Test public void statusOk() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.OK); + Status status = GrpcStatuses.toStatus(io.grpc.Status.OK, "test"); assertEquals(Status.SUCCESS, status); } @Test public void statusDeadlineExceeded() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.DEADLINE_EXCEEDED); - Issue issue = Issue.of("gRPC error: (DEADLINE_EXCEEDED)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.DEADLINE_EXCEEDED, "test"); + Issue issue = Issue.of("gRPC error: (DEADLINE_EXCEEDED) on test", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.CLIENT_DEADLINE_EXCEEDED).withIssues(issue), status); } @Test public void statusUnavailable() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAVAILABLE); - Issue issue = Issue.of("gRPC error: (UNAVAILABLE)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAVAILABLE, "test"); + Issue issue = Issue.of("gRPC error: (UNAVAILABLE) on test", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.TRANSPORT_UNAVAILABLE).withIssues(issue), status); } @Test public void statusUnauthenticated() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAUTHENTICATED); - Issue issue = Issue.of("gRPC error: (UNAUTHENTICATED)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAUTHENTICATED, "test"); + Issue issue = Issue.of("gRPC error: (UNAUTHENTICATED) on test", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.CLIENT_UNAUTHENTICATED).withIssues(issue), status); } @Test public void statusCancelled() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.CANCELLED); - Issue issue = Issue.of("gRPC error: (CANCELLED)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.CANCELLED, "test"); + Issue issue = Issue.of("gRPC error: (CANCELLED) on test", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.CLIENT_CANCELLED).withIssues(issue), status); } @Test public void statusUnimplemented() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.UNIMPLEMENTED); - Issue issue = Issue.of("gRPC error: (UNIMPLEMENTED)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.UNIMPLEMENTED, "test"); + Issue issue = Issue.of("gRPC error: (UNIMPLEMENTED) on test", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.CLIENT_CALL_UNIMPLEMENTED).withIssues(issue), status); } @Test public void statusResourceExhausted() { - Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED); - Issue issue = Issue.of("gRPC error: (RESOURCE_EXHAUSTED)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED, "test"); + Issue issue = Issue.of("gRPC error: (RESOURCE_EXHAUSTED) on test", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED).withIssues(issue), status); } @Test public void statusThrowable() { Throwable th = new RuntimeException("Hello"); - Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED.withCause(th)); - Issue issue1 = Issue.of("gRPC error: (RESOURCE_EXHAUSTED)", Issue.Severity.ERROR); + Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED.withCause(th), "test"); + Issue issue1 = Issue.of("gRPC error: (RESOURCE_EXHAUSTED) on test", Issue.Severity.ERROR); Issue issue2 = Issue.of("java.lang.RuntimeException: Hello", Issue.Severity.ERROR); assertEquals(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED).withIssues(issue1, issue2).withCause(th), status); }