From 7ba41bb7ec4f3b33653ac73c1ac9467d8e569e25 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 24 Sep 2024 10:51:42 +0100 Subject: [PATCH 1/3] Removed deprecated class Async --- .../java/tech/ydb/core/impl/YdbDiscovery.java | 4 +- .../main/java/tech/ydb/core/utils/Async.java | 81 ------------------- .../java/tech/ydb/core/utils/FutureTools.java | 26 ++++++ .../java/tech/ydb/query/impl/SessionPool.java | 4 +- .../ydb/query/tools/SessionRetryContext.java | 12 +-- .../tech/ydb/table/SessionRetryContext.java | 13 ++- .../tech/ydb/table/SessionRetryHandler.java | 4 +- .../tech/ydb/table/impl/pool/SessionPool.java | 4 +- .../ydb/table/SessionRetryContextTest.java | 20 ++--- .../test/java/tech/ydb/table/SessionStub.java | 6 +- .../java/tech/ydb/table/TableRpcStub.java | 6 +- 11 files changed, 60 insertions(+), 120 deletions(-) delete mode 100644 core/src/main/java/tech/ydb/core/utils/Async.java create mode 100644 core/src/main/java/tech/ydb/core/utils/FutureTools.java diff --git a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java index 9a858918f..6295f89f0 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java @@ -21,7 +21,7 @@ import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.impl.pool.EndpointRecord; import tech.ydb.core.operation.OperationBinder; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.discovery.DiscoveryProtos; import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc; @@ -183,7 +183,7 @@ private void handleOk(String selfLocation, List endpoints) { private void handleDiscoveryResult(Result response, Throwable th) { if (th != null) { - Throwable cause = Async.unwrapCompletionException(th); + Throwable cause = FutureTools.unwrapCompletionException(th); logger.warn("couldn't perform discovery with exception", cause); handleThrowable(cause); return; diff --git a/core/src/main/java/tech/ydb/core/utils/Async.java b/core/src/main/java/tech/ydb/core/utils/Async.java deleted file mode 100644 index 74785983a..000000000 --- a/core/src/main/java/tech/ydb/core/utils/Async.java +++ /dev/null @@ -1,81 +0,0 @@ -package tech.ydb.core.utils; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Supplier; - -import io.grpc.netty.shaded.io.netty.util.HashedWheelTimer; -import io.grpc.netty.shaded.io.netty.util.Timeout; -import io.grpc.netty.shaded.io.netty.util.Timer; -import io.grpc.netty.shaded.io.netty.util.TimerTask; -import io.grpc.netty.shaded.io.netty.util.internal.SystemPropertyUtil; - - - -/** - * @author Sergey Polovko - */ -public class Async { - private static final boolean DEFAULT_TIMER_THREAD_DAEMON = - SystemPropertyUtil.getBoolean("tech.ydb.table.async.daemon", true); - - private static final Timer DEFAULT_TIMER = new HashedWheelTimer( - r -> { - Thread t = new Thread(r); - t.setDaemon(DEFAULT_TIMER_THREAD_DAEMON); - t.setName("YdbAsyncTimer"); - return t; - }, - 100, TimeUnit.MILLISECONDS); - - protected Async() { } - - public static CompletableFuture failedFuture(Throwable t) { - CompletableFuture f = new CompletableFuture<>(); - f.completeExceptionally(t); - return f; - } - - @Deprecated - public static CompletableFuture safeCall(Supplier> fn) { - try { - return fn.get(); - } catch (Throwable ex) { - return failedFuture(ex); - } - } - - @Deprecated - public static CompletableFuture safeCall(T t, Function> fn) { - try { - return fn.apply(t); - } catch (Throwable ex) { - return failedFuture(ex); - } - } - - @Deprecated - public static CompletableFuture safeCall(T t, U u, BiFunction> fn) { - try { - return fn.apply(t, u); - } catch (Throwable ex) { - return failedFuture(ex); - } - } - - @Deprecated - public static Timeout runAfter(TimerTask task, long delay, TimeUnit unit) { - return DEFAULT_TIMER.newTimeout(task, delay, unit); - } - - public static Throwable unwrapCompletionException(Throwable throwable) { - Throwable cause = throwable; - while (cause instanceof CompletionException && cause.getCause() != null) { - cause = cause.getCause(); - } - return cause; - } -} diff --git a/core/src/main/java/tech/ydb/core/utils/FutureTools.java b/core/src/main/java/tech/ydb/core/utils/FutureTools.java new file mode 100644 index 000000000..43df0fca7 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/utils/FutureTools.java @@ -0,0 +1,26 @@ +package tech.ydb.core.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * + * @author Aleksandr Gorshenin + */ +public class FutureTools { + private FutureTools() { } + + public static Throwable unwrapCompletionException(Throwable throwable) { + Throwable cause = throwable; + while (cause instanceof CompletionException && cause.getCause() != null) { + cause = cause.getCause(); + } + return cause; + } + + public static CompletableFuture failedFuture(Throwable t) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(t); + return f; + } +} diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index ab316a77a..be136ee53 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -21,7 +21,7 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.QuerySession; import tech.ydb.query.settings.AttachSessionSettings; @@ -125,7 +125,7 @@ private boolean pollNext(CompletableFuture> future) { return; } - Throwable ex = Async.unwrapCompletionException(th); + Throwable ex = FutureTools.unwrapCompletionException(th); if (ex instanceof UnexpectedResultException) { future.complete(Result.fail((UnexpectedResultException) ex)); } else { diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index b90fc2a5a..19ee7a695 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -1,7 +1,6 @@ package tech.ydb.query.tools; import java.time.Duration; -import java.time.Instant; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -20,7 +19,7 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -76,7 +75,7 @@ private boolean canRetry(StatusCode code) { } private boolean canRetry(Throwable t) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); if (cause instanceof UnexpectedResultException) { StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode(); return canRetry(statusCode); @@ -122,7 +121,7 @@ private long backoffTimeMillis(StatusCode code, int retryNumber) { } private long backoffTimeMillis(Throwable t, int retryNumber) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); if (cause instanceof UnexpectedResultException) { StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode(); return backoffTimeMillis(statusCode, retryNumber); @@ -137,7 +136,6 @@ private abstract class BaseRetryableTask implements Runnable { private final CompletableFuture promise = new CompletableFuture<>(); private final AtomicInteger retryNumber = new AtomicInteger(); private final Function> fn; - private final long createTimestamp = Instant.now().toEpochMilli(); BaseRetryableTask(Function> fn) { this.fn = fn; @@ -150,10 +148,6 @@ CompletableFuture getFuture() { abstract StatusCode toStatusCode(R result); abstract R toFailedResult(Result sessionResult); - private long ms() { - return Instant.now().toEpochMilli() - createTimestamp; - } - // called on timer expiration @Override public void run() { diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java index 84e7a7d52..6833ddb0e 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java @@ -13,15 +13,14 @@ import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.utils.Async; - -import static com.google.common.base.Preconditions.checkArgument; +import tech.ydb.core.utils.FutureTools; /** @@ -85,7 +84,7 @@ private boolean canRetry(StatusCode code) { } private boolean canRetry(Throwable t) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); if (cause instanceof UnexpectedResultException) { StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode(); return canRetry(statusCode); @@ -131,7 +130,7 @@ private long backoffTimeMillis(StatusCode code, int retryNumber) { } private long backoffTimeMillis(Throwable t, int retryNumber) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); if (cause instanceof UnexpectedResultException) { StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode(); return backoffTimeMillis(statusCode, retryNumber); @@ -347,7 +346,7 @@ public Builder maxRetries(int maxRetries) { } public Builder backoffSlot(Duration duration) { - checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration); + Preconditions.checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration); this.backoffSlotMillis = duration.toMillis(); return this; } @@ -358,7 +357,7 @@ public Builder backoffCeiling(int backoffCeiling) { } public Builder fastBackoffSlot(Duration duration) { - checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration); + Preconditions.checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration); this.fastBackoffSlotMillis = duration.toMillis(); return this; } diff --git a/table/src/main/java/tech/ydb/table/SessionRetryHandler.java b/table/src/main/java/tech/ydb/table/SessionRetryHandler.java index 98b06d1f7..e5242dbed 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryHandler.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryHandler.java @@ -5,7 +5,7 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; /** * Session retry helper interface to support the application-level monitoring of session operation @@ -124,7 +124,7 @@ default String errorMsg(Throwable t) { if (!LOGGER.isDebugEnabled()) { return "unknown"; } - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); if (cause instanceof UnexpectedResultException) { StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode(); return statusCode.name(); diff --git a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java index 938309f46..59617a6a1 100644 --- a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java +++ b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java @@ -21,7 +21,7 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; import tech.ydb.table.impl.BaseSession; @@ -110,7 +110,7 @@ private boolean pollNext(CompletableFuture> future) { nextSession.whenComplete((session, th) -> { if (th != null) { - Throwable ex = Async.unwrapCompletionException(th); + Throwable ex = FutureTools.unwrapCompletionException(th); Result fail = (ex instanceof UnexpectedResultException) ? Result.fail((UnexpectedResultException) ex) : Result.error("can't create session", ex); diff --git a/table/src/test/java/tech/ydb/table/SessionRetryContextTest.java b/table/src/test/java/tech/ydb/table/SessionRetryContextTest.java index 655a6f2d6..046e38d0f 100644 --- a/table/src/test/java/tech/ydb/table/SessionRetryContextTest.java +++ b/table/src/test/java/tech/ydb/table/SessionRetryContextTest.java @@ -19,15 +19,13 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.table.impl.PooledTableClient; import tech.ydb.table.impl.pool.FutureHelper; import tech.ydb.table.impl.pool.MockedTableRpc; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.transaction.TxControl; -import static java.util.concurrent.CompletableFuture.completedFuture; - /** * @author Sergey Polovko @@ -47,6 +45,10 @@ public class SessionRetryContextTest extends FutureHelper { private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private static CompletableFuture completedFuture(T value) { + return CompletableFuture.completedFuture(value); + } + @AfterClass public static void cleanUp() { scheduler.shutdown(); @@ -78,7 +80,7 @@ public void successSession_failedResult() { // not retryable status code { AtomicInteger cnt = new AtomicInteger(); - Result result = ctx.supplyResult(session -> { + Result result = ctx.supplyResult(session -> { cnt.incrementAndGet(); return completedFuture(Result.fail(CANCELLED)); }).join(); @@ -90,7 +92,7 @@ public void successSession_failedResult() { // retryable status code { AtomicInteger cnt = new AtomicInteger(); - Result result = ctx.supplyResult(session -> { + Result result = ctx.supplyResult(session -> { cnt.incrementAndGet(); return completedFuture(Result.fail(OVERLOADED)); }).join(); @@ -118,7 +120,7 @@ public void successSession_exceptionResult() { }).join(); Assert.fail("expected exception not thrown"); } catch (Throwable t) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); Assert.assertTrue(cause instanceof RuntimeException); Assert.assertEquals(1, cnt.get()); Assert.assertEquals("some error message", cause.getMessage()); @@ -136,7 +138,7 @@ public void successSession_exceptionResult() { }).join(); Assert.fail("expected exception not thrown"); } catch (Throwable t) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); Assert.assertTrue(cause instanceof UnexpectedResultException); Assert.assertEquals(3, cnt.get()); Assert.assertEquals("Cannot get value, code: NOT_FOUND", cause.getMessage()); @@ -375,7 +377,7 @@ public void exceptionSession() { }).join(); Assert.fail("expected exception not thrown"); } catch (Throwable t) { - Throwable cause = Async.unwrapCompletionException(t); + Throwable cause = FutureTools.unwrapCompletionException(t); Assert.assertTrue(cause instanceof RuntimeException); Assert.assertEquals("something goes wrong here", cause.getMessage()); } @@ -550,7 +552,7 @@ public ScheduledExecutorService getScheduler() { @Override public CompletableFuture> createSession(Duration timeout) { retriesCount.incrementAndGet(); - return Async.failedFuture(new RuntimeException("something goes wrong here")); + return FutureTools.failedFuture(new RuntimeException("something goes wrong here")); } } diff --git a/table/src/test/java/tech/ydb/table/SessionStub.java b/table/src/test/java/tech/ydb/table/SessionStub.java index 7fb1846a5..329e049e9 100644 --- a/table/src/test/java/tech/ydb/table/SessionStub.java +++ b/table/src/test/java/tech/ydb/table/SessionStub.java @@ -7,7 +7,7 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.table.description.TableDescription; import tech.ydb.table.query.DataQuery; import tech.ydb.table.query.DataQueryResult; @@ -22,7 +22,6 @@ import tech.ydb.table.settings.CommitTxSettings; import tech.ydb.table.settings.CopyTableSettings; import tech.ydb.table.settings.CopyTablesSettings; -import tech.ydb.table.settings.RenameTablesSettings; import tech.ydb.table.settings.CreateTableSettings; import tech.ydb.table.settings.DescribeTableSettings; import tech.ydb.table.settings.DropTableSettings; @@ -34,6 +33,7 @@ import tech.ydb.table.settings.PrepareDataQuerySettings; import tech.ydb.table.settings.ReadRowsSettings; import tech.ydb.table.settings.ReadTableSettings; +import tech.ydb.table.settings.RenameTablesSettings; import tech.ydb.table.settings.RollbackTxSettings; import tech.ydb.table.transaction.TableTransaction; import tech.ydb.table.transaction.Transaction; @@ -176,7 +176,7 @@ public CompletableFuture executeBulkUpsert(String tablePath, ListValue r } private static CompletableFuture notImplemented(String method) { - return Async.failedFuture(new UnsupportedOperationException(method + " not implemented")); + return FutureTools.failedFuture(new UnsupportedOperationException(method + " not implemented")); } @Override diff --git a/table/src/test/java/tech/ydb/table/TableRpcStub.java b/table/src/test/java/tech/ydb/table/TableRpcStub.java index d020449d6..97e7c4054 100644 --- a/table/src/test/java/tech/ydb/table/TableRpcStub.java +++ b/table/src/test/java/tech/ydb/table/TableRpcStub.java @@ -10,7 +10,7 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.impl.call.EmptyStream; -import tech.ydb.core.utils.Async; +import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.table.YdbTable; import tech.ydb.proto.table.YdbTable.AlterTableRequest; import tech.ydb.proto.table.YdbTable.BeginTransactionRequest; @@ -18,7 +18,6 @@ import tech.ydb.proto.table.YdbTable.CommitTransactionRequest; import tech.ydb.proto.table.YdbTable.CopyTableRequest; import tech.ydb.proto.table.YdbTable.CopyTablesRequest; -import tech.ydb.proto.table.YdbTable.RenameTablesRequest; import tech.ydb.proto.table.YdbTable.CreateSessionRequest; import tech.ydb.proto.table.YdbTable.CreateSessionResult; import tech.ydb.proto.table.YdbTable.CreateTableRequest; @@ -36,6 +35,7 @@ import tech.ydb.proto.table.YdbTable.PrepareDataQueryRequest; import tech.ydb.proto.table.YdbTable.PrepareQueryResult; import tech.ydb.proto.table.YdbTable.ReadRowsResponse; +import tech.ydb.proto.table.YdbTable.RenameTablesRequest; import tech.ydb.proto.table.YdbTable.RollbackTransactionRequest; import tech.ydb.table.rpc.TableRpc; @@ -195,6 +195,6 @@ public void close() { } private static CompletableFuture notImplemented(String method) { - return Async.failedFuture(new UnsupportedOperationException(method + " not implemented")); + return FutureTools.failedFuture(new UnsupportedOperationException(method + " not implemented")); } } From 1c13cb88bc2d19b695a1df5cfcd7764503634bf6 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 24 Sep 2024 12:05:57 +0100 Subject: [PATCH 2/3] Runtime loader of ManagedChannelFactory --- .../ydb/core/grpc/GrpcTransportBuilder.java | 19 +++++--- .../core/impl/pool/ChannelFactoryLoader.java | 48 +++++++++++++++++++ .../core/impl/pool/NettyChannelFactory.java | 28 ++++++++++- ...ry.java => ShadedNettyChannelFactory.java} | 22 ++++++--- .../impl/pool/DefaultChannelFactoryTest.java | 42 ++++++++-------- 5 files changed, 124 insertions(+), 35 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/impl/pool/ChannelFactoryLoader.java rename core/src/main/java/tech/ydb/core/impl/pool/{DefaultChannelFactory.java => ShadedNettyChannelFactory.java} (86%) diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index ddc5487b2..70b5fe96a 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -15,14 +15,13 @@ import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import tech.ydb.auth.AuthRpcProvider; import tech.ydb.auth.NopAuthProvider; import tech.ydb.core.impl.YdbSchedulerFactory; import tech.ydb.core.impl.YdbTransportImpl; import tech.ydb.core.impl.auth.GrpcAuthRpc; -import tech.ydb.core.impl.pool.DefaultChannelFactory; +import tech.ydb.core.impl.pool.ChannelFactoryLoader; import tech.ydb.core.impl.pool.ManagedChannelFactory; import tech.ydb.core.utils.Version; @@ -69,7 +68,7 @@ public enum InitMode { private byte[] cert = null; private boolean useTLS = false; - private ManagedChannelFactory.Builder channelFactoryBuilder = DefaultChannelFactory::build; + private ManagedChannelFactory.Builder channelFactoryBuilder = null; private Supplier schedulerFactory = YdbSchedulerFactory::createScheduler; private String localDc; private BalancingSettings balancingSettings; @@ -177,6 +176,10 @@ public boolean useDefaultGrpcResolver() { } public ManagedChannelFactory getManagedChannelFactory() { + if (channelFactoryBuilder == null) { + channelFactoryBuilder = ChannelFactoryLoader.load(); + } + return channelFactoryBuilder.buildFactory(this); } @@ -193,18 +196,20 @@ public GrpcTransportBuilder withChannelFactoryBuilder(ManagedChannelFactory.Buil } /** - * Set a custom initialization of {@link NettyChannelBuilder}
+ * Set a custom initialization of {@link io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder}
* This method is deprecated. Use * {@link GrpcTransportBuilder#withChannelFactoryBuilder(tech.ydb.core.impl.pool.ManagedChannelFactory.Builder)} * instead * - * @param channelInitializer custom NettyChannelBuilder initializator + * @param ci custom NettyChannelBuilder initializator * @return this * @deprecated */ @Deprecated - public GrpcTransportBuilder withChannelInitializer(Consumer channelInitializer) { - this.channelFactoryBuilder = gtb -> DefaultChannelFactory.build(gtb, channelInitializer); + public GrpcTransportBuilder withChannelInitializer( + Consumer ci + ) { + this.channelFactoryBuilder = tech.ydb.core.impl.pool.ShadedNettyChannelFactory.withInterceptor(ci); return this; } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/ChannelFactoryLoader.java b/core/src/main/java/tech/ydb/core/impl/pool/ChannelFactoryLoader.java new file mode 100644 index 000000000..f4ef826b6 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/impl/pool/ChannelFactoryLoader.java @@ -0,0 +1,48 @@ +package tech.ydb.core.impl.pool; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * + * @author Aleksandr Gorshenin + */ +public class ChannelFactoryLoader { + private static final Logger logger = LoggerFactory.getLogger(ChannelFactoryLoader.class); + + private ChannelFactoryLoader() { } + + public static ManagedChannelFactory.Builder load() { + return FactoryLoader.factory; + } + + private static class FactoryLoader { + private static final String SHADED_DEPS = "io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder"; + private static final String NETTY_DEPS = "io.grpc.netty.NettyChannelBuilder"; + + private static ManagedChannelFactory.Builder factory; + + static { + boolean ok = tryLoad(SHADED_DEPS, ShadedNettyChannelFactory.build()) + || tryLoad(NETTY_DEPS, NettyChannelFactory.build()); + if (!ok) { + throw new IllegalStateException("Cannot load any ManagedChannelFactory!! " + + "Classpath must contain grpc-netty or grpc-netty-shaded"); + } + } + + private static boolean tryLoad(String name, ManagedChannelFactory.Builder f) { + try { + Class.forName(name); + logger.info("class {} is found, use {}", name, f); + factory = f; + return true; + } catch (ClassNotFoundException ex) { + logger.info("class {} is not found", name); + return false; + } + } + } +} diff --git a/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java index e58229c35..d21d05138 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java @@ -2,6 +2,7 @@ import java.io.ByteArrayInputStream; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.net.ssl.SSLException; @@ -39,7 +40,7 @@ public class NettyChannelFactory implements ManagedChannelFactory { private final boolean useDefaultGrpcResolver; private final Long grpcKeepAliveTimeMillis; - public NettyChannelFactory(GrpcTransportBuilder builder) { + private NettyChannelFactory(GrpcTransportBuilder builder) { this.database = builder.getDatabase(); this.version = builder.getVersionString(); this.useTLS = builder.getUseTls(); @@ -120,4 +121,29 @@ private SslContext createSslContext() { throw new RuntimeException("cannot create ssl context", e); } } + + public static ManagedChannelFactory.Builder build() { + return new Builder() { + @Override + public ManagedChannelFactory buildFactory(GrpcTransportBuilder builder) { + return new NettyChannelFactory(builder); + } + + @Override + public String toString() { + return "NettyChannelFactory"; + } + }; + } + + public static ManagedChannelFactory.Builder withInterceptor(Consumer ci) { + return builder -> new NettyChannelFactory(builder) { + @Override + protected void configure(NettyChannelBuilder channelBuilder) { + if (ci != null) { + ci.accept(channelBuilder); + } + } + }; + } } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/DefaultChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java similarity index 86% rename from core/src/main/java/tech/ydb/core/impl/pool/DefaultChannelFactory.java rename to core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java index fbe9ad762..6a8c58925 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/DefaultChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java @@ -27,7 +27,7 @@ * @author Nikolay Perfilov * @author Aleksandr Gorshenin */ -public class DefaultChannelFactory implements ManagedChannelFactory { +public class ShadedNettyChannelFactory implements ManagedChannelFactory { static final int INBOUND_MESSAGE_SIZE = 64 << 20; // 64 MiB static final String DEFAULT_BALANCER_POLICY = "round_robin"; @@ -40,7 +40,7 @@ public class DefaultChannelFactory implements ManagedChannelFactory { private final boolean useDefaultGrpcResolver; private final Long grpcKeepAliveTimeMillis; - private DefaultChannelFactory(GrpcTransportBuilder builder) { + public ShadedNettyChannelFactory(GrpcTransportBuilder builder) { this.database = builder.getDatabase(); this.version = builder.getVersionString(); this.useTLS = builder.getUseTls(); @@ -122,12 +122,22 @@ private SslContext createSslContext() { } } - public static ManagedChannelFactory build(GrpcTransportBuilder builder) { - return new DefaultChannelFactory(builder); + public static ManagedChannelFactory.Builder build() { + return new Builder() { + @Override + public ManagedChannelFactory buildFactory(GrpcTransportBuilder builder) { + return new ShadedNettyChannelFactory(builder); + } + + @Override + public String toString() { + return "ShadedNettyChannelFactory"; + } + }; } - public static ManagedChannelFactory build(GrpcTransportBuilder builder, Consumer ci) { - return new DefaultChannelFactory(builder) { + public static ManagedChannelFactory.Builder withInterceptor(Consumer ci) { + return builder -> new ShadedNettyChannelFactory(builder) { @Override protected void configure(NettyChannelBuilder channelBuilder) { if (ci != null) { diff --git a/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java b/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java index a856ec26c..75de2a44e 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java @@ -20,19 +20,18 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import org.mockito.MockedStatic; import org.mockito.Mockito; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.mockito.MockitoAnnotations; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.grpc.GrpcTransportBuilder; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - /** * * @author Aleksandr Gorshenin @@ -73,7 +72,7 @@ public void tearDown() throws Exception { @Test public void defaultParams() { GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root"); - ManagedChannelFactory factory = DefaultChannelFactory.build(builder); + ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); channelStaticMock.verify(FOR_ADDRESS, times(0)); Assert.assertEquals(30_000l, factory.getConnectTimeoutMs()); @@ -83,8 +82,8 @@ public void defaultParams() { verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.TLS); verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(DefaultChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(DefaultChannelFactory.DEFAULT_BALANCER_POLICY); + verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); verify(channelBuilderMock, times(0)).enableRetry(); verify(channelBuilderMock, times(1)).disableRetry(); @@ -97,7 +96,7 @@ public void defaultSslFactory() { .withGrpcRetry(true) .withConnectTimeout(Duration.ofMinutes(1)); - ManagedChannelFactory factory = DefaultChannelFactory.build(builder); + ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); channelStaticMock.verify(FOR_ADDRESS, times(0)); Assert.assertEquals(60000l, factory.getConnectTimeoutMs()); @@ -107,8 +106,8 @@ public void defaultSslFactory() { verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.TLS); verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(DefaultChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(DefaultChannelFactory.DEFAULT_BALANCER_POLICY); + verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); verify(channelBuilderMock, times(1)).enableRetry(); verify(channelBuilderMock, times(0)).disableRetry(); @@ -119,9 +118,10 @@ public void customChannelInitializer() { GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root") .withUseDefaultGrpcResolver(true); - ManagedChannelFactory factory = DefaultChannelFactory.build( - builder, cb -> cb.withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) - ); + ManagedChannelFactory factory = ShadedNettyChannelFactory + .withInterceptor(cb -> cb.withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)) + .buildFactory(builder); + channelStaticMock.verify(FOR_ADDRESS, times(0)); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT)); @@ -129,8 +129,8 @@ public void customChannelInitializer() { channelStaticMock.verify(FOR_ADDRESS, times(1)); verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(DefaultChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(0)).defaultLoadBalancingPolicy(DefaultChannelFactory.DEFAULT_BALANCER_POLICY); + verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + verify(channelBuilderMock, times(0)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); verify(channelBuilderMock, times(1)).withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE); } @@ -147,7 +147,7 @@ public void customSslFactory() throws CertificateException, IOException { .withGrpcRetry(false) .withConnectTimeout(4, TimeUnit.SECONDS); - ManagedChannelFactory factory = DefaultChannelFactory.build(builder); + ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); Assert.assertEquals(4000l, factory.getConnectTimeoutMs()); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT)); @@ -160,8 +160,8 @@ public void customSslFactory() throws CertificateException, IOException { verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.TLS); verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(DefaultChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(DefaultChannelFactory.DEFAULT_BALANCER_POLICY); + verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); verify(channelBuilderMock, times(0)).enableRetry(); verify(channelBuilderMock, times(1)).disableRetry(); @@ -173,7 +173,7 @@ public void invalidSslCert() { GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root") .withSecureConnection(cert); - ManagedChannelFactory factory = DefaultChannelFactory.build(builder); + ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); RuntimeException ex = Assert.assertThrows(RuntimeException.class, () -> factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT)); From f79ec337feb9e8056a6905f04a9164273bfce176 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 24 Sep 2024 12:58:55 +0100 Subject: [PATCH 3/3] Added handle of exceptions on channel creating --- .../java/tech/ydb/core/impl/YdbDiscovery.java | 40 ++++++++++--------- .../tech/ydb/core/impl/pool/GrpcChannel.java | 17 +++++--- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java index 6295f89f0..e458bdb5e 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java @@ -140,26 +140,30 @@ private void tick() { private void runDiscovery() { lastUpdateTime = handler.instant(); - final GrpcTransport transport = handler.createDiscoveryTransport(); try { - logger.debug("execute list endpoints on {} with timeout {}", transport, discoveryTimeout); - DiscoveryProtos.ListEndpointsRequest request = DiscoveryProtos.ListEndpointsRequest.newBuilder() - .setDatabase(discoveryDatabase) - .build(); - - GrpcRequestSettings grpcSettings = GrpcRequestSettings.newBuilder() - .withDeadline(discoveryTimeout) - .build(); - - transport.unaryCall(DiscoveryServiceGrpc.getListEndpointsMethod(), grpcSettings, request) - .whenComplete((res, ex) -> transport.close()) // close transport for any result - .thenApply(OperationBinder.bindSync( - DiscoveryProtos.ListEndpointsResponse::getOperation, - DiscoveryProtos.ListEndpointsResult.class - )) - .whenComplete(this::handleDiscoveryResult); + final GrpcTransport transport = handler.createDiscoveryTransport(); + try { + logger.debug("execute list endpoints on {} with timeout {}", transport, discoveryTimeout); + DiscoveryProtos.ListEndpointsRequest request = DiscoveryProtos.ListEndpointsRequest.newBuilder() + .setDatabase(discoveryDatabase) + .build(); + + GrpcRequestSettings grpcSettings = GrpcRequestSettings.newBuilder() + .withDeadline(discoveryTimeout) + .build(); + + transport.unaryCall(DiscoveryServiceGrpc.getListEndpointsMethod(), grpcSettings, request) + .whenComplete((res, ex) -> transport.close()) // close transport for any result + .thenApply(OperationBinder.bindSync( + DiscoveryProtos.ListEndpointsResponse::getOperation, + DiscoveryProtos.ListEndpointsResult.class + )) + .whenComplete(this::handleDiscoveryResult); + } catch (Throwable th) { + transport.close(); + throw th; + } } catch (Throwable th) { - transport.close(); handleDiscoveryResult(null, th); } } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index 03547670d..fff9ff662 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -25,12 +25,17 @@ public class GrpcChannel { private final ReadyWatcher readyWatcher; public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) { - logger.debug("Creating grpc channel with {}", endpoint); - this.endpoint = endpoint; - this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort()); - this.connectTimeoutMs = factory.getConnectTimeoutMs(); - this.readyWatcher = new ReadyWatcher(); - this.readyWatcher.checkState(); + try { + logger.debug("Creating grpc channel with {}", endpoint); + this.endpoint = endpoint; + this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort()); + this.connectTimeoutMs = factory.getConnectTimeoutMs(); + this.readyWatcher = new ReadyWatcher(); + this.readyWatcher.checkState(); + } catch (Throwable th) { + logger.error("cannot create channel", th); + throw new RuntimeException("cannot create channel", th); + } } public EndpointRecord getEndpoint() {