Skip to content

Commit 7ba41bb

Browse files
committed
Removed deprecated class Async
1 parent 9227ada commit 7ba41bb

File tree

11 files changed

+60
-120
lines changed

11 files changed

+60
-120
lines changed

core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import tech.ydb.core.grpc.GrpcTransport;
2222
import tech.ydb.core.impl.pool.EndpointRecord;
2323
import tech.ydb.core.operation.OperationBinder;
24-
import tech.ydb.core.utils.Async;
24+
import tech.ydb.core.utils.FutureTools;
2525
import tech.ydb.proto.discovery.DiscoveryProtos;
2626
import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc;
2727

@@ -183,7 +183,7 @@ private void handleOk(String selfLocation, List<EndpointRecord> endpoints) {
183183

184184
private void handleDiscoveryResult(Result<DiscoveryProtos.ListEndpointsResult> response, Throwable th) {
185185
if (th != null) {
186-
Throwable cause = Async.unwrapCompletionException(th);
186+
Throwable cause = FutureTools.unwrapCompletionException(th);
187187
logger.warn("couldn't perform discovery with exception", cause);
188188
handleThrowable(cause);
189189
return;

core/src/main/java/tech/ydb/core/utils/Async.java

Lines changed: 0 additions & 81 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package tech.ydb.core.utils;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.CompletionException;
5+
6+
/**
7+
*
8+
* @author Aleksandr Gorshenin
9+
*/
10+
public class FutureTools {
11+
private FutureTools() { }
12+
13+
public static Throwable unwrapCompletionException(Throwable throwable) {
14+
Throwable cause = throwable;
15+
while (cause instanceof CompletionException && cause.getCause() != null) {
16+
cause = cause.getCause();
17+
}
18+
return cause;
19+
}
20+
21+
public static <T> CompletableFuture<T> failedFuture(Throwable t) {
22+
CompletableFuture<T> f = new CompletableFuture<>();
23+
f.completeExceptionally(t);
24+
return f;
25+
}
26+
}

query/src/main/java/tech/ydb/query/impl/SessionPool.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import tech.ydb.core.StatusCode;
2222
import tech.ydb.core.UnexpectedResultException;
2323
import tech.ydb.core.grpc.GrpcReadStream;
24-
import tech.ydb.core.utils.Async;
24+
import tech.ydb.core.utils.FutureTools;
2525
import tech.ydb.proto.query.YdbQuery;
2626
import tech.ydb.query.QuerySession;
2727
import tech.ydb.query.settings.AttachSessionSettings;
@@ -125,7 +125,7 @@ private boolean pollNext(CompletableFuture<Result<QuerySession>> future) {
125125
return;
126126
}
127127

128-
Throwable ex = Async.unwrapCompletionException(th);
128+
Throwable ex = FutureTools.unwrapCompletionException(th);
129129
if (ex instanceof UnexpectedResultException) {
130130
future.complete(Result.fail((UnexpectedResultException) ex));
131131
} else {

query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package tech.ydb.query.tools;
22

33
import java.time.Duration;
4-
import java.time.Instant;
54
import java.util.Objects;
65
import java.util.concurrent.CompletableFuture;
76
import java.util.concurrent.Executor;
@@ -20,7 +19,7 @@
2019
import tech.ydb.core.Status;
2120
import tech.ydb.core.StatusCode;
2221
import tech.ydb.core.UnexpectedResultException;
23-
import tech.ydb.core.utils.Async;
22+
import tech.ydb.core.utils.FutureTools;
2423
import tech.ydb.query.QueryClient;
2524
import tech.ydb.query.QuerySession;
2625

@@ -76,7 +75,7 @@ private boolean canRetry(StatusCode code) {
7675
}
7776

7877
private boolean canRetry(Throwable t) {
79-
Throwable cause = Async.unwrapCompletionException(t);
78+
Throwable cause = FutureTools.unwrapCompletionException(t);
8079
if (cause instanceof UnexpectedResultException) {
8180
StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode();
8281
return canRetry(statusCode);
@@ -122,7 +121,7 @@ private long backoffTimeMillis(StatusCode code, int retryNumber) {
122121
}
123122

124123
private long backoffTimeMillis(Throwable t, int retryNumber) {
125-
Throwable cause = Async.unwrapCompletionException(t);
124+
Throwable cause = FutureTools.unwrapCompletionException(t);
126125
if (cause instanceof UnexpectedResultException) {
127126
StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode();
128127
return backoffTimeMillis(statusCode, retryNumber);
@@ -137,7 +136,6 @@ private abstract class BaseRetryableTask<R> implements Runnable {
137136
private final CompletableFuture<R> promise = new CompletableFuture<>();
138137
private final AtomicInteger retryNumber = new AtomicInteger();
139138
private final Function<QuerySession, CompletableFuture<R>> fn;
140-
private final long createTimestamp = Instant.now().toEpochMilli();
141139

142140
BaseRetryableTask(Function<QuerySession, CompletableFuture<R>> fn) {
143141
this.fn = fn;
@@ -150,10 +148,6 @@ CompletableFuture<R> getFuture() {
150148
abstract StatusCode toStatusCode(R result);
151149
abstract R toFailedResult(Result<QuerySession> sessionResult);
152150

153-
private long ms() {
154-
return Instant.now().toEpochMilli() - createTimestamp;
155-
}
156-
157151
// called on timer expiration
158152
@Override
159153
public void run() {

table/src/main/java/tech/ydb/table/SessionRetryContext.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@
1313
import javax.annotation.Nonnull;
1414
import javax.annotation.ParametersAreNonnullByDefault;
1515

16+
import com.google.common.base.Preconditions;
1617
import com.google.common.util.concurrent.MoreExecutors;
1718

1819
import tech.ydb.core.Result;
1920
import tech.ydb.core.Status;
2021
import tech.ydb.core.StatusCode;
2122
import tech.ydb.core.UnexpectedResultException;
22-
import tech.ydb.core.utils.Async;
23-
24-
import static com.google.common.base.Preconditions.checkArgument;
23+
import tech.ydb.core.utils.FutureTools;
2524

2625

2726
/**
@@ -85,7 +84,7 @@ private boolean canRetry(StatusCode code) {
8584
}
8685

8786
private boolean canRetry(Throwable t) {
88-
Throwable cause = Async.unwrapCompletionException(t);
87+
Throwable cause = FutureTools.unwrapCompletionException(t);
8988
if (cause instanceof UnexpectedResultException) {
9089
StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode();
9190
return canRetry(statusCode);
@@ -131,7 +130,7 @@ private long backoffTimeMillis(StatusCode code, int retryNumber) {
131130
}
132131

133132
private long backoffTimeMillis(Throwable t, int retryNumber) {
134-
Throwable cause = Async.unwrapCompletionException(t);
133+
Throwable cause = FutureTools.unwrapCompletionException(t);
135134
if (cause instanceof UnexpectedResultException) {
136135
StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode();
137136
return backoffTimeMillis(statusCode, retryNumber);
@@ -347,7 +346,7 @@ public Builder maxRetries(int maxRetries) {
347346
}
348347

349348
public Builder backoffSlot(Duration duration) {
350-
checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration);
349+
Preconditions.checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration);
351350
this.backoffSlotMillis = duration.toMillis();
352351
return this;
353352
}
@@ -358,7 +357,7 @@ public Builder backoffCeiling(int backoffCeiling) {
358357
}
359358

360359
public Builder fastBackoffSlot(Duration duration) {
361-
checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration);
360+
Preconditions.checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration);
362361
this.fastBackoffSlotMillis = duration.toMillis();
363362
return this;
364363
}

table/src/main/java/tech/ydb/table/SessionRetryHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import tech.ydb.core.StatusCode;
77
import tech.ydb.core.UnexpectedResultException;
8-
import tech.ydb.core.utils.Async;
8+
import tech.ydb.core.utils.FutureTools;
99

1010
/**
1111
* Session retry helper interface to support the application-level monitoring of session operation
@@ -124,7 +124,7 @@ default String errorMsg(Throwable t) {
124124
if (!LOGGER.isDebugEnabled()) {
125125
return "unknown";
126126
}
127-
Throwable cause = Async.unwrapCompletionException(t);
127+
Throwable cause = FutureTools.unwrapCompletionException(t);
128128
if (cause instanceof UnexpectedResultException) {
129129
StatusCode statusCode = ((UnexpectedResultException) cause).getStatus().getCode();
130130
return statusCode.name();

table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import tech.ydb.core.Status;
2222
import tech.ydb.core.StatusCode;
2323
import tech.ydb.core.UnexpectedResultException;
24-
import tech.ydb.core.utils.Async;
24+
import tech.ydb.core.utils.FutureTools;
2525
import tech.ydb.table.Session;
2626
import tech.ydb.table.SessionPoolStats;
2727
import tech.ydb.table.impl.BaseSession;
@@ -110,7 +110,7 @@ private boolean pollNext(CompletableFuture<Result<Session>> future) {
110110

111111
nextSession.whenComplete((session, th) -> {
112112
if (th != null) {
113-
Throwable ex = Async.unwrapCompletionException(th);
113+
Throwable ex = FutureTools.unwrapCompletionException(th);
114114
Result<Session> fail = (ex instanceof UnexpectedResultException)
115115
? Result.fail((UnexpectedResultException) ex)
116116
: Result.error("can't create session", ex);

table/src/test/java/tech/ydb/table/SessionRetryContextTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
import tech.ydb.core.Status;
2020
import tech.ydb.core.StatusCode;
2121
import tech.ydb.core.UnexpectedResultException;
22-
import tech.ydb.core.utils.Async;
22+
import tech.ydb.core.utils.FutureTools;
2323
import tech.ydb.table.impl.PooledTableClient;
2424
import tech.ydb.table.impl.pool.FutureHelper;
2525
import tech.ydb.table.impl.pool.MockedTableRpc;
2626
import tech.ydb.table.query.DataQueryResult;
2727
import tech.ydb.table.transaction.TxControl;
2828

29-
import static java.util.concurrent.CompletableFuture.completedFuture;
30-
3129

3230
/**
3331
* @author Sergey Polovko
@@ -47,6 +45,10 @@ public class SessionRetryContextTest extends FutureHelper {
4745

4846
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
4947

48+
private static <T> CompletableFuture<T> completedFuture(T value) {
49+
return CompletableFuture.completedFuture(value);
50+
}
51+
5052
@AfterClass
5153
public static void cleanUp() {
5254
scheduler.shutdown();
@@ -78,7 +80,7 @@ public void successSession_failedResult() {
7880
// not retryable status code
7981
{
8082
AtomicInteger cnt = new AtomicInteger();
81-
Result<?> result = ctx.supplyResult(session -> {
83+
Result<Object> result = ctx.supplyResult(session -> {
8284
cnt.incrementAndGet();
8385
return completedFuture(Result.fail(CANCELLED));
8486
}).join();
@@ -90,7 +92,7 @@ public void successSession_failedResult() {
9092
// retryable status code
9193
{
9294
AtomicInteger cnt = new AtomicInteger();
93-
Result<?> result = ctx.supplyResult(session -> {
95+
Result<Object> result = ctx.supplyResult(session -> {
9496
cnt.incrementAndGet();
9597
return completedFuture(Result.fail(OVERLOADED));
9698
}).join();
@@ -118,7 +120,7 @@ public void successSession_exceptionResult() {
118120
}).join();
119121
Assert.fail("expected exception not thrown");
120122
} catch (Throwable t) {
121-
Throwable cause = Async.unwrapCompletionException(t);
123+
Throwable cause = FutureTools.unwrapCompletionException(t);
122124
Assert.assertTrue(cause instanceof RuntimeException);
123125
Assert.assertEquals(1, cnt.get());
124126
Assert.assertEquals("some error message", cause.getMessage());
@@ -136,7 +138,7 @@ public void successSession_exceptionResult() {
136138
}).join();
137139
Assert.fail("expected exception not thrown");
138140
} catch (Throwable t) {
139-
Throwable cause = Async.unwrapCompletionException(t);
141+
Throwable cause = FutureTools.unwrapCompletionException(t);
140142
Assert.assertTrue(cause instanceof UnexpectedResultException);
141143
Assert.assertEquals(3, cnt.get());
142144
Assert.assertEquals("Cannot get value, code: NOT_FOUND", cause.getMessage());
@@ -375,7 +377,7 @@ public void exceptionSession() {
375377
}).join();
376378
Assert.fail("expected exception not thrown");
377379
} catch (Throwable t) {
378-
Throwable cause = Async.unwrapCompletionException(t);
380+
Throwable cause = FutureTools.unwrapCompletionException(t);
379381
Assert.assertTrue(cause instanceof RuntimeException);
380382
Assert.assertEquals("something goes wrong here", cause.getMessage());
381383
}
@@ -550,7 +552,7 @@ public ScheduledExecutorService getScheduler() {
550552
@Override
551553
public CompletableFuture<Result<Session>> createSession(Duration timeout) {
552554
retriesCount.incrementAndGet();
553-
return Async.failedFuture(new RuntimeException("something goes wrong here"));
555+
return FutureTools.failedFuture(new RuntimeException("something goes wrong here"));
554556
}
555557
}
556558

0 commit comments

Comments
 (0)