Skip to content

Commit 2393932

Browse files
JiaKeliyafan82
andauthored
Support the decompress feature in shuffle component. (#3)
* ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4 * ARROW-10880: [Java] Support reading/writing big-endian message size * ARROW-10880: [Java] Adjust variable names * ARROW-10880: [Java] Support empty buffers * ARROW-10880: [Java] Support passing raw data * ARROW-10880: [Java] Switch to commons-compress library * bug fix and support the fastpfor codec in the IPC framework * update the access permission from private to protected * disable the decompress function when loading the buffer Co-authored-by: liyafan82 <fan_li_ya@foxmail.com>
1 parent ec26cc1 commit 2393932

File tree

14 files changed

+435
-20
lines changed

14 files changed

+435
-20
lines changed

cpp/src/arrow/ipc/metadata_internal.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,8 @@ static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
919919
codec = flatbuf::CompressionType::LZ4_FRAME;
920920
} else if (options.codec->compression_type() == Compression::ZSTD) {
921921
codec = flatbuf::CompressionType::ZSTD;
922+
} else if (options.codec->compression_type() == Compression::FASTPFOR) {
923+
codec = flatbuf::CompressionType::FASTPFOR;
922924
} else {
923925
return Status::Invalid("Unsupported IPC compression codec: ",
924926
options.codec->name());

cpp/src/arrow/ipc/reader.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,8 @@ Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out)
609609
*out = Compression::LZ4_FRAME;
610610
} else if (compression->codec() == flatbuf::CompressionType::ZSTD) {
611611
*out = Compression::ZSTD;
612+
} else if (compression->codec() == flatbuf::CompressionType::FASTPFOR) {
613+
*out = Compression::FASTPFOR;
612614
} else {
613615
return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
614616
}

