Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions core/src/main/java/tech/ydb/core/grpc/GrpcStatuses.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ public final class GrpcStatuses {

private GrpcStatuses() { }

public static <T> Result<T> toResult(io.grpc.Status status) {
public static <T> Result<T> toResult(io.grpc.Status status, String endpoint) {
assert !status.isOk();
return Result.fail(toStatus(status));
return Result.fail(toStatus(status, endpoint));
}

public static tech.ydb.core.Status toStatus(io.grpc.Status status) {
public static tech.ydb.core.Status toStatus(io.grpc.Status status, String endpoint) {
if (status.isOk()) {
return Status.SUCCESS;
}
Issue message = Issue.of(getMessage(status), Issue.Severity.ERROR);
Issue message = Issue.of(getMessage(status, endpoint), Issue.Severity.ERROR);
StatusCode code = getStatusCode(status.getCode());
Throwable cause = status.getCause();

Expand All @@ -35,17 +35,18 @@ public static tech.ydb.core.Status toStatus(io.grpc.Status status) {
return Status.of(code, cause, message, Issue.of(cause.toString(), Issue.Severity.ERROR));
}

private static String getMessage(io.grpc.Status status) {
private static String getMessage(io.grpc.Status status, String endpoint) {
if (status.getCode() == io.grpc.Status.Code.CANCELLED) {
logger.debug("gRPC cancellation: {}, {}", status.getCode(), status.getDescription());
} else {
logger.warn("gRPC issue: {}, {}", status.getCode(), status.getDescription());
}

String message = "gRPC error: (" + status.getCode() + ')';
return status.getDescription() == null
? message
: message + ' ' + status.getDescription();
String message = "gRPC error: (" + status.getCode() + ") on " + endpoint;
if (status.getDescription() != null) {
message += ", " + status.getDescription();
}
return message;
}

private static StatusCode getStatusCode(io.grpc.Status.Code code) {
Expand Down
22 changes: 12 additions & 10 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.grpc.YdbHeaders;
import tech.ydb.core.impl.auth.AuthCallOptions;
Expand Down Expand Up @@ -87,6 +86,7 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(

try {
GrpcChannel channel = getChannel(settings);
String endpoint = channel.getEndpoint().getHostAndPort();
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

Expand All @@ -95,8 +95,8 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
traceId, method.getFullMethodName(), channel.getEndpoint().getHostAndPort()
);
}

return new UnaryCall<>(traceId, call, handler).startCall(request, makeMetadataFromSettings(settings));
Metadata metadata = makeMetadataFromSettings(settings);
return new UnaryCall<>(traceId, endpoint, call, handler).startCall(request, metadata);
} catch (UnexpectedResultException ex) {
logger.warn("UnaryCall[{}] got unexpected status {}", traceId, ex.getStatus());
return CompletableFuture.completedFuture(Result.fail(ex));
Expand All @@ -122,13 +122,14 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
if (now >= settings.getDeadlineAfter()) {
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings)));
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}

try {
GrpcChannel channel = getChannel(settings);
String endpoint = channel.getEndpoint().getHostAndPort();
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

Expand All @@ -140,7 +141,7 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(

Metadata metadata = makeMetadataFromSettings(settings);
GrpcFlowControl flowCtrl = settings.getFlowControl();
return new ReadStreamCall<>(traceId, call, flowCtrl, request, metadata, handler);
return new ReadStreamCall<>(traceId, endpoint, call, flowCtrl, request, metadata, handler);
} catch (UnexpectedResultException ex) {
logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
return new EmptyStream<>(ex.getStatus());
Expand All @@ -167,15 +168,16 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
if (now >= settings.getDeadlineAfter()) {
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings)));
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}

try {
GrpcChannel channel = getChannel(settings);
String endpoint = channel.getEndpoint().getHostAndPort();
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);
ChannelStatusHandler hdlr = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("ReadWriteStreamCall[{}] with method {} and endpoint {} created",
Expand All @@ -185,7 +187,7 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(

Metadata metadata = makeMetadataFromSettings(settings);
GrpcFlowControl flowCtrl = settings.getFlowControl();
return new ReadWriteStreamCall<>(traceId, call, flowCtrl, metadata, getAuthCallOptions(), handler);
return new ReadWriteStreamCall<>(traceId, endpoint, call, flowCtrl, metadata, getAuthCallOptions(), hdlr);
} catch (UnexpectedResultException ex) {
logger.warn("ReadWriteStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
return new EmptyStream<>(ex.getStatus());
Expand All @@ -203,10 +205,10 @@ private static <T> Result<T> deadlineExpiredResult(MethodDescriptor<?, T> method
return Result.fail(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED, Issue.of(message, Issue.Severity.ERROR)));
}

private static io.grpc.Status deadlineExpiredStatus(MethodDescriptor<?, ?> method, GrpcRequestSettings settings) {
private static Status deadlineExpiredStatus(MethodDescriptor<?, ?> method, GrpcRequestSettings settings) {
String message = "deadline expired before calling method " + method.getFullMethodName() + " with traceId " +
settings.getTraceId();
return io.grpc.Status.DEADLINE_EXCEEDED.withDescription(message);
return Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED, Issue.of(message, Issue.Severity.ERROR));
}

private Metadata makeMetadataFromSettings(GrpcRequestSettings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);

private final String traceId;
private final String endpoint;
private final ClientCall<ReqT, RespT> call;
private final Lock callLock = new ReentrantLock();
private final GrpcStatusHandler statusConsumer;
Expand All @@ -41,9 +42,10 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl

private Observer<RespT> consumer;

public ReadStreamCall(String traceId, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl, ReqT req,
Metadata headers, GrpcStatusHandler statusHandler) {
public ReadStreamCall(String traceId, String endpoint, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl,
ReqT req, Metadata headers, GrpcStatusHandler statusHandler) {
this.traceId = traceId;
this.endpoint = endpoint;
this.call = call;
this.request = req;
this.headers = headers;
Expand Down Expand Up @@ -142,7 +144,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
if (status.isOk()) {
statusFuture.complete(Status.SUCCESS);
} else {
statusFuture.complete(GrpcStatuses.toStatus(status));
statusFuture.complete(GrpcStatuses.toStatus(status, endpoint));
}

statusConsumer.postComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);

private final String traceId;
private final String endpoint;
private final ClientCall<W, R> call;
private final Lock callLock = new ReentrantLock();
private final GrpcStatusHandler statusConsumer;
Expand All @@ -47,9 +48,10 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
private Observer<R> consumer = null;

public ReadWriteStreamCall(String traceId, ClientCall<W, R> call, GrpcFlowControl flowCtrl,
public ReadWriteStreamCall(String traceId, String endpoint, ClientCall<W, R> call, GrpcFlowControl flowCtrl,
Metadata headers, AuthCallOptions options, GrpcStatusHandler statusHandler) {
this.traceId = traceId;
this.endpoint = endpoint;
this.call = call;
this.headers = headers;
this.statusConsumer = statusHandler;
Expand Down Expand Up @@ -204,7 +206,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
if (status.isOk()) {
statusFuture.complete(Status.SUCCESS);
} else {
statusFuture.complete(GrpcStatuses.toStatus(status));
statusFuture.complete(GrpcStatuses.toStatus(status, endpoint));
}

statusConsumer.postComplete();
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ public class UnaryCall<ReqT, RespT> extends ClientCall.Listener<RespT> {
.withIssues(Issue.of("More than one value received for gRPC unary call", Issue.Severity.ERROR));

private final String traceId;
private final String endpoint;
private final ClientCall<ReqT, RespT> call;
private final GrpcStatusHandler statusConsumer;

private final CompletableFuture<Result<RespT>> future = new CompletableFuture<>();
private final AtomicReference<RespT> value = new AtomicReference<>();

public UnaryCall(String traceId, ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer) {
public UnaryCall(String traceId, String endpoint, ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer) {
this.traceId = traceId;
this.endpoint = endpoint;
this.call = call;
this.statusConsumer = statusConsumer;
}
Expand Down Expand Up @@ -103,7 +105,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
future.complete(Result.success(snapshotValue));
}
} else {
future.complete(GrpcStatuses.toResult(status));
future.complete(GrpcStatuses.toResult(status, endpoint));
}

statusConsumer.postComplete();
Expand Down
38 changes: 19 additions & 19 deletions core/src/test/java/tech/ydb/core/grpc/GrpcStatusesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ static class MyException extends RuntimeException {

@Test
public void error() {
Result<?> result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL.withDescription("error description"));
Result<?> result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL.withDescription("error description"), "test");

assertFalse(result.isSuccess());
assertEquals(StatusCode.CLIENT_GRPC_ERROR, result.getStatus().getCode());
assertArrayEquals(new Issue[] {
Issue.of("gRPC error: (INTERNAL) error description", Issue.Severity.ERROR)
Issue.of("gRPC error: (INTERNAL) on test, error description", Issue.Severity.ERROR)
}, result.getStatus().getIssues());

try {
Expand All @@ -50,12 +50,12 @@ public void error() {
public void errorWithCause() {
Result<?> result = GrpcStatuses.toResult(io.grpc.Status.INTERNAL
.withDescription("error description")
.withCause(new MyException("exception message")));
.withCause(new MyException("exception message")), "test");

assertFalse(result.isSuccess());
assertEquals(StatusCode.CLIENT_GRPC_ERROR, result.getStatus().getCode());
assertArrayEquals(new Issue[] {
Issue.of("gRPC error: (INTERNAL) error description", Issue.Severity.ERROR),
Issue.of("gRPC error: (INTERNAL) on test, error description", Issue.Severity.ERROR),
Issue.of(MyException.class.getName() + ": exception message", Issue.Severity.ERROR)
}, result.getStatus().getIssues());

Expand All @@ -71,57 +71,57 @@ public void errorWithCause() {

@Test
public void statusOk() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.OK);
Status status = GrpcStatuses.toStatus(io.grpc.Status.OK, "test");
assertEquals(Status.SUCCESS, status);
}

@Test
public void statusDeadlineExceeded() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.DEADLINE_EXCEEDED);
Issue issue = Issue.of("gRPC error: (DEADLINE_EXCEEDED)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.DEADLINE_EXCEEDED, "test");
Issue issue = Issue.of("gRPC error: (DEADLINE_EXCEEDED) on test", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.CLIENT_DEADLINE_EXCEEDED).withIssues(issue), status);
}

@Test
public void statusUnavailable() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAVAILABLE);
Issue issue = Issue.of("gRPC error: (UNAVAILABLE)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAVAILABLE, "test");
Issue issue = Issue.of("gRPC error: (UNAVAILABLE) on test", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.TRANSPORT_UNAVAILABLE).withIssues(issue), status);
}

@Test
public void statusUnauthenticated() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAUTHENTICATED);
Issue issue = Issue.of("gRPC error: (UNAUTHENTICATED)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNAUTHENTICATED, "test");
Issue issue = Issue.of("gRPC error: (UNAUTHENTICATED) on test", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.CLIENT_UNAUTHENTICATED).withIssues(issue), status);
}

@Test
public void statusCancelled() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.CANCELLED);
Issue issue = Issue.of("gRPC error: (CANCELLED)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.CANCELLED, "test");
Issue issue = Issue.of("gRPC error: (CANCELLED) on test", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.CLIENT_CANCELLED).withIssues(issue), status);
}

