Skip to content

Commit bb4705c

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[improve][log] Print ZK path if write to ZK fails due to data being too large to persist (apache#23652)
(cherry picked from commit 5a3a1f1) (cherry picked from commit f9f5be5)
1 parent 97f2349 commit bb4705c

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2188,6 +2188,8 @@ public void operationFailed(ManagedLedgerException exception) {
21882188

21892189
if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
21902190
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
2191+
log.error("[{}][{}] Metadata ledger creation failed, try to persist the position in the metadata"
2192+
+ " store.", ledger.getName(), name);
21912193
persistPositionToMetaStore(mdEntry, cb);
21922194
} else {
21932195
cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed"));
@@ -2863,9 +2865,7 @@ public void operationComplete() {
28632865

28642866
@Override
28652867
public void operationFailed(ManagedLedgerException exception) {
2866-
log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
2867-
+ " store.", ledger.getName(), name, exception);
2868-
2868+
log.error("[{}][{}] Metadata ledger creation failed {}", ledger.getName(), name, exception);
28692869
synchronized (pendingMarkDeleteOps) {
28702870
// At this point we don't have a ledger ready
28712871
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import lombok.SneakyThrows;
3333
import lombok.extern.slf4j.Slf4j;
3434
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
35+
import org.apache.commons.lang3.tuple.Pair;
3536
import org.apache.pulsar.common.util.FutureUtil;
3637
import org.apache.pulsar.metadata.api.GetResult;
3738
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -200,10 +201,15 @@ protected void batchOperation(List<MetadataOp> ops) {
200201
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
201202
.entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
202203
.collect(Collectors.joining(", "));
204+
List<Pair> opsForLog = ops.stream()
205+
.filter(item -> item.size() > 256 * 1024)
206+
.map(op -> Pair.of(op.getPath(), op.size()))
207+
.collect(Collectors.toList());
203208
Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
204209
log.warn("Connection loss while executing batch operation of {} "
205210
+ "of total data size of {}. "
206-
+ "Retrying individual operations one-by-one.", countsByType, totalSize);
211+
+ "Retrying individual operations one-by-one. ops whose size > 256KB: {}",
212+
countsByType, totalSize, opsForLog);
207213

208214
// Retry with the individual operations
209215
executor.schedule(() -> {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@ default OpGetChildren asGetChildren() {
5151
default OpPut asPut() {
5252
return (OpPut) this;
5353
}
54+
55+
String getPath();
5456
}

0 commit comments

Comments
 (0)