cpp/src/arrow/util/compression.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ Result<Compression::type> Codec::GetCompressionType(const std::string& name) {
8888
return Compression::ZSTD;
8989
} else if (name == "bz2") {
9090
return Compression::BZ2;
91-
} else if (name == "FASTPFOR") {
91+
} else if (name == "fastpfor") {
9292
return Compression::FASTPFOR;
9393
} else {
9494
return Status::Invalid("Unrecognized compression type: ", name);

cpp/src/generated/Message_generated.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,25 @@ struct MessageBuilder;
3232
enum class CompressionType : int8_t {
3333
LZ4_FRAME = 0,
3434
ZSTD = 1,
35+
FASTPFOR = 2,
3536
MIN = LZ4_FRAME,
3637
MAX = ZSTD
3738
};
3839

39-
inline const CompressionType (&EnumValuesCompressionType())[2] {
40+
inline const CompressionType (&EnumValuesCompressionType())[3] {
4041
static const CompressionType values[] = {
4142
CompressionType::LZ4_FRAME,
42-
CompressionType::ZSTD
43+
CompressionType::ZSTD,
44+
CompressionType::FASTPFOR
4345
};
4446
return values;
4547
}
4648

4749
inline const char * const *EnumNamesCompressionType() {
48-
static const char * const names[3] = {
50+
static const char * const names[4] = {
4951
"LZ4_FRAME",
5052
"ZSTD",
53+
"FASTPFOR",
5154
nullptr
5255
};
5356
return names;

format/Message.fbs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ enum CompressionType:byte {
4949
LZ4_FRAME,
5050

5151
// Zstandard
52-
ZSTD
52+
ZSTD,
53+
FASTPFOR
5354
}
5455

5556
/// Provided for forward compatibility in case we need to support different

java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.lang.reflect.Field;
2222
import java.lang.reflect.InvocationTargetException;
2323
import java.nio.ByteBuffer;
24+
import java.nio.ByteOrder;
2425
import java.security.AccessController;
2526
import java.security.PrivilegedAction;
2627

@@ -48,6 +49,11 @@ public class MemoryUtil {
4849
*/
4950
static final long BYTE_BUFFER_ADDRESS_OFFSET;
5051

52+
/**
53+
* If the native byte order is little-endian.
54+
*/
55+
public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
56+
5157
static {
5258
try {
5359
// try to get the unsafe object
@@ -132,7 +138,7 @@ public Object run() {
132138
}
133139

134140
/**
135-
* Given a {@link ByteBuf}, gets the address the underlying memory space.
141+
* Given a {@link ByteBuffer}, gets the address the underlying memory space.
136142
*
137143
* @param buf the byte buffer.
138144
* @return address of the underlying memory.

java/vector/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@
7474
<groupId>org.slf4j</groupId>
7575
<artifactId>slf4j-api</artifactId>
7676
</dependency>
77+
<dependency>
78+
<groupId>org.apache.commons</groupId>
79+
<artifactId>commons-compress</artifactId>
80+
<version>1.20</version>
81+
</dependency>
7782
</dependencies>
7883

7984
<pluginRepositories>

java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ public VectorLoader(VectorSchemaRoot root) {
5656
public void load(ArrowRecordBatch recordBatch) {
5757
Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
5858
Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
59-
CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec());
59+
6060
for (FieldVector fieldVector : root.getFieldVectors()) {
61-
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
61+
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes);
6262
}
6363
root.setRowCount(recordBatch.getLength());
6464
if (nodes.hasNext() || buffers.hasNext()) {
@@ -71,15 +71,14 @@ protected void loadBuffers(
7171
FieldVector vector,
7272
Field field,
7373
Iterator<ArrowBuf> buffers,
74-
Iterator<ArrowFieldNode> nodes,
75-
CompressionCodec codec) {
74+
Iterator<ArrowFieldNode> nodes) {
7675
checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector);
7776
ArrowFieldNode fieldNode = nodes.next();
7877
int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
7978
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
8079
for (int j = 0; j < bufferLayoutCount; j++) {
8180
ArrowBuf nextBuf = buffers.next();
82-
ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf));
81+
ownBuffers.add(nextBuf);
8382
}
8483
try {
8584
vector.loadFieldBuffers(fieldNode, ownBuffers);
@@ -96,7 +95,7 @@ protected void loadBuffers(
9695
for (int i = 0; i < childrenFromFields.size(); i++) {
9796
Field child = children.get(i);
9897
FieldVector fieldVector = childrenFromFields.get(i);
99-
loadBuffers(fieldVector, child, buffers, nodes, codec);
98+
loadBuffers(fieldVector, child, buffers, nodes);
10099
}
101100
}
102101
}

java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ public interface CompressionCodec {
2828
/**
2929
* Compress a buffer.
3030
* @param allocator the allocator for allocating memory for compressed buffer.
31-
* @param unCompressedBuffer the buffer to compress.
31+
* @param uncompressedBuffer the buffer to compress.
3232
* Implementation of this method should take care of releasing this buffer.
3333
* @return the compressed buffer.
3434
*/
35-
ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer);
35+
ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer);
3636

3737
/**
3838
* Decompress a buffer.

java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,23 @@
1919

2020
import org.apache.arrow.flatbuf.BodyCompressionMethod;
2121
import org.apache.arrow.flatbuf.CompressionType;
22+
import org.apache.arrow.memory.ArrowBuf;
23+
import org.apache.arrow.memory.BufferAllocator;
2224
import org.apache.arrow.vector.ipc.message.ArrowBodyCompression;
2325

2426
/**
2527
* Utilities for data compression/decompression.
2628
*/
2729
public class CompressionUtil {
2830

31+
static final long SIZE_OF_UNCOMPRESSED_LENGTH = 8L;
32+
33+
/**
34+
* Special flag to indicate no compression.
35+
* (e.g. when the compressed buffer has a larger size.)
36+
*/
37+
static final long NO_COMPRESSION_LENGTH = -1L;
38+
2939
private CompressionUtil() {
3040
}
3141

@@ -53,8 +63,29 @@ public static CompressionCodec createCodec(byte compressionType) {
5363
switch (compressionType) {
5464
case NoCompressionCodec.COMPRESSION_TYPE:
5565
return NoCompressionCodec.INSTANCE;
66+
case CompressionType.LZ4_FRAME:
67+
return new Lz4CompressionCodec();
5668
default:
5769
throw new IllegalArgumentException("Compression type not supported: " + compressionType);
5870
}
5971
}
72+
73+
/**
74+
* Process compression by compressing the buffer as is.
75+
*/
76+
public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
77+
ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
78+
compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH);
79+
compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex());
80+
compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
81+
return compressedBuffer;
82+
}
83+
84+
/**
85+
* Process decompression by decompressing the buffer as is.
86+
*/
87+
public static ArrowBuf decompressRawBuffer(ArrowBuf inputBuffer) {
88+
return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH,
89+
inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH);
90+
}
6091
}

0 commit comments

Comments
 (0)