Skip to content

Commit cc2fa64

Browse files
author
Igor Melnichenko
committed
Code cleanup
1 parent 50b241a commit cc2fa64

File tree

74 files changed

+404
-329
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+404
-329
lines changed

coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ private CompletableFuture<Result<Long>> connectToSession(Stream stream, long ses
152152
SessionState local = state.get();
153153
boolean recoverableState = local.getState() == State.CONNECTED || local.getState() == State.RECONNECTING;
154154
if (recoverableState && local.hasStream(stream)) {
155-
logger.debug("stream {} starts to recover");
155+
logger.debug("stream {} starts to recover", this);
156156
long disconnectedAt = clock.millis();
157157
restoreSession(disconnectedAt, 0, messagesToRetry);
158158
} else {
@@ -162,7 +162,7 @@ private CompletableFuture<Result<Long>> connectToSession(Stream stream, long ses
162162
}
163163
}, executor);
164164

165-
// and send session start message with id of previos session (or zero if it's first connect)
165+
// and send session start message with id of previous session (or zero if it's first connect)
166166
return stream.sendSessionStart(sessionID, nodePath, connectTimeout, protectionKey);
167167
}
168168

@@ -207,7 +207,7 @@ private void restoreSession(long disconnectedAt, int retryCount, List<StreamMsg<
207207
long elapsedTimeMs = clock.millis() - disconnectedAt;
208208
long retryInMs = retryPolicy.nextRetryMs(retryCount, elapsedTimeMs);
209209
if (retryInMs < 0) {
210-
logger.debug("stream {} lost connection by retry policy");
210+
logger.debug("stream {} lost connection by retry policy", this);
211211
updateState(local, makeLostState(local));
212212
completeMessagesWithBadSession(messagesToRetry);
213213
return;
@@ -221,14 +221,14 @@ private void restoreSession(long disconnectedAt, int retryCount, List<StreamMsg<
221221
}
222222

223223
if (retryInMs > 0) {
224-
logger.debug("stream {} shedule next retry {} in {} ms", this, retryCount, retryInMs);
224+
logger.debug("stream {} schedule next retry {} in {} ms", this, retryCount, retryInMs);
225225
rpc.getScheduler().schedule(
226226
() -> reconnect(stream, disconnectedAt, retryCount, messagesToRetry),
227227
retryInMs,
228228
TimeUnit.MILLISECONDS
229229
);
230230
} else {
231-
logger.debug("stream {} immediatelly retry {}", this, retryCount);
231+
logger.debug("stream {} immediately retry {}", this, retryCount);
232232
reconnect(stream, disconnectedAt, retryCount, messagesToRetry);
233233
}
234234
}

coordination/src/main/java/tech/ydb/coordination/impl/Stream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public CompletableFuture<Status> stop() {
9595
logger.trace("stream {} send session stop msg", hashCode());
9696
stream.sendNext(stopMsg);
9797

98-
// schedule cancelation of grpc-stream
98+
// schedule cancellation of grpc-stream
9999
// if server doesn't close stream by stop message - this timer cancels grpc stream
100100
final Future<?> timer = scheduler.schedule(this::cancelStream, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
101101
stopFuture.whenComplete((st, ex) -> {

coordination/src/main/java/tech/ydb/coordination/impl/StreamMsg.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public boolean isIdempotent() {
4646
protected abstract boolean handleResponse(SessionResponse response);
4747
protected abstract boolean handleError(Status status);
4848

49-
protected Status incorrectTypeStatus(SessionResponse response, String exptected) {
50-
String msg = "Incorrect type of response " + TextFormat.shortDebugString(response) + ", expected " + exptected;
49+
protected Status incorrectTypeStatus(SessionResponse response, String expected) {
50+
String msg = "Incorrect type of response " + TextFormat.shortDebugString(response) + ", expected " + expected;
5151
return Status.of(StatusCode.CLIENT_INTERNAL_ERROR, Issue.of(msg, Issue.Severity.ERROR));
5252
}
5353

@@ -78,8 +78,8 @@ public static StreamMsg<Result<SemaphoreDescription>> describeSemaphore(String n
7878
}
7979

8080
public static StreamMsg<Result<SemaphoreWatcher>> watchSemaphore(
81-
String name, DescribeSemaphoreMode decribeMode, WatchSemaphoreMode watchMode) {
82-
return new WatchSemaphoreMsg(name, decribeMode, watchMode);
81+
String name, DescribeSemaphoreMode describeMode, WatchSemaphoreMode watchMode) {
82+
return new WatchSemaphoreMsg(name, describeMode, watchMode);
8383
}
8484

8585
private abstract static class BaseStatusMsg extends StreamMsg<Status> {
@@ -308,9 +308,7 @@ public boolean handleResponse(SessionResponse response) {
308308
}
309309
SemaphoreDescription desc = null;
310310
SessionResponse.DescribeSemaphoreResult result = response.getDescribeSemaphoreResult();
311-
if (result.getSemaphoreDescription() != null) {
312-
desc = new SemaphoreDescription(result.getSemaphoreDescription());
313-
}
311+
desc = new SemaphoreDescription(result.getSemaphoreDescription());
314312
return handleResult(desc, result.getStatus(), result.getIssuesList());
315313
}
316314
}
@@ -353,10 +351,8 @@ public boolean handleResponse(SessionResponse response) {
353351
}
354352
SemaphoreWatcher watcher = null;
355353
SessionResponse.DescribeSemaphoreResult result = response.getDescribeSemaphoreResult();
356-
if (result.getSemaphoreDescription() != null) {
357-
SemaphoreDescription desc = new SemaphoreDescription(result.getSemaphoreDescription());
358-
watcher = new SemaphoreWatcher(desc, changedMsg.getResult());
359-
}
354+
SemaphoreDescription desc = new SemaphoreDescription(result.getSemaphoreDescription());
355+
watcher = new SemaphoreWatcher(desc, changedMsg.getResult());
360356
return handleResult(watcher, result.getStatus(), result.getIssuesList());
361357
}
362358

coordination/src/test/java/tech/ydb/coordination/CoordinationServiceTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void createSessionTest() {
130130
CoordinationSession session = CLIENT.createSession(nodePath);
131131

132132
List<CoordinationSession.State> states = new ArrayList<>();
133-
session.addStateListener(state -> states.add(state));
133+
session.addStateListener(states::add);
134134

135135
Assert.assertEquals(CoordinationSession.State.INITIAL, session.getState());
136136
Assert.assertEquals(-1, session.getId());
@@ -143,7 +143,6 @@ public void createSessionTest() {
143143
Assert.assertEquals(CoordinationSession.State.CONNECTING, states.get(0));
144144
Assert.assertEquals(CoordinationSession.State.CONNECTED, states.get(1));
145145
Assert.assertEquals(CoordinationSession.State.CONNECTED, session.getState());
146-
Assert.assertNotNull(session.getId());
147146

148147
logger.info("stop session");
149148
session.close();
@@ -173,7 +172,7 @@ public void createSemaphoreTest() {
173172
session.createSemaphore(semaphoreName, 10, semaphoreData).join().expectSuccess("cannot create semaphore");
174173

175174
logger.info("delete semaphore");
176-
session.deleteSemaphore(semaphoreName).join().expectSuccess("cannpt create semaphore");
175+
session.deleteSemaphore(semaphoreName).join().expectSuccess("cannot create semaphore");
177176

178177
logger.info("stop session");
179178
session.close();
@@ -217,19 +216,19 @@ public void acquireSemaphoreTest() {
217216
logger.info("delete semaphore");
218217
session2.deleteSemaphore(semaphoreName).join().expectSuccess("cannot create semaphore");
219218

220-
logger.info("take first ephemaral lease");
219+
logger.info("take first ephemeral lease");
221220
lease2 = session2.acquireEphemeralSemaphore(semaphoreName, true, timeout);
222221
lease2.join().getStatus().expectSuccess("cannot acquire semaphore");
223222

224-
logger.info("request second ephemaral lease, waiting");
223+
logger.info("request second ephemeral lease, waiting");
225224
lease1 = session1.acquireEphemeralSemaphore(semaphoreName, true, timeout);
226225
Assert.assertFalse(lease1.isDone());
227226

228-
logger.info("release first ephemaral lease, complete second ephemaral lease");
227+
logger.info("release first ephemeral lease, complete second ephemaral lease");
229228
lease2.join().getValue().release().join();
230229
lease1.join().getStatus().expectSuccess("cannot acquire semaphore");
231230

232-
logger.info("release second ephemaral lease");
231+
logger.info("release second ephemeral lease");
233232
lease1.join().getValue().release().join();
234233

235234
logger.info("stop sessions");
@@ -280,7 +279,7 @@ public void describeAndUpdateSemaphoreTest() {
280279
Assert.assertTrue(description.getWaitersList().isEmpty());
281280

282281
logger.info("delete semaphore");
283-
session2.deleteSemaphore(semaphoreName).join().expectSuccess("cannpt create semaphore");
282+
session2.deleteSemaphore(semaphoreName).join().expectSuccess("cannot create semaphore");
284283
logger.info("stop sessions");
285284
session1.close();
286285
session2.close();
@@ -380,11 +379,11 @@ public void watchSemaphoreTest() {
380379
Assert.assertTrue(description.getWaitersList().isEmpty());
381380

382381
logger.info("delete semaphore");
383-
session2.deleteSemaphore(semaphoreName).join().expectSuccess("cannpt create semaphore");
382+
session2.deleteSemaphore(semaphoreName).join().expectSuccess("cannot create semaphore");
384383
logger.info("stop sessions");
385384
session1.close();
386385
session2.close();
387386
logger.info("drop node");
388387
CLIENT.dropNode(nodePath).join().expectSuccess("removing of node failed");
389388
}
390-
}
389+
}

coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.junit.Assert;
99

1010
import tech.ydb.core.Status;
11-
import tech.ydb.core.StatusCode;
1211
import tech.ydb.core.grpc.GrpcReadWriteStream;
1312
import tech.ydb.proto.coordination.SessionRequest;
1413
import tech.ydb.proto.coordination.SessionResponse;
@@ -70,10 +69,6 @@ public void closeConnectionOK() {
7069
executor.execute(() -> finish.complete(Status.SUCCESS));
7170
}
7271

73-
public void closeConnecttionUnavailable() {
74-
executor.execute(() -> finish.complete(Status.of(StatusCode.TRANSPORT_UNAVAILABLE)));
75-
}
76-
7772
public boolean hasNextRequest() {
7873
return !requests.isEmpty();
7974
}
@@ -87,7 +82,6 @@ public void responseSessionStarted(long id, long timeout) {
8782
SessionResponse.SessionStarted.newBuilder().setSessionId(id).setTimeoutMillis(timeout).build()
8883
).build();
8984
executor.execute(() -> observer.onNext(response));
90-
9185
}
9286

9387
public void responseSessionStopped(long id) {

coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void beforeEach() {
3535
Mockito.when(rpc.getScheduler()).thenReturn(scheduler);
3636
Mockito.when(rpc.getDatabase()).thenReturn("/mocked");
3737
Mockito.when(rpc.createSession(Mockito.any())).thenAnswer(i -> {
38-
GrpcStreamMock mock = new GrpcStreamMock(r -> r.run());
38+
GrpcStreamMock mock = new GrpcStreamMock(Runnable::run);
3939
grpcMocks.add(mock);
4040
return mock;
4141
});

core/src/main/java/tech/ydb/core/Issue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ public String toString() {
168168

169169
public void toString(StringBuilder sb) {
170170
if (position != Position.EMPTY) {
171-
sb.append(position.toString());
171+
sb.append(position);
172172
if (endPosition != Position.EMPTY) {
173173
sb.append(" - ");
174-
sb.append(endPosition.toString());
174+
sb.append(endPosition);
175175
}
176176
sb.append(": ");
177177
}

core/src/main/java/tech/ydb/core/Result.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ static <V> Result<V> fail(UnexpectedResultException unexpected) {
4949
}
5050

5151
static <V> Result<V> error(String message, Throwable throwable) {
52-
if (throwable != null && throwable instanceof UnexpectedResultException) {
52+
if (throwable instanceof UnexpectedResultException) {
5353
return new Unexpected<>(message, (UnexpectedResultException) throwable);
5454
}
5555
return new Error<>(message, throwable);
@@ -67,7 +67,7 @@ static <V> Result<V> error(String message, Throwable throwable) {
6767
* // Execute one query, with opening new transaction
6868
* session.executeDataQuery(...)
6969
* // Execute second query if first was successful
70-
* .thenCompose(Result.compose(fisrt -> session.executeDataQuery(...)))
70+
* .thenCompose(Result.compose(first -> session.executeDataQuery(...)))
7171
* // Commit transaction after two successful query executions
7272
* .thenCompose(Result.composeStatus(second -> session.commitTransaction(...)));
7373
* }</pre>
@@ -93,7 +93,7 @@ static <T, U> Function<Result<T>, CompletableFuture<Result<U>>> compose(
9393
* // Execute one query, with opening new transaction
9494
* session.executeDataQuery(...)
9595
* // Execute second query if first was successful
96-
* .thenCompose(Result.compose(fisrt -> session.executeDataQuery(...)))
96+
* .thenCompose(Result.compose(first -> session.executeDataQuery(...)))
9797
* // Commit transaction after two successful query executions
9898
* .thenCompose(Result.composeStatus(second -> session.commitTransaction(...)));
9999
* }</pre>
@@ -118,7 +118,7 @@ static <T> Function<Result<T>, CompletableFuture<Status>> composeStatus(
118118
* // Execute one query
119119
* session.executeDataQuery(...)
120120
* // Execute second query if first was successful
121-
* .thenCompose(Result.compose(fisrt -> session
121+
* .thenCompose(Result.compose(first -> session
122122
* .executeDataQuery(...)
123123
* // But use first request result as the result of
124124
* .thenCompose(Result.composeValue(first))
@@ -147,26 +147,31 @@ private Success(V value, Status status) {
147147
this.status = status;
148148
}
149149

150+
@Nonnull
150151
@Override
151152
public Status getStatus() {
152153
return status;
153154
}
154155

156+
@Nonnull
155157
@Override
156158
public V getValue() {
157159
return value;
158160
}
159161

162+
@Nonnull
160163
@Override
161164
public <U> Success<U> map(Function<V, U> mapper) {
162165
return new Success<>(mapper.apply(value), status);
163166
}
164167

168+
@Nonnull
165169
@Override
166170
public <U> CompletableFuture<Result<U>> mapResultFuture(Function<V, CompletableFuture<Result<U>>> mapper) {
167171
return mapper.apply(value);
168172
}
169173

174+
@Nonnull
170175
@Override
171176
public CompletableFuture<Status> mapStatusFuture(Function<V, CompletableFuture<Status>> mapper) {
172177
return mapper.apply(value);
@@ -208,28 +213,33 @@ private Fail(Status status) {
208213
this.status = status;
209214
}
210215

216+
@Nonnull
211217
@Override
212218
@SuppressWarnings("unchecked")
213219
public <U> Fail<U> map(Function<V, U> mapper) {
214220
return (Fail<U>) this;
215221
}
216222

223+
@Nonnull
217224
@Override
218225
@SuppressWarnings("unchecked")
219226
public <U> CompletableFuture<Result<U>> mapResultFuture(Function<V, CompletableFuture<Result<U>>> mapper) {
220227
return CompletableFuture.completedFuture((Fail<U>) this);
221228
}
222229

230+
@Nonnull
223231
@Override
224232
public CompletableFuture<Status> mapStatusFuture(Function<V, CompletableFuture<Status>> mapper) {
225233
return CompletableFuture.completedFuture(status);
226234
}
227235

236+
@Nonnull
228237
@Override
229238
public Status getStatus() {
230239
return status;
231240
}
232241

242+
@Nonnull
233243
@Override
234244
public V getValue() {
235245
throw new UnexpectedResultException("Cannot get value", status);
@@ -270,28 +280,33 @@ private Unexpected(String message, UnexpectedResultException cause) {
270280
this.cause = (message == null || message.isEmpty()) ? cause : new UnexpectedResultException(message, cause);
271281
}
272282

283+
@Nonnull
273284
@Override
274285
public Status getStatus() {
275286
return cause.getStatus();
276287
}
277288

289+
@Nonnull
278290
@Override
279291
public V getValue() {
280292
throw cause;
281293
}
282294

295+
@Nonnull
283296
@Override
284297
@SuppressWarnings("unchecked")
285298
public <U> Unexpected<U> map(Function<V, U> mapper) {
286299
return (Unexpected<U>) this;
287300
}
288301

302+
@Nonnull
289303
@Override
290304
@SuppressWarnings("unchecked")
291305
public <U> CompletableFuture<Result<U>> mapResultFuture(Function<V, CompletableFuture<Result<U>>> mapper) {
292306
return CompletableFuture.completedFuture((Unexpected<U>) this);
293307
}
294308

309+
@Nonnull
295310
@Override
296311
public CompletableFuture<Status> mapStatusFuture(Function<V, CompletableFuture<Status>> mapper) {
297312
return CompletableFuture.completedFuture(cause.getStatus());
@@ -338,30 +353,34 @@ private Error(String message, Throwable cause) {
338353
this.status = ERROR.withCause(cause);
339354
}
340355

356+
@Nonnull
341357
@Override
342358
public Status getStatus() {
343359
return status;
344360
}
345361

362+
@Nonnull
346363
@Override
347364
public V getValue() {
348365
throw new UnexpectedResultException(message, status);
349366
}
350367

368+
@Nonnull
351369
@Override
352370
@SuppressWarnings("unchecked")
353371
public <U> Error<U> map(Function<V, U> mapper) {
354372
return (Error<U>) this;
355373
}
356374

375+
@Nonnull
357376
@Override
358-
@SuppressWarnings("unchecked")
359377
public <U> CompletableFuture<Result<U>> mapResultFuture(Function<V, CompletableFuture<Result<U>>> mapper) {
360378
CompletableFuture<Result<U>> future = new CompletableFuture<>();
361379
future.completeExceptionally(status.getCause());
362380
return future;
363381
}
364382

383+
@Nonnull
365384
@Override
366385
public CompletableFuture<Status> mapStatusFuture(Function<V, CompletableFuture<Status>> mapper) {
367386
return CompletableFuture.completedFuture(status);

0 commit comments

Comments
 (0)