
Commit a00ab9e

chore(zerozone2): isolate the record encode allocator (#2944)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent: f5afb1c

7 files changed (+32, -27 lines)
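At its core, the change makes the record-encode allocator a parameter instead of a hidden static: StreamRecordBatchCodec previously owned a single ByteBufSeqAlloc tagged ENCODE_RECORD that every caller of StreamRecordBatch#encoded() shared. After this commit the default allocator lives in StreamRecordBatch, and SnapshotReadCache gets its own allocator tagged with a new SNAPSHOT_READ_CACHE type, so snapshot-read encode buffers are pooled and accounted separately from the regular write path. A minimal sketch of the two call shapes after the change (both names come from the diffs below):

import com.automq.stream.s3.cache.SnapshotReadCache;
import com.automq.stream.s3.model.StreamRecordBatch;

import io.netty.buffer.ByteBuf;

final class CallShapeSketch {
    static ByteBuf encodeDefault(StreamRecordBatch batch) {
        return batch.encoded();                               // pooled/metered as "encode_record"
    }

    static ByteBuf encodeForSnapshotRead(StreamRecordBatch batch) {
        return batch.encoded(SnapshotReadCache.ENCODE_ALLOC); // pooled/metered as "snapshot_read_cache"
    }
}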

core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java

Lines changed: 3 additions & 2 deletions
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;

+import com.automq.stream.s3.cache.SnapshotReadCache;
 import com.automq.stream.s3.model.StreamRecordBatch;

 import org.slf4j.Logger;
@@ -62,8 +63,8 @@ public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
             recordBatch.setPartitionLeaderEpoch(linkRecord.partitionLeaderEpoch());
             StreamRecordBatch streamRecordBatch = new StreamRecordBatch(src.getStreamId(), src.getEpoch(), src.getBaseOffset(),
                 src.getCount(), Unpooled.wrappedBuffer(records.buffer()));
-            // duplicated copy the payload
-            streamRecordBatch.encoded();
+            // The buf will be release after the finally block, so we need copy the data by #encoded.
+            streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
             return streamRecordBatch;
         } finally {
             src.release();
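The decoder change is a buffer-lifetime fix as much as an allocator change: the batch wraps records.buffer(), and src is released in the finally block, so the payload has to be copied out before decode() returns; encoded(...) performs that copy, and passing SnapshotReadCache.ENCODE_ALLOC places it in the snapshot-read pool. A sketch of the same pattern with a hypothetical helper (names and parameters are illustrative, not from the codebase):

import com.automq.stream.s3.cache.SnapshotReadCache;
import com.automq.stream.s3.model.StreamRecordBatch;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

final class DecodeSketch {
    // `src` dies with the finally block, so detach the batch from it first.
    static StreamRecordBatch copyOut(ByteBuf src, long streamId, long epoch, long baseOffset, int count) {
        try {
            StreamRecordBatch batch = new StreamRecordBatch(streamId, epoch, baseOffset,
                count, Unpooled.wrappedBuffer(src));
            // encoded(...) re-encodes into an allocator-owned buffer and releases
            // the wrapped payload reference, so `batch` survives src.release().
            batch.encoded(SnapshotReadCache.ENCODE_ALLOC);
            return batch;
        } finally {
            src.release();
        }
    }
}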

s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java

Lines changed: 2 additions & 0 deletions
@@ -60,6 +60,7 @@ public class ByteBufAlloc {
     public static final int BLOCK_CACHE = 11;
     public static final int S3_WAL = 12;
     public static final int POOLED_MEMORY_RECORDS = 13;
+    public static final int SNAPSHOT_READ_CACHE = 14;

     // the MAX_TYPE_NUMBER may change when new type added.
     public static final int MAX_TYPE_NUMBER = 20;
@@ -104,6 +105,7 @@ public class ByteBufAlloc {
         registerAllocType(BLOCK_CACHE, "block_cache");
         registerAllocType(S3_WAL, "s3_wal");
         registerAllocType(POOLED_MEMORY_RECORDS, "pooled_memory_records");
+        registerAllocType(SNAPSHOT_READ_CACHE, "snapshot_read_cache");

    }
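ByteBufAlloc keys every allocation to a small int type so per-component memory usage can be reported by name; this commit adds a fourteenth type (still under MAX_TYPE_NUMBER = 20) and registers its label. A sketch of how a type id is consumed, assuming the existing ByteBufAlloc.byteBuffer(size, type) entry point (not shown in this diff):

import com.automq.stream.s3.ByteBufAlloc;
import io.netty.buffer.ByteBuf;

final class AllocTypeSketch {
    static void useTaggedBuffer() {
        // Bytes allocated here are attributed to "snapshot_read_cache" in the alloc stats.
        ByteBuf buf = ByteBufAlloc.byteBuffer(1024, ByteBufAlloc.SNAPSHOT_READ_CACHE);
        try {
            buf.writeLong(42L); // ... fill and use the buffer
        } finally {
            buf.release();
        }
    }
}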

s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java

Lines changed: 2 additions & 5 deletions
@@ -24,8 +24,6 @@

 import io.netty.buffer.ByteBuf;

-import static com.automq.stream.s3.ByteBufAlloc.ENCODE_RECORD;
-
 public class StreamRecordBatchCodec {
     public static final byte MAGIC_V0 = 0x22;
     public static final int HEADER_SIZE =
@@ -35,12 +33,11 @@ public class StreamRecordBatchCodec {
         + 8 // baseOffset
         + 4 // lastOffsetDelta
         + 4; // payload length
-    private static final ByteBufSeqAlloc ENCODE_ALLOC = new ByteBufSeqAlloc(ENCODE_RECORD, 8);

-    public static ByteBuf encode(StreamRecordBatch streamRecord) {
+    public static ByteBuf encode(StreamRecordBatch streamRecord, ByteBufSeqAlloc alloc) {
         int totalLength = HEADER_SIZE + streamRecord.size(); // payload
         // use sequential allocator to avoid memory fragmentation
-        ByteBuf buf = ENCODE_ALLOC.byteBuffer(totalLength);
+        ByteBuf buf = alloc.byteBuffer(totalLength);
         buf.writeByte(MAGIC_V0);
         buf.writeLong(streamRecord.getStreamId());
         buf.writeLong(streamRecord.getEpoch());
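From the write order (magic, streamId, epoch, then the header tail visible in context), the encoded record layout is: magic (1B) | streamId (8B) | epoch (8B) | baseOffset (8B) | lastOffsetDelta (4B) | payload length (4B) | payload. A decode-side sketch of that layout, inferred from encode(...) rather than taken from the project's actual decoder:

import io.netty.buffer.ByteBuf;

final class HeaderLayoutSketch {
    static void readHeader(ByteBuf buf) {
        byte magic = buf.readByte();          // expect MAGIC_V0 (0x22)
        long streamId = buf.readLong();
        long epoch = buf.readLong();
        long baseOffset = buf.readLong();
        int lastOffsetDelta = buf.readInt();
        int payloadLength = buf.readInt();
        ByteBuf payload = buf.readSlice(payloadLength);
        // ... build a StreamRecordBatch from the fields above
    }
}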

s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java

Lines changed: 5 additions & 13 deletions
@@ -345,7 +345,7 @@ public void clearStreamRecords(long streamId) {
         readLock.lock();
         try {
             for (LogCacheBlock block : blocks) {
-                size.addAndGet(-block.free(streamId));
+                block.free(streamId);
             }
         } finally {
             readLock.unlock();
@@ -478,16 +478,13 @@ public void free() {
         }, LOGGER);
     }

-    public long free(long streamId) {
-        AtomicLong size = new AtomicLong();
+    public void free(long streamId) {
         suppress(() -> {
             StreamCache streamCache = map.remove(streamId);
             if (streamCache != null) {
-                size.addAndGet(streamCache.free());
+                streamCache.free();
             }
         }, LOGGER);
-        this.size.addAndGet(-size.get());
-        return size.get();
     }

     public void addFreeListener(FreeListener freeListener) {
@@ -625,14 +622,9 @@ synchronized StreamRange range() {
         return new StreamRange(startOffset, endOffset);
     }

-    synchronized long free() {
-        AtomicLong size = new AtomicLong();
-        records.forEach(record -> {
-            size.addAndGet(record.occupiedSize());
-            record.release();
-        });
+    synchronized void free() {
+        records.forEach(StreamRecordBatch::release);
         records.clear();
-        return size.get();
     }

     synchronized long startOffset() {
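Both eviction paths stop reporting freed bytes: LogCacheBlock#free and LogCache#free(streamId) become void, and clearStreamRecords no longer subtracts the result from the cache-level size counter, so eviction is decoupled from manual size arithmetic (the test change below drops its size assertions for the same reason). The removed accumulator idiom versus the new release-only shape, as a self-contained sketch:

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class FreeSketch {
    interface Releasable {
        long occupiedSize();
        void release();
    }

    // Removed shape: releasing and size accounting were coupled.
    static long freeAndReport(List<Releasable> records) {
        AtomicLong freed = new AtomicLong();
        records.forEach(r -> {
            freed.addAndGet(r.occupiedSize());
            r.release();
        });
        records.clear();
        return freed.get();
    }

    // New shape: release only; nothing flows back to a shared counter.
    static void free(List<Releasable> records) {
        records.forEach(Releasable::release);
        records.clear();
    }
}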

s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java

Lines changed: 5 additions & 2 deletions
@@ -1,5 +1,6 @@
 package com.automq.stream.s3.cache;

+import com.automq.stream.ByteBufSeqAlloc;
 import com.automq.stream.api.LinkRecordDecoder;
 import com.automq.stream.s3.DataBlockIndex;
 import com.automq.stream.s3.ObjectReader;
@@ -47,7 +48,10 @@
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;

+import static com.automq.stream.s3.ByteBufAlloc.SNAPSHOT_READ_CACHE;
+
 public class SnapshotReadCache {
+    public static final ByteBufSeqAlloc ENCODE_ALLOC = new ByteBufSeqAlloc(SNAPSHOT_READ_CACHE, 8);
     private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotReadCache.class);
     private static final long MAX_INFLIGHT_LOAD_BYTES = 100L * 1024 * 1024;

@@ -232,8 +236,7 @@ public void run() {
                 return;
             }
             records.addAll(cfList.stream().map(CompletableFuture::join).toList());
-            // move to mem pool
-            records.forEach(StreamRecordBatch::encoded);
+            records.forEach(r -> r.encoded(ENCODE_ALLOC));
             loadCf.complete(null);
         });
     }).whenComplete((rst, ex) -> {
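The loader used to call StreamRecordBatch::encoded, which copied every loaded payload into the shared "encode_record" pool (the deleted "// move to mem pool" comment); with the cache-owned ENCODE_ALLOC those bytes now pool and meter under "snapshot_read_cache" instead. The new forEach in standalone form:

import com.automq.stream.s3.cache.SnapshotReadCache;
import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.List;

final class LoadSketch {
    // Re-encode loaded records into buffers owned by the snapshot-read allocator.
    static void moveToSnapshotPool(List<StreamRecordBatch> records) {
        records.forEach(r -> r.encoded(SnapshotReadCache.ENCODE_ALLOC));
    }
}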

s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java

Lines changed: 9 additions & 1 deletion
@@ -19,13 +19,17 @@

 package com.automq.stream.s3.model;

+import com.automq.stream.ByteBufSeqAlloc;
 import com.automq.stream.s3.StreamRecordBatchCodec;
 import com.automq.stream.utils.biniarysearch.ComparableItem;

 import io.netty.buffer.ByteBuf;

+import static com.automq.stream.s3.ByteBufAlloc.ENCODE_RECORD;
+
 public class StreamRecordBatch implements Comparable<StreamRecordBatch>, ComparableItem<Long> {
     private static final int OBJECT_OVERHEAD = 48 /* fields */ + 48 /* ByteBuf payload */ + 48 /* ByteBuf encoded */;
+    private static final ByteBufSeqAlloc ENCODE_ALLOC = new ByteBufSeqAlloc(ENCODE_RECORD, 8);
     private final long streamId;
     private final long epoch;
     private final long baseOffset;
@@ -42,9 +46,13 @@ public StreamRecordBatch(long streamId, long epoch, long baseOffset, int count,
     }

     public ByteBuf encoded() {
+        return encoded(ENCODE_ALLOC);
+    }
+
+    public ByteBuf encoded(ByteBufSeqAlloc alloc) {
         // TODO: keep the ref count
         if (encoded == null) {
-            encoded = StreamRecordBatchCodec.encode(this);
+            encoded = StreamRecordBatchCodec.encode(this, alloc);
             ByteBuf oldPayload = payload;
             payload = encoded.slice(encoded.readerIndex() + encoded.readableBytes() - payload.readableBytes(), payload.readableBytes());
             oldPayload.release();
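The zero-arg encoded() keeps every existing call site compiling by delegating to the new overload with the class-level default allocator. One subtlety visible in the context lines: the encoded buffer is cached behind the if (encoded == null) check, so whichever allocator serves the first encoded(...) call owns the buffer, and later calls with a different allocator simply return the cached result. A sketch:

import com.automq.stream.s3.cache.SnapshotReadCache;
import com.automq.stream.s3.model.StreamRecordBatch;

import io.netty.buffer.ByteBuf;

final class EncodeOnceSketch {
    static void firstAllocatorWins(StreamRecordBatch batch) {
        ByteBuf a = batch.encoded(SnapshotReadCache.ENCODE_ALLOC); // allocates from the snapshot pool
        ByteBuf b = batch.encoded();                               // cached: same buffer, no re-encode
        assert a == b;
    }
}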

s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java

Lines changed: 6 additions & 4 deletions
@@ -96,15 +96,17 @@ public void testClearStreamRecords() {
         logCache.archiveCurrentBlock();
         logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));

-        long size0 = logCache.size();
         logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)));
-        long size1 = logCache.size();

+        assertTrue(logCache.blocks.get(0).containsStream(233L));
+        assertTrue(logCache.blocks.get(1).containsStream(234L));
         logCache.clearStreamRecords(233L);
-        assertEquals(size1 - size0, logCache.size());
+        assertFalse(logCache.blocks.get(0).containsStream(233L));
+        assertTrue(logCache.blocks.get(1).containsStream(234L));

         logCache.clearStreamRecords(234L);
-        assertEquals(0, logCache.size());
+        assertFalse(logCache.blocks.get(0).containsStream(233L));
+        assertFalse(logCache.blocks.get(1).containsStream(234L));
     }

     @Test
