Skip to content

Commit 6847223

Browse files
authored
feat(block-wal): use version V1 and support sequential return (#2682)
* feat(s3stream/block-wal): complete appends sequentially (#2665) * feat(s3stream/block-wal): complete appends sequentially * fix: use a lock to ensure there is at most one callback thread --------- * feat(s3stream/wal): write a padding record when no space at the end of device (#2673) * refactor(RecordHeader): remove useless methods * feat(SlidingWindowService): write a padding block when not enough space * feat(recovery): handle padding records * fix: fix incorrect assertion --------- * feat(s3stream/wal): defaults to using version V1 and forward compatible (#2676) * feat: introduce the Block WAL V1 * feat: impl `RecoverIteratorV1` which only recovers continuous records * feat: wal forward compatibility * test: fix tests * test: test recover from WAL V1 * test: test upgrade --------- --------- Signed-off-by: Ning Yu <[email protected]>
1 parent cf724bc commit 6847223

File tree

4 files changed

+48
-56
lines changed

4 files changed

+48
-56
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/common/RecordHeader.java

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -26,62 +26,58 @@
2626
public class RecordHeader {
2727
public static final int RECORD_HEADER_SIZE = 4 + 4 + 8 + 4 + 4;
2828
public static final int RECORD_HEADER_WITHOUT_CRC_SIZE = RECORD_HEADER_SIZE - 4;
29-
public static final int RECORD_HEADER_MAGIC_CODE = 0x87654321;
30-
31-
private int magicCode0 = RECORD_HEADER_MAGIC_CODE;
32-
private int recordBodyLength1;
33-
private long recordBodyOffset2;
34-
private int recordBodyCRC3;
29+
public static final int RECORD_HEADER_DATA_MAGIC_CODE = 0x87654321;
30+
/**
31+
* Magic code for record header indicating that the record body is empty (used for padding).
32+
*/
33+
public static final int RECORD_HEADER_EMPTY_MAGIC_CODE = 0x76543210;
34+
35+
private final int magicCode0;
36+
private final int recordBodyLength1;
37+
private final long recordBodyOffset2;
38+
private final int recordBodyCRC3;
3539
private int recordHeaderCRC4;
3640

37-
public static RecordHeader unmarshal(ByteBuf byteBuf) {
38-
RecordHeader recordHeader = new RecordHeader();
41+
public RecordHeader(long offset, int length, int crc) {
42+
this.magicCode0 = RECORD_HEADER_DATA_MAGIC_CODE;
43+
this.recordBodyLength1 = length;
44+
this.recordBodyOffset2 = offset + RECORD_HEADER_SIZE;
45+
this.recordBodyCRC3 = crc;
46+
}
47+
48+
public RecordHeader(long offset, int length) {
49+
this.magicCode0 = RECORD_HEADER_EMPTY_MAGIC_CODE;
50+
this.recordBodyLength1 = length;
51+
this.recordBodyOffset2 = offset + RECORD_HEADER_SIZE;
52+
this.recordBodyCRC3 = 0;
53+
}
54+
55+
public RecordHeader(ByteBuf byteBuf) {
3956
byteBuf.markReaderIndex();
40-
recordHeader.magicCode0 = byteBuf.readInt();
41-
recordHeader.recordBodyLength1 = byteBuf.readInt();
42-
recordHeader.recordBodyOffset2 = byteBuf.readLong();
43-
recordHeader.recordBodyCRC3 = byteBuf.readInt();
44-
recordHeader.recordHeaderCRC4 = byteBuf.readInt();
57+
this.magicCode0 = byteBuf.readInt();
58+
this.recordBodyLength1 = byteBuf.readInt();
59+
this.recordBodyOffset2 = byteBuf.readLong();
60+
this.recordBodyCRC3 = byteBuf.readInt();
61+
this.recordHeaderCRC4 = byteBuf.readInt();
4562
byteBuf.resetReaderIndex();
46-
return recordHeader;
4763
}
4864

4965
public int getMagicCode() {
5066
return magicCode0;
5167
}
5268

53-
public RecordHeader setMagicCode(int magicCode) {
54-
this.magicCode0 = magicCode;
55-
return this;
56-
}
57-
5869
public int getRecordBodyLength() {
5970
return recordBodyLength1;
6071
}
6172

62-
public RecordHeader setRecordBodyLength(int recordBodyLength) {
63-
this.recordBodyLength1 = recordBodyLength;
64-
return this;
65-
}
66-
6773
public long getRecordBodyOffset() {
6874
return recordBodyOffset2;
6975
}
7076

71-
public RecordHeader setRecordBodyOffset(long recordBodyOffset) {
72-
this.recordBodyOffset2 = recordBodyOffset;
73-
return this;
74-
}
75-
7677
public int getRecordBodyCRC() {
7778
return recordBodyCRC3;
7879
}
7980

80-
public RecordHeader setRecordBodyCRC(int recordBodyCRC) {
81-
this.recordBodyCRC3 = recordBodyCRC;
82-
return this;
83-
}
84-
8581
public int getRecordHeaderCRC() {
8682
return recordHeaderCRC4;
8783
}
@@ -105,16 +101,10 @@ private ByteBuf marshalHeaderExceptCRC(ByteBuf buf) {
105101
return buf;
106102
}
107103

108-
public ByteBuf marshal(ByteBuf emptyBuf, boolean calculateCRC) {
104+
public ByteBuf marshal(ByteBuf emptyBuf) {
109105
assert emptyBuf.writableBytes() == RECORD_HEADER_SIZE;
110106
ByteBuf buf = marshalHeaderExceptCRC(emptyBuf);
111-
112-
if (calculateCRC) {
113-
buf.writeInt(WALUtil.crc32(buf, RECORD_HEADER_WITHOUT_CRC_SIZE));
114-
} else {
115-
buf.writeInt(-1);
116-
}
117-
107+
buf.writeInt(WALUtil.crc32(buf, RECORD_HEADER_WITHOUT_CRC_SIZE));
118108
return buf;
119109
}
120110
}

s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public AppendResult append(TraceContext context, ByteBuf data, int crc) throws O
113113

114114
long expectedWriteOffset = accumulator.append(recordSize, start -> {
115115
CompositeByteBuf recordByteBuf = ByteBufAlloc.compositeByteBuffer();
116-
Record record = WALUtil.generateRecord(data, header, 0, start, true);
116+
Record record = WALUtil.generateRecord(data, header, 0, start);
117117
recordByteBuf.addComponents(true, record.header(), record.body());
118118
return recordByteBuf;
119119
}, appendResultFuture);
@@ -324,15 +324,15 @@ public RecoverResult next() {
324324
tryReadAhead();
325325

326326
ByteBuf recordHeaderBuf = dataBuffer.readBytes(RECORD_HEADER_SIZE);
327-
RecordHeader header = RecordHeader.unmarshal(recordHeaderBuf);
327+
RecordHeader header = new RecordHeader(recordHeaderBuf);
328328

329329
if (header.getRecordHeaderCRC() != WALUtil.crc32(recordHeaderBuf, RECORD_HEADER_WITHOUT_CRC_SIZE)) {
330330
recordHeaderBuf.release();
331331
throw new IllegalStateException("Record header crc check failed.");
332332
}
333333
recordHeaderBuf.release();
334334

335-
if (header.getMagicCode() != RecordHeader.RECORD_HEADER_MAGIC_CODE) {
335+
if (header.getMagicCode() != RecordHeader.RECORD_HEADER_DATA_MAGIC_CODE) {
336336
throw new IllegalStateException("Invalid magic code in record header.");
337337
}
338338

s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.automq.stream.s3.wal.util;
2121

22+
import com.automq.stream.s3.ByteBufAlloc;
2223
import com.automq.stream.s3.wal.common.Record;
2324
import com.automq.stream.s3.wal.common.RecordHeader;
2425
import com.automq.stream.utils.CommandResult;
@@ -38,7 +39,6 @@
3839
import jnr.posix.POSIX;
3940
import jnr.posix.POSIXFactory;
4041

41-
import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_MAGIC_CODE;
4242
import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_SIZE;
4343

4444
public class WALUtil {
@@ -51,17 +51,19 @@ public class WALUtil {
5151
private static final Logger LOGGER = LoggerFactory.getLogger(WALUtil.class);
5252

5353
public static Record generateRecord(ByteBuf body, ByteBuf emptyHeader, int crc, long start) {
54-
return generateRecord(body, emptyHeader, crc, start, true);
54+
crc = 0 == crc ? WALUtil.crc32(body) : crc;
55+
ByteBuf header = new RecordHeader(start, body.readableBytes(), crc).marshal(emptyHeader);
56+
return new Record(header, body);
5557
}
5658

57-
public static Record generateRecord(ByteBuf body, ByteBuf emptyHeader, int crc, long start, boolean calculateCRC) {
58-
crc = 0 == crc ? WALUtil.crc32(body) : crc;
59-
ByteBuf header = new RecordHeader()
60-
.setMagicCode(RECORD_HEADER_MAGIC_CODE)
61-
.setRecordBodyLength(body.readableBytes())
62-
.setRecordBodyOffset(start + RECORD_HEADER_SIZE)
63-
.setRecordBodyCRC(crc)
64-
.marshal(emptyHeader, calculateCRC);
59+
public static Record generatePaddingRecord(ByteBuf emptyHeader, long start, int length) {
60+
int bodyLength = length - RECORD_HEADER_SIZE;
61+
62+
ByteBuf header = new RecordHeader(start, bodyLength).marshal(emptyHeader);
63+
64+
ByteBuf body = ByteBufAlloc.byteBuffer(bodyLength);
65+
body.writeZero(bodyLength);
66+
6567
return new Record(header, body);
6668
}
6769

s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/ObjectWALServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ private void writeV1Object(ObjectStorage objectStorage, ByteBuf data, long start
331331

332332
private ByteBuf addRecordHeader(ByteBuf data, long startOffset) {
333333
ByteBuf header = ByteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
334-
Record record = WALUtil.generateRecord(data, header, 0, startOffset, true);
334+
Record record = WALUtil.generateRecord(data, header, 0, startOffset);
335335

336336
CompositeByteBuf buffer = ByteBufAlloc.compositeByteBuffer();
337337
buffer.addComponents(true, record.header(), record.body());

0 commit comments

Comments
 (0)