Commit 42b493a

lhotari authored and Anup Ghatage committed
Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 (apache#4196)
* Add a test that reproduces a bug in checksum calculation
* Revert "Fixed unnecessary copy to heap (apache#2701)" changes to ByteBufList
  This partially reverts commit 3c9c710.
* Remove CompositeBuffer unwrapping in DigestManager
* Rename update -> internalUpdate so that unwrapping logic could be added to update
* Remove unnecessary unwrapping logic in Java9IntHash
* Add safe way to handle CompositeByteBuf
* Add license header
* Fix checkstyle
* Refactor ByteBuf visitor solution
* Fix checkstyle
* Reformat
* Refactor recursive visiting
* Revisit equals, hashCode and toString
* Refactor test case
* Add support for UnpooledHeapByteBuf.getBytes which passes an array
* Add support for visiting buffers backed by byte[] arrays
  - getBytes calls setBytes with a byte[] argument for heap ByteBufs
* Move ByteBufVisitor to org.apache.bookkeeper.util package
* Update javadoc
* Refactor to use stateless visitor so that instance can be shared
* Improve test so that a single scenario can be used for debugging
* Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)
  - Java9IntHash uses private methods from java.util.zip.CRC32C class, updateBytes and updateDirectByteBuffer. When inspecting the use and interface contract, it doesn't match how it is used in Java9IntHash. This PR addresses that by introducing a separate initial value for initializing the accumulated value so that the initial value could match the logic in java.util.zip.CRC32C.reset method. There's also a separate method for finalizing the accumulated value into a final checksum value. This is to match the java.util.zip.CRC32C.getValue method's logic (uses bitwise complement operator ~).
  - With a quick glance, it might appear that the previous logic is similar. However it isn't since I have a failing test which gets fixed with this change. I haven't yet added the Java9IntHash level unit test case to prove how it differs. It must be related to integer value overflow.
    For the CRC32C function, I believe it means that it cannot be assumed in all cases that func(x) == ~func(~x). That's the assumption that the previous code was making. It probably applies for many inputs, but not all. It would break in overflow cases.
* Fix checkstyle
* Fix checkstyle
* Fix missing depth increment that prevents StackOverflowException
* Properly handle the depth increase and decrease
* Remove unnecessary condition
* Use more efficient way to read bytes to the target array
* Don't use ByteBufVisitor if it's not necessary
* Revert "Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)"
  This reverts commit 272e962.
* Fix issue in resume byte[] version that was added
  - input and output should be complemented. explanation has been added to the resume ByteBuf method
* Polish ByteBufVisitor
  - reuse GetBytesCallbackByteBuf instance for handling the root ByteBuf instance
* Use extracted method
* Fix bug with array handling
* Polish ByteBufVisitor
* Optimize the buffer copying in the case where array or memory address cannot be accessed
  - read-only buffers will need to be copied before reading
  - use ByteBuf.copy for direct buffers with pooled allocator when the algorithm can accept a memory address buffer
  - use the 64kB threadlocal byte[] buffer for copying all other inputs
* Check if memory address is accepted
* Improve comments about complement (current = ~current) in resume
* Print thread dump when build is cancelled
* Filter empty buffers and arrays in ByteBufVisitor
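The commit message mentions that, when resuming a CRC32C from a previously finalized value, both the input and the output should be complemented (current = ~current). The following is a minimal sketch of that pattern, not the project's Java9IntHash or Crc32cIntChecksum code: the class name and the methods rawUpdate, resume and jdkChecksum are hypothetical, and the bitwise CRC loop stands in for the private JDK methods the commit refers to. It relies only on the documented behaviour of java.util.zip.CRC32C (Java 9 or later).

```java
import java.util.zip.CRC32C;

/** Sketch only: a bitwise CRC32C used to show the complement-in, complement-out resume pattern. */
final class Crc32cResumeSketch {

    // Reflected form of the Castagnoli polynomial used by CRC32C.
    private static final int POLY = 0x82F63B78;

    /** Raw register update: no initial or final complement is applied here. */
    static int rawUpdate(int crc, byte[] data, int offset, int len) {
        for (int i = offset; i < offset + len; i++) {
            crc ^= data[i] & 0xFF;
            for (int bit = 0; bit < 8; bit++) {
                // Shift right; XOR the polynomial in when the low bit was set.
                crc = (crc >>> 1) ^ (POLY & -(crc & 1));
            }
        }
        return crc;
    }

    /** Hypothetical resume: accepts and returns a finalized checksum value. */
    static int resume(int previous, byte[] data, int offset, int len) {
        int current = ~previous;                       // back to the raw register value
        current = rawUpdate(current, data, offset, len);
        return ~current;                               // finalize again
    }

    /** Reference value computed by the JDK implementation. */
    static int jdkChecksum(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return (int) crc.getValue();
    }
}
```

Starting from 0 and calling resume once per chunk should give the same value as the JDK class computes over the concatenated bytes, because ~0 is the all-ones initial register value and the final complement mirrors what getValue applies.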
1 parent 43ce06c commit 42b493a

17 files changed, +1618 -132 lines changed

.github/workflows/bk-ci.yml

Lines changed: 4 additions & 0 deletions
@@ -200,6 +200,10 @@ jobs:
           path: surefire-reports
           retention-days: 7
 
+      - name: print JVM thread dumps when cancelled
+        if: cancelled()
+        run: ./dev/ci-tool print_thread_dumps
+
   integration-tests:
     name: Integration Tests
     runs-on: ubuntu-latest

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java

Lines changed: 11 additions & 1 deletion
@@ -46,7 +46,17 @@ void populateValueAndReset(int digest, ByteBuf buf) {
     }
 
     @Override
-    int update(int digest, ByteBuf data, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf data, int offset, int len) {
         return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
     }
+
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+        return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len);
+    }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return Crc32cIntChecksum.acceptsMemoryAddressBuffer();
+    }
 }

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java

Lines changed: 13 additions & 1 deletion
@@ -34,6 +34,7 @@ interface CRC32Digest {
         long getValueAndReset();
 
         void update(ByteBuf buf, int offset, int len);
+        void update(byte[] buffer, int offset, int len);
     }
 
     private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
@@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) {
     }
 
     @Override
-    int update(int digest, ByteBuf data, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf data, int offset, int len) {
         crc.get().update(data, offset, len);
         return 0;
     }
 
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+        crc.get().update(buffer, offset, len);
+        return 0;
+    }
+
     @Override
     boolean isInt32Digest() {
         // This is stored as 8 bytes
         return false;
     }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return DirectMemoryCRC32Digest.isSupported();
+    }
 }

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java

Lines changed: 54 additions & 40 deletions
@@ -20,10 +20,8 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
-import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.security.GeneralSecurityException;
@@ -34,6 +32,7 @@
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
 import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.ByteBufVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +52,25 @@ public abstract class DigestManager {
     final long ledgerId;
     final boolean useV2Protocol;
     private final ByteBufAllocator allocator;
+    private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback;
 
     abstract int getMacCodeLength();
 
-    abstract int update(int digest, ByteBuf buffer, int offset, int len);
+    abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);
+
+    abstract int internalUpdate(int digest, byte[] buffer, int offset, int len);
+
+    final int update(int digest, ByteBuf buffer, int offset, int len) {
+        if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) {
+            return internalUpdate(digest, buffer, offset, len);
+        } else if (buffer.hasArray()) {
+            return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len);
+        } else {
+            UpdateContext updateContext = new UpdateContext(digest);
+            ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext);
+            return updateContext.digest;
+        }
+    }
 
     abstract void populateValueAndReset(int digest, ByteBuf buffer);
 
@@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo
         this.useV2Protocol = useV2Protocol;
         this.macCodeLength = getMacCodeLength();
         this.allocator = allocator;
+        this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback();
     }
 
     public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
@@ -136,34 +151,19 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long
 
         // Compute checksum over the headers
         int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());
-
-        // don't unwrap slices
-        final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
-                ? data.unwrap() : data;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.safeRelease(data);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
-            for (int i = 0; i < cbb.numComponents(); i++) {
-                ByteBuf b = cbb.component(i);
-                digest = update(digest, b, b.readerIndex(), b.readableBytes());
-            }
-        } else {
-            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
-        }
+        digest = update(digest, data, data.readerIndex(), data.readableBytes());
 
         populateValueAndReset(digest, buf);
 
         // Reset the reader index to the beginning
         buf.readerIndex(0);
 
         if (isSmallEntry) {
-            buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
-            unwrapped.release();
+            buf.writeBytes(data, data.readerIndex(), data.readableBytes());
+            data.release();
             return buf;
         } else {
-            return ByteBufList.get(buf, unwrapped);
+            return ByteBufList.get(buf, data);
         }
     }
 
@@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA
         headersBuffer.writeLong(length);
 
         int digest = update(0, headersBuffer, 0, METADATA_LENGTH);
-
-        // don't unwrap slices
-        final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
-                ? data.unwrap() : data;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.release(data);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
-            for (int i = 0; i < cbb.numComponents(); i++) {
-                ByteBuf b = cbb.component(i);
-                digest = update(digest, b, b.readerIndex(), b.readableBytes());
-            }
-        } else {
-            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
-        }
+        digest = update(digest, data, data.readerIndex(), data.readableBytes());
         populateValueAndReset(digest, headersBuffer);
-
-        return ByteBufList.get(headersBuffer, unwrapped);
+        return ByteBufList.get(headersBuffer, data);
     }
 
     /**
@@ -373,4 +357,34 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr
         long length = dataReceived.readLong();
         return new RecoveryData(lastAddConfirmed, length);
     }
+
+    private static class UpdateContext {
+        int digest;
+
+        UpdateContext(int digest) {
+            this.digest = digest;
+        }
+    }
+
+    private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback<UpdateContext> {
+
+        @Override
+        public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) {
+            // recursively visit the sub buffer and update the digest
+            context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength);
+        }
+
+        @Override
+        public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) {
+            // update the digest with the array
+            context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength);
+        }
+
+        @Override
+        public boolean acceptsMemoryAddress(UpdateContext context) {
+            return DigestManager.this.acceptsMemoryAddressBuffer();
+        }
+    }
+
+    abstract boolean acceptsMemoryAddressBuffer();
 }
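The loop removed from DigestManager walked each component of a CompositeByteBuf using that component's own readerIndex and readableBytes, so the composite's readerIndex never entered the calculation. One plausible reading of the fix is that the digest must cover exactly the range [offset, offset + len) of the logical buffer, wherever the component boundaries fall. The sketch below illustrates that idea with plain byte arrays and java.util.zip.CRC32C; it does not use Netty or the real ByteBufVisitor, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.zip.CRC32C;

/** Sketch only: checksum a sub-range of several concatenated segments. */
final class SegmentedChecksumSketch {

    /** Checksums every component in full, ignoring where the readable region starts. */
    static long checksumAllComponents(List<byte[]> components) {
        CRC32C crc = new CRC32C();
        for (byte[] c : components) {
            crc.update(c, 0, c.length);
        }
        return crc.getValue();
    }

    /** Checksums only the bytes in [offset, offset + len) of the logical concatenation. */
    static long checksumRange(List<byte[]> components, int offset, int len) {
        CRC32C crc = new CRC32C();
        int end = offset + len;
        int componentStart = 0;
        for (byte[] c : components) {
            int componentEnd = componentStart + c.length;
            // Intersect the requested range with this component's span.
            int from = Math.max(offset, componentStart);
            int to = Math.min(end, componentEnd);
            if (from < to) {
                crc.update(c, from - componentStart, to - from);
            }
            componentStart = componentEnd;
        }
        return crc.getValue();
    }
}
```

With components {1, 2, 3} and {4, 5, 6} and a reader position of 2, checksumRange(components, 2, 3) covers the bytes 3, 4, 5, while checksumAllComponents covers all six bytes, which is the kind of mismatch the commit title describes.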

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java

Lines changed: 16 additions & 2 deletions
@@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) {
                 crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
             } else if (buf.hasArray()) {
                 // Use the internal method to update from array based
-                crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
+                crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length);
             } else {
                 // Fallback to data copy if buffer is not contiguous
                 byte[] b = new byte[length];
                 buf.getBytes(index, b, 0, length);
-                crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
+                crcValue = updateArray(crcValue, b, 0, length);
             }
         } catch (IllegalAccessException | InvocationTargetException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private static int updateArray(int crcValue, byte[] buf, int offset, int length)
+            throws IllegalAccessException, InvocationTargetException {
+        return (int) updateBytes.invoke(null, crcValue, buf, offset, length);
+    }
+
+    @Override
+    public void update(byte[] buffer, int offset, int len) {
+        try {
+            crcValue = updateArray(crcValue, buffer, offset, len);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static final Method updateByteBuffer;
     private static final Method updateBytes;
 
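The update paths in this commit pick a strategy per buffer: use the backing array when one is exposed, and otherwise fall back to copying. The commit message also mentions a 64kB thread-local byte[] for such copies. The sketch below shows the same shape using java.nio.ByteBuffer instead of Netty's ByteBuf, so it omits the memory-address branch; the class name, the SCRATCH field and the chunked loop are assumptions made for illustration, not the project's code.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

/** Sketch only: array fast path, otherwise chunked copy through a reusable scratch array. */
final class BufferDispatchSketch {

    // Size is an assumption taken from the "64kB" mentioned in the commit message.
    private static final ThreadLocal<byte[]> SCRATCH =
            ThreadLocal.withInitial(() -> new byte[64 * 1024]);

    static void update(CRC32C crc, ByteBuffer buf, int offset, int len) {
        if (buf.hasArray()) {
            // Heap buffer with an accessible array: no copy needed.
            crc.update(buf.array(), buf.arrayOffset() + offset, len);
            return;
        }
        // Read-only and direct buffers expose no array, so copy in bounded chunks.
        byte[] scratch = SCRATCH.get();
        ByteBuffer view = buf.duplicate(); // leaves the caller's position and limit untouched
        view.limit(offset + len);
        view.position(offset);
        while (view.hasRemaining()) {
            int n = Math.min(scratch.length, view.remaining());
            view.get(scratch, 0, n);
            crc.update(scratch, 0, n);
        }
    }
}
```

Compared with allocating a new byte[length] for every call, as the fallback branch in the diff above does, a bounded scratch array keeps the copy size independent of the payload size, at the cost of several update calls for large inputs.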

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java

Lines changed: 11 additions & 1 deletion
@@ -38,7 +38,12 @@ int getMacCodeLength() {
     }
 
     @Override
-    int update(int digest, ByteBuf buffer, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
+        return 0;
+    }
+
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
         return 0;
     }
 
@@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {}
     boolean isInt32Digest() {
         return false;
     }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return false;
+    }
 }

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java

Lines changed: 12 additions & 1 deletion
@@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) {
     }
 
     @Override
-    int update(int digest, ByteBuf data, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf data, int offset, int len) {
         mac.get().update(data.slice(offset, len).nioBuffer());
         return 0;
     }
 
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+        mac.get().update(buffer, offset, len);
+        return 0;
+    }
+
     @Override
     boolean isInt32Digest() {
         return false;
     }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return false;
+    }
 }
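MacDigestManager now has two update paths: the existing one passes an NIO buffer view to the MAC, and the new overload passes a byte[] with an offset and length. The sketch below checks, using only javax.crypto.Mac, that the two ways of feeding the same bytes produce the same MAC. The class and method names are hypothetical, and HmacSHA1 is an assumption for illustration, since the diff does not show which algorithm the manager configures.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Sketch only: feeding a MAC from an array slice versus an NIO buffer view. */
final class MacUpdateSketch {

    // Algorithm name is an assumption for this example.
    private static final String ALGORITHM = "HmacSHA1";

    private static Mac newMac(byte[] key) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(key, ALGORITHM));
            return mac;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Mirrors the byte[] overload: update(array, offset, len). */
    static byte[] macOverArray(byte[] key, byte[] data, int offset, int len) {
        Mac mac = newMac(key);
        mac.update(data, offset, len);
        return mac.doFinal();
    }

    /** Mirrors the buffer path: update(ByteBuffer) over the same region. */
    static byte[] macOverNioBuffer(byte[] key, byte[] data, int offset, int len) {
        Mac mac = newMac(key);
        mac.update(ByteBuffer.wrap(data, offset, len));
        return mac.doFinal();
    }
}
```

Both methods consume the same len bytes starting at offset, so their results should be identical; the array form simply avoids creating a buffer view when the bytes are already in a byte[].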

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java

Lines changed: 5 additions & 0 deletions
@@ -39,4 +39,9 @@ public long getValueAndReset() {
     public void update(ByteBuf buf, int offset, int len) {
         crc.update(buf.slice(offset, len).nioBuffer());
     }
+
+    @Override
+    public void update(byte[] buffer, int offset, int len) {
+        crc.update(buffer, offset, len);
+    }
 }

bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java

Lines changed: 3 additions & 33 deletions
@@ -22,7 +22,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
@@ -133,43 +132,14 @@ public static ByteBufList get() {
      * Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
      */
     public void add(ByteBuf buf) {
-        final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
-                ? buf.unwrap() : buf;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.release(buf);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            ((CompositeByteBuf) unwrapped).forEach(b -> {
-                ReferenceCountUtil.retain(b);
-                buffers.add(b);
-            });
-            ReferenceCountUtil.release(unwrapped);
-        } else {
-            buffers.add(unwrapped);
-        }
+        buffers.add(buf);
     }
 
     /**
      * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
      */
     public void prepend(ByteBuf buf) {
-        // don't unwrap slices
-        final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
-                ? buf.unwrap() : buf;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.release(buf);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
-            for (int i = composite.numComponents() - 1; i >= 0; i--) {
-                ByteBuf b = composite.component(i);
-                ReferenceCountUtil.retain(b);
-                buffers.add(0, b);
-            }
-            ReferenceCountUtil.release(unwrapped);
-        } else {
-            buffers.add(0, unwrapped);
-        }
+        buffers.add(0, buf);
     }
 
     /**
/**
@@ -285,7 +255,7 @@ public ByteBufList retain() {
285255
@Override
286256
protected void deallocate() {
287257
for (int i = 0; i < buffers.size(); i++) {
288-
ReferenceCountUtil.release(buffers.get(i));
258+
buffers.get(i).release();
289259
}
290260

291261
buffers.clear();
