diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 6e28f86c2..7fb42eb04 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -83,8 +83,8 @@ public long getId() { return id; } - public long getFullId() { - return id; + public String getFullId() { + return fullId; } public long getPartitionId() { @@ -166,7 +166,11 @@ public CompletableFuture addBatches(List decode(newBatch), decompressionExecutor) - .thenRun(() -> { + .whenComplete((res, th) -> { + if (th != null) { + logger.error("[{}] Message decoding failed with error: ", fullId, th); + return; + } boolean haveNewBatchesReady = false; decodingBatchesLock.lock(); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 984134e06..2cbe5d51d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -402,6 +402,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart .setId(partitionSessionId) .setFullId(partitionSessionFullId) .setTopicPath(request.getPartitionSession().getPath()) + .setConsumerName(consumerName) .setPartitionId(partitionId) .setCommittedOffset(request.getCommittedOffset()) .setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(),