From a9544bac1c43aec564c02b5db1a76a6b4494834e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 28 Nov 2024 17:18:51 +0800 Subject: [PATCH 1/5] [improve] [zk] Print details if a ZK write fails because the packet is too large --- .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 6 +++++- .../apache/pulsar/metadata/impl/batching/MetadataOp.java | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 603a4503dc8bb..97e00191c0c06 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -32,6 +32,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; @@ -201,10 +202,13 @@ protected void batchOperation(List ops) { Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1))) .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries") .collect(Collectors.joining(", ")); + List opsForLog = ops.stream().map(op -> Pair.of(op.getPath(), op.size())) + .collect(Collectors.toList()); Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size)); log.warn("Connection loss while executing batch operation of {} " + "of total data size of {}. " - + "Retrying individual operations one-by-one.", countsByType, totalSize); + + "Retrying individual operations one-by-one. 
ops: {}", + countsByType, totalSize, opsForLog); // Retry with the individual operations executor.schedule(() -> { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java index abf60f7b7245c..06ff425372b58 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java @@ -51,4 +51,6 @@ default OpGetChildren asGetChildren() { default OpPut asPut() { return (OpPut) this; } + + String getPath(); } From 52aa67290f27e74d31cf764087d7d6992bc45ed2 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 28 Nov 2024 18:16:14 +0800 Subject: [PATCH 2/5] - --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 478c6a1b37976..1cb04c5c7c61a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2247,6 +2247,8 @@ public void operationFailed(ManagedLedgerException exception) { if (State.NoLedger.equals(STATE_UPDATER.get(this))) { if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) { + log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata" + + " store.", ledger.getName(), name); persistPositionToMetaStore(mdEntry, cb); } else { cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed")); @@ -2921,9 +2923,7 @@ public void operationComplete() { @Override public void operationFailed(ManagedLedgerException exception) { - log.error("[{}][{}] Metadata ledger creation 
failed {}, try to persist the position in the metadata" - + " store.", ledger.getName(), name, exception); - + log.error("[{}][{}] Cursor ledger creation failed {}", ledger.getName(), name, exception); synchronized (pendingMarkDeleteOps) { // At this point we don't have a ledger ready STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); From c7d7678bcb57370107d7ff7301687ec4da2856a3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 28 Nov 2024 22:38:04 +0800 Subject: [PATCH 3/5] address comments --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1cb04c5c7c61a..6838947f46268 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2923,7 +2923,7 @@ public void operationComplete() { @Override public void operationFailed(ManagedLedgerException exception) { - log.error("[{}][{}] Cursor ledger creation failed {}", ledger.getName(), name, exception); + log.error("[{}][{}] Metadata ledger creation failed {}", ledger.getName(), name, exception); synchronized (pendingMarkDeleteOps) { // At this point we don't have a ledger ready STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); From f033e90bb632d64190f8e366ea19fee8edc11994 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 28 Nov 2024 22:40:37 +0800 Subject: [PATCH 4/5] address comments --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 
6838947f46268..ec4154a858d9e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2247,7 +2247,7 @@ public void operationFailed(ManagedLedgerException exception) { if (State.NoLedger.equals(STATE_UPDATER.get(this))) { if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) { - log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata" + log.error("[{}][{}] Metadata ledger creation failed, try to persist the position in the metadata" + " store.", ledger.getName(), name); persistPositionToMetaStore(mdEntry, cb); } else { From cafadbd7c41bc13153f41e63ea84420325b70cf9 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 2 Dec 2024 17:26:21 +0800 Subject: [PATCH 5/5] improve logs --- .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 97e00191c0c06..4c24aa5938b93 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -202,12 +202,14 @@ protected void batchOperation(List ops) { Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1))) .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries") .collect(Collectors.joining(", ")); - List opsForLog = ops.stream().map(op -> Pair.of(op.getPath(), op.size())) + List opsForLog = ops.stream() + .filter(item -> item.size() > 256 * 1024) + .map(op -> Pair.of(op.getPath(), op.size())) .collect(Collectors.toList()); Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size)); log.warn("Connection loss while 
executing batch operation of {} " + "of total data size of {}. " - + "Retrying individual operations one-by-one. ops: {}", + + "Retrying individual operations one-by-one. ops whose size > 256KB: {}", countsByType, totalSize, opsForLog); // Retry with the individual operations