File tree Expand file tree Collapse file tree 2 files changed +8
-3
lines changed
topic/src/main/java/tech/ydb/topic/read/impl Expand file tree Collapse file tree 2 files changed +8
-3
lines changed Original file line number Diff line number Diff line change @@ -83,8 +83,8 @@ public long getId() {
8383 return id ;
8484 }
8585
86- public long getFullId () {
87- return id ;
86+ public String getFullId () {
87+ return fullId ;
8888 }
8989
9090 public long getPartitionId () {
@@ -166,7 +166,11 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
166166 }
167167
168168 CompletableFuture .runAsync (() -> decode (newBatch ), decompressionExecutor )
169- .thenRun (() -> {
169+ .whenComplete ((res , th ) -> {
170+ if (th != null ) {
171+ logger .error ("[{}] Message decoding failed with error: " , fullId , th );
172+ return ;
173+ }
170174 boolean haveNewBatchesReady = false ;
171175 decodingBatchesLock .lock ();
172176
Original file line number Diff line number Diff line change @@ -402,6 +402,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart
402402 .setId (partitionSessionId )
403403 .setFullId (partitionSessionFullId )
404404 .setTopicPath (request .getPartitionSession ().getPath ())
405+ .setConsumerName (consumerName )
405406 .setPartitionId (partitionId )
406407 .setCommittedOffset (request .getCommittedOffset ())
407408 .setPartitionOffsets (new OffsetsRangeImpl (request .getPartitionOffsets ().getStart (),
You can’t perform that action at this time.
0 commit comments