Skip to content

Commit 8464a34

Browse files
committed
Improve reader / read session / partition session ids
1 parent 6fa8147 commit 8464a34

File tree

6 files changed

+96
-96
lines changed

6 files changed

+96
-96
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public abstract class GrpcStreamRetrier {
2222
private static final int EXP_BACKOFF_BASE_MS = 256;
2323
private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec)
2424
private static final int EXP_BACKOFF_MAX_POWER = 7;
25-
private static final int ID_LENGTH = 50;
25+
private static final int ID_LENGTH = 6;
2626
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
2727
.toCharArray();
2828

@@ -34,18 +34,22 @@ public abstract class GrpcStreamRetrier {
3434

3535
protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
3636
this.scheduler = scheduler;
37-
this.id = new Random().ints(0, ID_ALPHABET.length)
38-
.limit(ID_LENGTH)
39-
.map(charId -> ID_ALPHABET[charId])
40-
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
41-
.toString();
37+
this.id = generateRandomId(ID_LENGTH);
4238
}
4339

4440
protected abstract Logger getLogger();
4541
protected abstract String getStreamName();
4642
protected abstract void onStreamReconnect();
4743
protected abstract void onShutdown(String reason);
4844

45+
protected static String generateRandomId(int length) {
46+
return new Random().ints(0, ID_ALPHABET.length)
47+
.limit(length)
48+
.map(charId -> ID_ALPHABET[charId])
49+
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
50+
.toString();
51+
}
52+
4953
private void tryScheduleReconnect() {
5054
int currentReconnectCounter = reconnectCounter.get() + 1;
5155
if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) {
@@ -56,8 +60,8 @@ private void tryScheduleReconnect() {
5660
shutdownImpl(errorMessage);
5761
return;
5862
} else {
59-
getLogger().debug("[{}] Maximum retry count ({}}) exceeded. But {} is already shut down.", id,
60-
MAX_RECONNECT_COUNT, getStreamName());
63+
getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " +
64+
"shut down.", id, MAX_RECONNECT_COUNT, getStreamName());
6165
}
6266
}
6367
if (isReconnecting.compareAndSet(false, true)) {

topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@ public CompletableFuture<Void> commit() {
2929

3030
public CompletableFuture<Void> commitImpl(boolean fromCommitter) {
3131
if (logger.isDebugEnabled()) {
32-
logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" +
33-
" [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(),
34-
partitionSession.getId(), partitionSession.getPartitionId(), messageCount,
35-
offsetsToCommit.getStart(), offsetsToCommit.getEnd());
32+
logger.debug("[{}] Committing {} message(s), offsets [{},{})" + (fromCommitter ? " from Committer" : ""),
33+
partitionSession.getFullId(), messageCount, offsetsToCommit.getStart(), offsetsToCommit.getEnd());
3634
}
3735
return partitionSession.commitOffsetRange(offsetsToCommit);
3836
}

topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ private void add(OffsetsRange offsetRange) {
4141
rangesLock.unlock();
4242
}
4343
} catch (RuntimeException exception) {
44-
String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
45-
partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " +
46-
exception.getMessage();
44+
String errorMessage = "[" + partitionSession.getFullId() + "] Error adding new offset range to " +
45+
"DeferredCommitter for partition session " + partitionSession.getId() + " (partition " +
46+
partitionSession.getPartitionId() + "): " + exception.getMessage();
4747
logger.error(errorMessage);
4848
throw new RuntimeException(errorMessage, exception);
4949
}

0 commit comments

Comments
 (0)