Skip to content

Commit 22ad5a4

Browse files
committed
Add topic and consumer names for each write / read session start
1 parent 8464a34 commit 22ad5a4

File tree

3 files changed

+30
-21
lines changed

3 files changed

+30
-21
lines changed

topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public class PartitionSessionImpl {
3939

4040
private final long id;
4141
private final String fullId;
42-
private final String path;
42+
private final String topicPath;
43+
private final String consumerName;
4344
private final long partitionId;
4445
private final PartitionSession sessionInfo;
4546
private final Executor decompressionExecutor;
@@ -60,17 +61,18 @@ public class PartitionSessionImpl {
6061
private PartitionSessionImpl(Builder builder) {
6162
this.id = builder.id;
6263
this.fullId = builder.fullId;
63-
this.path = builder.path;
64+
this.topicPath = builder.topicPath;
65+
this.consumerName = builder.consumerName;
6466
this.partitionId = builder.partitionId;
65-
this.sessionInfo = new PartitionSession(id, partitionId, path);
67+
this.sessionInfo = new PartitionSession(id, partitionId, topicPath);
6668
this.lastReadOffset = builder.committedOffset;
6769
this.lastCommittedOffset = builder.committedOffset;
6870
this.decompressionExecutor = builder.decompressionExecutor;
6971
this.dataEventCallback = builder.dataEventCallback;
7072
this.commitFunction = builder.commitFunction;
71-
logger.info("[{}] Partition session for {} is started. CommittedOffset: {}. " +
72-
"Partition offsets: {}-{}", fullId, path, lastReadOffset, builder.partitionOffsets.getStart(),
73-
builder.partitionOffsets.getEnd());
73+
logger.info("[{}] Partition session is started for Topic \"{}\" and Consumer \"{}\". CommittedOffset: {}. " +
74+
"Partition offsets: {}-{}", fullId, topicPath, consumerName, lastReadOffset,
75+
builder.partitionOffsets.getStart(), builder.partitionOffsets.getEnd());
7476
}
7577

7678
public static Builder newBuilder() {
@@ -89,8 +91,8 @@ public long getPartitionId() {
8991
return partitionId;
9092
}
9193

92-
public String getPath() {
93-
return path;
94+
public String getTopicPath() {
95+
return topicPath;
9496
}
9597

9698
public PartitionSession getSessionInfo() {
@@ -216,7 +218,7 @@ public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
216218
logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session " +
217219
"is already closed", fullId, rangeToCommit.getStart(), rangeToCommit.getEnd());
218220
resultFuture.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " +
219-
partitionId + ") for " + path + " is already closed"));
221+
partitionId + ") for " + topicPath + " is already closed"));
220222
return resultFuture;
221223
}
222224
} finally {
@@ -315,7 +317,7 @@ private void sendDataToReadersIfNeeded() {
315317
DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit);
316318
if (logger.isDebugEnabled()) {
317319
logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) is about " +
318-
"to be called...", fullId, messagesToRead.size(),messagesToRead.get(0).getOffset(),
320+
"to be called...", fullId, messagesToRead.size(), messagesToRead.get(0).getOffset(),
319321
messagesToRead.get(messagesToRead.size() - 1).getOffset());
320322
}
321323
dataEventCallback.apply(event)
@@ -348,9 +350,9 @@ public void shutdown() {
348350
try {
349351
isWorking.set(false);
350352
logger.info("[{}] Partition session for {} is shutting down. Failing {} commit futures...", fullId,
351-
path, commitFutures.size());
353+
topicPath, commitFutures.size());
352354
commitFutures.values().forEach(f -> f.completeExceptionally(new RuntimeException("Partition session " + id +
353-
" (partition " + partitionId + ") for " + path + " is closed")));
355+
" (partition " + partitionId + ") for " + topicPath + " is closed")));
354356
} finally {
355357
commitFuturesLock.unlock();
356358
}
@@ -371,7 +373,8 @@ public void shutdown() {
371373
public static class Builder {
372374
private long id;
373375
private String fullId;
374-
private String path;
376+
private String topicPath;
377+
private String consumerName;
375378
private long partitionId;
376379
private long committedOffset;
377380
private OffsetsRange partitionOffsets;
@@ -389,8 +392,13 @@ public Builder setFullId(String fullId) {
389392
return this;
390393
}
391394

392-
public Builder setPath(String path) {
393-
this.path = path;
395+
public Builder setTopicPath(String topicPath) {
396+
this.topicPath = topicPath;
397+
return this;
398+
}
399+
400+
public Builder setConsumerName(String consumerName) {
401+
this.consumerName = consumerName;
394402
return this;
395403
}
396404

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
7171
message.append(" \"").append(settings.getReaderName()).append("\"");
7272
}
7373
message.append(" (generated id ").append(id).append(")");
74-
message.append(" created for topic(s): ");
74+
message.append(" created for Topic(s) ");
7575
for (TopicReadSettings topic : settings.getTopics()) {
7676
if (topic != settings.getTopics().get(0)) {
7777
message.append(", ");
7878
}
7979
message.append("\"").append(topic.getPath()).append("\"");
8080
}
8181
if (settings.getConsumerName() != null) {
82-
message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
82+
message.append(" and Consumer \"").append(settings.getConsumerName());
8383
consumerName = settings.getConsumerName();
8484
} else {
8585
message.append(" without a Consumer");
@@ -392,7 +392,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart
392392
long partitionSessionId = request.getPartitionSession().getPartitionSessionId();
393393
long partitionId = request.getPartitionSession().getPartitionId();
394394
String partitionSessionFullId = streamId + '/' + partitionSessionId + "-p" + partitionId;
395-
logger.info("[{}] Received StartPartitionSessionRequest for Topic {} and Consumer {}. " +
395+
logger.info("[{}] Received StartPartitionSessionRequest for Topic \"{}\" and Consumer \"{}\". " +
396396
"Partition session {} (partition {}) with committedOffset {} and partitionOffsets [{}-{})",
397397
partitionSessionFullId, request.getPartitionSession().getPath(), consumerName,
398398
partitionSessionId, partitionId, request.getCommittedOffset(),
@@ -401,7 +401,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart
401401
PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder()
402402
.setId(partitionSessionId)
403403
.setFullId(partitionSessionFullId)
404-
.setPath(request.getPartitionSession().getPath())
404+
.setTopicPath(request.getPartitionSession().getPath())
405405
.setPartitionId(partitionId)
406406
.setCommittedOffset(request.getCommittedOffset())
407407
.setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(),

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,9 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
413413
sessionId = response.getSessionId();
414414
long lastSeqNo = response.getLastSeqNo();
415415
long actualLastSeqNo = lastSeqNo;
416-
logger.info("[{}] Session initialized with id {} (partition {}), lastSeqNo {}, actualLastSeqNo {}",
417-
streamId, sessionId, response.getPartitionId(), lastSeqNo, actualLastSeqNo);
416+
logger.info("[{}] Session with id {} (partition {}) initialized for topic \"{}\", lastSeqNo {}," +
417+
" actualLastSeqNo {}", streamId, sessionId, response.getPartitionId(),
418+
settings.getTopicPath(), lastSeqNo, actualLastSeqNo);
418419
// If there are messages that were already sent before reconnect but haven't received acks,
419420
// their highest seqNo should also be taken in consideration when calculating next seqNo automatically
420421
if (!sentMessages.isEmpty()) {

0 commit comments

Comments
 (0)