Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,8 @@ public void operationFailed(ManagedLedgerException exception) {

if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
log.error("[{}][{}] Metadata ledger creation failed, try to persist the position in the metadata"
+ " store.", ledger.getName(), name);
persistPositionToMetaStore(mdEntry, cb);
} else {
cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed"));
Expand Down Expand Up @@ -2921,9 +2923,7 @@ public void operationComplete() {

@Override
public void operationFailed(ManagedLedgerException exception) {
log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
+ " store.", ledger.getName(), name, exception);

log.error("[{}][{}] Metadata ledger creation failed {}", ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -201,10 +202,15 @@ protected void batchOperation(List<MetadataOp> ops) {
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
.entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
.collect(Collectors.joining(", "));
List<Pair> opsForLog = ops.stream()
.filter(item -> item.size() > 256 * 1024)
.map(op -> Pair.of(op.getPath(), op.size()))
.collect(Collectors.toList());
Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
log.warn("Connection loss while executing batch operation of {} "
+ "of total data size of {}. "
+ "Retrying individual operations one-by-one.", countsByType, totalSize);
+ "Retrying individual operations one-by-one. ops whose size > 256KB: {}",
countsByType, totalSize, opsForLog);

// Retry with the individual operations
executor.schedule(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ default OpGetChildren asGetChildren() {
default OpPut asPut() {
return (OpPut) this;
}

String getPath();
}
Loading