|
1 | | -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. |
| 1 | +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. |
2 | 2 | // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. |
3 | 3 | // |
4 | 4 | // This software, the RabbitMQ Stream Java client library, is dual-licensed under the |
|
16 | 16 |
|
17 | 17 | import com.rabbitmq.stream.ChunkChecksum; |
18 | 18 | import com.rabbitmq.stream.ChunkChecksumValidationException; |
19 | | -import com.rabbitmq.stream.StreamException; |
20 | 19 | import io.netty.buffer.ByteBuf; |
21 | | -import io.netty.util.ByteProcessor; |
22 | | -import java.lang.reflect.InvocationTargetException; |
23 | | -import java.lang.reflect.Method; |
24 | | -import java.nio.ByteBuffer; |
25 | 20 | import java.util.function.Supplier; |
26 | 21 | import java.util.zip.CRC32; |
27 | 22 | import java.util.zip.Checksum; |
28 | | -import org.slf4j.Logger; |
29 | | -import org.slf4j.LoggerFactory; |
30 | 23 |
|
31 | 24 | class JdkChunkChecksum implements ChunkChecksum { |
32 | 25 |
|
33 | | - static final ChunkChecksum CRC32_SINGLETON; |
34 | | - private static final Logger LOGGER = LoggerFactory.getLogger(JdkChunkChecksum.class); |
35 | 26 | private static final Supplier<Checksum> CRC32_SUPPLIER = CRC32::new; |
36 | | - |
37 | | - static { |
38 | | - if (isChecksumUpdateByteBufferAvailable()) { |
39 | | - LOGGER.debug("Checksum#update(ByteBuffer) method available, using it for direct buffers"); |
40 | | - CRC32_SINGLETON = new ByteBufferDirectByteBufChecksum(CRC32_SUPPLIER); |
41 | | - } else { |
42 | | - LOGGER.debug( |
43 | | - "Checksum#update(ByteBuffer) method not available, using byte-by-byte CRC calculation for direct buffers"); |
44 | | - CRC32_SINGLETON = new JdkChunkChecksum(CRC32_SUPPLIER); |
45 | | - } |
46 | | - } |
| 27 | + static final ChunkChecksum CRC32_SINGLETON = new JdkChunkChecksum(CRC32_SUPPLIER); |
47 | 28 |
|
48 | 29 | private final Supplier<Checksum> checksumSupplier; |
49 | 30 |
|
50 | | - JdkChunkChecksum() { |
51 | | - this(CRC32_SUPPLIER); |
52 | | - } |
53 | | - |
54 | 31 | JdkChunkChecksum(Supplier<Checksum> checksumSupplier) { |
55 | 32 | this.checksumSupplier = checksumSupplier; |
56 | 33 | } |
57 | 34 |
|
58 | | - private static boolean isChecksumUpdateByteBufferAvailable() { |
59 | | - try { |
60 | | - Checksum.class.getDeclaredMethod("update", ByteBuffer.class); |
61 | | - return true; |
62 | | - } catch (Exception e) { |
63 | | - return false; |
64 | | - } |
65 | | - } |
66 | | - |
67 | 35 | @Override |
68 | 36 | public void checksum(ByteBuf byteBuf, long dataLength, long expected) { |
69 | 37 | Checksum checksum = checksumSupplier.get(); |
70 | 38 | if (byteBuf.hasArray()) { |
71 | 39 | checksum.update( |
72 | 40 | byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes()); |
73 | 41 | } else { |
74 | | - byteBuf.forEachByte( |
75 | | - byteBuf.readerIndex(), byteBuf.readableBytes(), new UpdateProcessor(checksum)); |
| 42 | + checksum.update(byteBuf.nioBuffer(byteBuf.readerIndex(), byteBuf.readableBytes())); |
76 | 43 | } |
77 | 44 | if (expected != checksum.getValue()) { |
78 | 45 | throw new ChunkChecksumValidationException(expected, checksum.getValue()); |
79 | 46 | } |
80 | 47 | } |
81 | | - |
82 | | - private static final class ByteBufferDirectByteBufChecksum implements ChunkChecksum { |
83 | | - |
84 | | - private final Supplier<Checksum> checksumSupplier; |
85 | | - private final Method updateMethod; |
86 | | - |
87 | | - private ByteBufferDirectByteBufChecksum(Supplier<Checksum> checksumSupplier) { |
88 | | - this.checksumSupplier = checksumSupplier; |
89 | | - try { |
90 | | - this.updateMethod = Checksum.class.getDeclaredMethod("update", ByteBuffer.class); |
91 | | - } catch (NoSuchMethodException e) { |
92 | | - throw new StreamException("Error while looking up Checksum#update(ByteBuffer) method", e); |
93 | | - } |
94 | | - } |
95 | | - |
96 | | - @Override |
97 | | - public void checksum(ByteBuf byteBuf, long dataLength, long expected) { |
98 | | - Checksum checksum = checksumSupplier.get(); |
99 | | - if (byteBuf.hasArray()) { |
100 | | - checksum.update( |
101 | | - byteBuf.array(), |
102 | | - byteBuf.arrayOffset() + byteBuf.readerIndex(), |
103 | | - byteBuf.readableBytes()); |
104 | | - } else { |
105 | | - try { |
106 | | - this.updateMethod.invoke( |
107 | | - checksum, byteBuf.nioBuffer(byteBuf.readerIndex(), byteBuf.readableBytes())); |
108 | | - } catch (IllegalAccessException | InvocationTargetException e) { |
109 | | - throw new StreamException("Error while calculating CRC", e); |
110 | | - } |
111 | | - } |
112 | | - if (expected != checksum.getValue()) { |
113 | | - throw new ChunkChecksumValidationException(expected, checksum.getValue()); |
114 | | - } |
115 | | - } |
116 | | - } |
117 | | - |
118 | | - private static class UpdateProcessor implements ByteProcessor { |
119 | | - |
120 | | - private final Checksum checksum; |
121 | | - |
122 | | - private UpdateProcessor(Checksum checksum) { |
123 | | - this.checksum = checksum; |
124 | | - } |
125 | | - |
126 | | - @Override |
127 | | - public boolean process(byte value) { |
128 | | - checksum.update(value); |
129 | | - return true; |
130 | | - } |
131 | | - } |
132 | 48 | } |
0 commit comments