@@ -115,6 +115,11 @@ public class DefaultWriter implements Writer {
     private final AtomicLong trimOffset = new AtomicLong(-1);
     private CompletableFuture<Void> lastTrimCf = CompletableFuture.completedFuture(null);
 
+    // --- BEGIN: Offset monotonicity fix ---
+    // Lightweight object lock to claim offset ranges atomically.
+    private final Object offsetClaimLock = new Object();
+    // --- END: Offset monotonicity fix ---
+
     public DefaultWriter(Time time, ObjectStorage objectStorage, ObjectWALConfig config) {
         this.time = time;
         this.objectStorage = objectStorage;
@@ -318,6 +323,33 @@ private void tryUploadBulkInWaiting() {
         }
     }
 
+    /**
+     * Atomically claims a contiguous range of offsets for the provided records.
+     *
+     * This method assigns {@code record.offset} for every record and advances the global
+     * {@code nextOffset}. It uses a lightweight lock to ensure concurrent bulk preparations
+     * cannot reuse the same offsets when uploads are slow or pending.
+     *
+     * The method performs only offset arithmetic and assignment; it must be called
+     * before any buffer allocation or IO to ensure correctness.
+     *
+     * @param records the list of records to claim offsets for
+     * @return the starting offset assigned to the first record
+     */
+    private long claimOffsetRange(List<Record> records) {
+        synchronized (offsetClaimLock) {
+            long current = nextOffset.get();
+            long start = current;
+
+            for (Record r : records) {
+                r.offset = current;
+                current += r.size;
+            }
+            nextOffset.set(current); // advance globally
+            return start;
+        }
+    }
+
     private void uploadBulk0(Bulk bulk) {
         try {
             long startTime = time.nanoseconds();
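
The critical section in claimOffsetRange is a read-modify-write of nextOffset: read the counter, stamp each record, write the counter back. Because the whole claim collapses to a single fetch-and-add once the range's total size is known, the same disjoint-range guarantee could also be obtained lock-free. A minimal sketch of that alternative, assuming the same Record and nextOffset fields as above; the name claimOffsetRangeLockFree is hypothetical:

    // Hypothetical lock-free variant: a single getAndAdd reserves the whole
    // range [start, start + total), so concurrent callers always receive
    // disjoint, monotonically increasing ranges; per-record offsets are then
    // stamped outside any critical section.
    private long claimOffsetRangeLockFree(List<Record> records) {
        long total = 0;
        for (Record r : records) {
            total += r.size; // sizes must be fixed before claiming
        }
        long start = nextOffset.getAndAdd(total);
        long current = start;
        for (Record r : records) {
            r.offset = current;
            current += r.size;
        }
        return start;
    }

The object lock in the PR keeps the critical section explicit and easy to extend if claiming ever needs to cover more state than the counter, so the locked form is a defensible choice as well.
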
@@ -334,19 +366,23 @@ private void uploadBulk0(Bulk bulk) {
                 return rst;
             });
 
-            long firstOffset = bulk.baseOffset;
+            // Claim offsets atomically to ensure monotonicity even if uploads are delayed.
+            long firstOffset = claimOffsetRange(records);
             long nextOffset = firstOffset;
             long lastRecordOffset = nextOffset;
 
             CompositeByteBuf dataBuffer = ByteBufAlloc.compositeByteBuffer();
             for (Record record : records) {
-                record.offset = nextOffset;
                 lastRecordOffset = record.offset;
                 ByteBuf data = record.streamRecordBatch.encoded();
                 ByteBuf header = BYTE_BUF_ALLOC.byteBuffer(RECORD_HEADER_SIZE);
-                header = WALUtil.generateHeader(data, header, 0, nextOffset);
-                nextOffset += record.size;
+                header = WALUtil.generateHeader(data, header, 0, record.offset);
+
                 dataBuffer.addComponent(true, header);
                 dataBuffer.addComponent(true, data);
+
+                // Keep track of nextOffset for endOffset calculation.
+                nextOffset = record.offset + record.size;
             }
 
+
             // Build object buffer.
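
As the new javadoc notes, the failure mode being closed is offset reuse when uploads are slow or pending: before this change, concurrent bulk preparations could hand out overlapping offsets. A self-contained harness, an assumed sketch rather than part of the PR, that mimics the claim pattern and verifies concurrently claimed ranges never collide:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicLong;

    // Standalone check (not part of the PR): many threads claim fixed-size
    // ranges from one shared counter; if claims are atomic, every starting
    // offset is unique and the counter ends at threads * claims * size.
    public class OffsetClaimCheck {
        private static final AtomicLong nextOffset = new AtomicLong(0);
        private static final Object offsetClaimLock = new Object();

        // Same shape as the PR's claimOffsetRange, reduced to one fixed-size claim.
        static long claim(long size) {
            synchronized (offsetClaimLock) {
                long start = nextOffset.get();
                nextOffset.set(start + size);
                return start;
            }
        }

        public static void main(String[] args) throws Exception {
            final int threads = 8;
            final int claimsPerThread = 10_000;
            final long size = 17;
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            List<Future<List<Long>>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                futures.add(pool.submit(() -> {
                    List<Long> starts = new ArrayList<>();
                    for (int i = 0; i < claimsPerThread; i++) {
                        starts.add(claim(size));
                    }
                    return starts;
                }));
            }
            Set<Long> seen = ConcurrentHashMap.newKeySet();
            for (Future<List<Long>> f : futures) {
                for (long s : f.get()) {
                    if (!seen.add(s)) {
                        throw new IllegalStateException("offset reused: " + s);
                    }
                }
            }
            pool.shutdown();
            System.out.println(seen.size() + " claims, all ranges disjoint; nextOffset = " + nextOffset.get());
        }
    }

Replacing the synchronized block with a plain get/set (no lock) typically makes the check fail within a few runs under contention, which is exactly the reuse this fix guards against.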