@Test
public void statusUnimplemented() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNIMPLEMENTED);
Issue issue = Issue.of("gRPC error: (UNIMPLEMENTED)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.UNIMPLEMENTED, "test");
Issue issue = Issue.of("gRPC error: (UNIMPLEMENTED) on test", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.CLIENT_CALL_UNIMPLEMENTED).withIssues(issue), status);
}

@Test
public void statusResourceExhausted() {
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED);
Issue issue = Issue.of("gRPC error: (RESOURCE_EXHAUSTED)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED, "test");
Issue issue = Issue.of("gRPC error: (RESOURCE_EXHAUSTED) on test", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED).withIssues(issue), status);
}

@Test
public void statusThrowable() {
Throwable th = new RuntimeException("Hello");
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED.withCause(th));
Issue issue1 = Issue.of("gRPC error: (RESOURCE_EXHAUSTED)", Issue.Severity.ERROR);
Status status = GrpcStatuses.toStatus(io.grpc.Status.RESOURCE_EXHAUSTED.withCause(th), "test");
Issue issue1 = Issue.of("gRPC error: (RESOURCE_EXHAUSTED) on test", Issue.Severity.ERROR);
Issue issue2 = Issue.of("java.lang.RuntimeException: Hello", Issue.Severity.ERROR);
assertEquals(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED).withIssues(issue1, issue2).withCause(th), status);
}
Expand Down