Skip to content

Commit e1ff522

Browse files
[common] Introduce AlignedRow for row data encoding (apache#1620)
The AlignedRow is inspired by Flink BinaryRowData.
1 parent 69ba18e commit e1ff522

File tree

10 files changed

+3363
-1
lines changed

10 files changed

+3363
-1
lines changed

LICENSE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,8 @@ Apache Flink
242242
./fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
243243
./fluss-common/src/main/java/org/apache/fluss/row/BinaryString.java
244244
./fluss-common/src/main/java/org/apache/fluss/row/Decimal.java
245+
./fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
246+
./fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
245247
./fluss-common/src/main/java/org/apache/fluss/types/DataType.java
246248
./fluss-common/src/main/java/org/apache/fluss/utils/AbstractAutoCloseableRegistry.java
247249
./fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java

fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,38 @@
3131

3232
/** Describe a section of memory. */
3333
@Internal
34-
abstract class BinarySection implements MemoryAwareGetters, Serializable {
34+
public abstract class BinarySection implements MemoryAwareGetters, Serializable {
3535

3636
private static final long serialVersionUID = 1L;
3737

38+
/**
39+
* It decides whether to put data in FixLenPart or VarLenPart. See more in {@link BinaryRow}.
40+
*
41+
* <p>If len is less than 8, its binary format is: 1-bit mark(1) = 1, 7-bits len, and 7-bytes
42+
* data. Data is stored in fix-length part.
43+
*
44+
* <p>If len is greater or equal to 8, its binary format is: 1-bit mark(1) = 0, 31-bits offset
45+
* to the data, and 4-bytes length of data. Data is stored in variable-length part.
46+
*/
47+
public static final int MAX_FIX_PART_DATA_SIZE = 7;
48+
49+
/**
50+
* To get the mark in highest bit of long. Form: 10000000 00000000 ... (8 bytes)
51+
*
52+
* <p>This is used to decide whether the data is stored in fixed-length part or variable-length
53+
* part. see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
54+
*/
55+
public static final long HIGHEST_FIRST_BIT = 0x80L << 56;
56+
57+
/**
58+
* To get the 7 bits length in second bit to eighth bit out of a long. Form: 01111111 00000000
59+
* ... (8 bytes)
60+
*
61+
* <p>This is used to get the length of the data which is stored in this long. see {@link
62+
* #MAX_FIX_PART_DATA_SIZE} for more information.
63+
*/
64+
public static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
65+
3866
protected MemorySegment[] segments;
3967
protected int offset;
4068
protected int sizeInBytes;

fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.apache.fluss.utils.MurmurHashUtils;
2424

2525
import java.io.IOException;
26+
import java.nio.ByteOrder;
2627

2728
import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
29+
import static org.apache.fluss.row.BinarySection.HIGHEST_FIRST_BIT;
30+
import static org.apache.fluss.row.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT;
2831

2932
/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache
3033
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -34,6 +37,9 @@
3437
@Internal
3538
public final class BinarySegmentUtils {
3639

40+
public static final boolean LITTLE_ENDIAN =
41+
(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
42+
3743
private static final int ADDRESS_BITS_PER_WORD = 3;
3844

3945
private static final int BIT_BYTE_INDEX_MASK = 7;
@@ -368,6 +374,20 @@ private static int byteIndex(int bitIndex) {
368374
return bitIndex >>> ADDRESS_BITS_PER_WORD;
369375
}
370376

377+
/**
378+
* unset bit.
379+
*
380+
* @param segment target segment.
381+
* @param baseOffset bits base offset.
382+
* @param index bit index from base offset.
383+
*/
384+
public static void bitUnSet(MemorySegment segment, int baseOffset, int index) {
385+
int offset = baseOffset + byteIndex(index);
386+
byte current = segment.get(offset);
387+
current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
388+
segment.put(offset, current);
389+
}
390+
371391
/**
372392
* read bit.
373393
*
@@ -386,4 +406,262 @@ private static int hashMultiSeg(MemorySegment[] segments, int offset, int numByt
386406
copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
387407
return MurmurHashUtils.hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes);
388408
}
409+
410+
/**
411+
* get long from segments.
412+
*
413+
* @param segments target segments.
414+
* @param offset value offset.
415+
*/
416+
public static long getLong(MemorySegment[] segments, int offset) {
417+
if (inFirstSegment(segments, offset, 8)) {
418+
return segments[0].getLong(offset);
419+
} else {
420+
return getLongMultiSegments(segments, offset);
421+
}
422+
}
423+
424+
private static long getLongMultiSegments(MemorySegment[] segments, int offset) {
425+
int segSize = segments[0].size();
426+
int segIndex = offset / segSize;
427+
int segOffset = offset - segIndex * segSize; // equal to %
428+
429+
if (segOffset < segSize - 7) {
430+
return segments[segIndex].getLong(segOffset);
431+
} else {
432+
return getLongSlowly(segments, segSize, segIndex, segOffset);
433+
}
434+
}
435+
436+
private static long getLongSlowly(
437+
MemorySegment[] segments, int segSize, int segNum, int segOffset) {
438+
MemorySegment segment = segments[segNum];
439+
long ret = 0;
440+
for (int i = 0; i < 8; i++) {
441+
if (segOffset == segSize) {
442+
segment = segments[++segNum];
443+
segOffset = 0;
444+
}
445+
long unsignedByte = segment.get(segOffset) & 0xff;
446+
if (LITTLE_ENDIAN) {
447+
ret |= (unsignedByte << (i * 8));
448+
} else {
449+
ret |= (unsignedByte << ((7 - i) * 8));
450+
}
451+
segOffset++;
452+
}
453+
return ret;
454+
}
455+
456+
/**
457+
* set long from segments.
458+
*
459+
* @param segments target segments.
460+
* @param offset value offset.
461+
*/
462+
public static void setLong(MemorySegment[] segments, int offset, long value) {
463+
if (inFirstSegment(segments, offset, 8)) {
464+
segments[0].putLong(offset, value);
465+
} else {
466+
setLongMultiSegments(segments, offset, value);
467+
}
468+
}
469+
470+
private static void setLongMultiSegments(MemorySegment[] segments, int offset, long value) {
471+
int segSize = segments[0].size();
472+
int segIndex = offset / segSize;
473+
int segOffset = offset - segIndex * segSize; // equal to %
474+
475+
if (segOffset < segSize - 7) {
476+
segments[segIndex].putLong(segOffset, value);
477+
} else {
478+
setLongSlowly(segments, segSize, segIndex, segOffset, value);
479+
}
480+
}
481+
482+
private static void setLongSlowly(
483+
MemorySegment[] segments, int segSize, int segNum, int segOffset, long value) {
484+
MemorySegment segment = segments[segNum];
485+
for (int i = 0; i < 8; i++) {
486+
if (segOffset == segSize) {
487+
segment = segments[++segNum];
488+
segOffset = 0;
489+
}
490+
long unsignedByte;
491+
if (LITTLE_ENDIAN) {
492+
unsignedByte = value >> (i * 8);
493+
} else {
494+
unsignedByte = value >> ((7 - i) * 8);
495+
}
496+
segment.put(segOffset, (byte) unsignedByte);
497+
segOffset++;
498+
}
499+
}
500+
501+
/**
502+
* Copy target segments from source byte[].
503+
*
504+
* @param segments target segments.
505+
* @param offset target segments offset.
506+
* @param bytes source byte[].
507+
* @param bytesOffset source byte[] offset.
508+
* @param numBytes the number bytes to copy.
509+
*/
510+
public static void copyFromBytes(
511+
MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
512+
if (segments.length == 1) {
513+
segments[0].put(offset, bytes, bytesOffset, numBytes);
514+
} else {
515+
copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset, numBytes);
516+
}
517+
}
518+
519+
private static void copyMultiSegmentsFromBytes(
520+
MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
521+
int remainSize = numBytes;
522+
for (MemorySegment segment : segments) {
523+
int remain = segment.size() - offset;
524+
if (remain > 0) {
525+
int nCopy = Math.min(remain, remainSize);
526+
segment.put(offset, bytes, numBytes - remainSize + bytesOffset, nCopy);
527+
remainSize -= nCopy;
528+
// next new segment.
529+
offset = 0;
530+
if (remainSize == 0) {
531+
return;
532+
}
533+
} else {
534+
// remain is negative, let's advance to next segment
535+
// now the offset = offset - segmentSize (-remain)
536+
offset = -remain;
537+
}
538+
}
539+
}
540+
541+
/** Gets an instance of {@link Decimal} from underlying {@link MemorySegment}. */
542+
public static Decimal readDecimalData(
543+
MemorySegment[] segments,
544+
int baseOffset,
545+
long offsetAndSize,
546+
int precision,
547+
int scale) {
548+
final int size = ((int) offsetAndSize);
549+
int subOffset = (int) (offsetAndSize >> 32);
550+
byte[] bytes = new byte[size];
551+
copyToBytes(segments, baseOffset + subOffset, bytes, 0, size);
552+
return Decimal.fromUnscaledBytes(bytes, precision, scale);
553+
}
554+
555+
/**
556+
* Get binary, if len less than 8, will be include in variablePartOffsetAndLen.
557+
*
558+
* <p>Note: Need to consider the ByteOrder.
559+
*
560+
* @param baseOffset base offset of composite binary format.
561+
* @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
562+
* @param variablePartOffsetAndLen a long value, real data or offset and len.
563+
*/
564+
public static byte[] readBinary(
565+
MemorySegment[] segments,
566+
int baseOffset,
567+
int fieldOffset,
568+
long variablePartOffsetAndLen) {
569+
long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
570+
if (mark == 0) {
571+
final int subOffset = (int) (variablePartOffsetAndLen >> 32);
572+
final int len = (int) variablePartOffsetAndLen;
573+
return copyToBytes(segments, baseOffset + subOffset, len);
574+
} else {
575+
int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
576+
if (LITTLE_ENDIAN) {
577+
return copyToBytes(segments, fieldOffset, len);
578+
} else {
579+
// fieldOffset + 1 to skip header.
580+
return copyToBytes(segments, fieldOffset + 1, len);
581+
}
582+
}
583+
}
584+
585+
/**
586+
* Get binary string, if len less than 8, will be include in variablePartOffsetAndLen.
587+
*
588+
* <p>Note: Need to consider the ByteOrder.
589+
*
590+
* @param baseOffset base offset of composite binary format.
591+
* @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
592+
* @param variablePartOffsetAndLen a long value, real data or offset and len.
593+
*/
594+
public static BinaryString readBinaryString(
595+
MemorySegment[] segments,
596+
int baseOffset,
597+
int fieldOffset,
598+
long variablePartOffsetAndLen) {
599+
long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
600+
if (mark == 0) {
601+
final int subOffset = (int) (variablePartOffsetAndLen >> 32);
602+
final int len = (int) variablePartOffsetAndLen;
603+
return BinaryString.fromAddress(segments, baseOffset + subOffset, len);
604+
} else {
605+
int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
606+
if (BinarySegmentUtils.LITTLE_ENDIAN) {
607+
return BinaryString.fromAddress(segments, fieldOffset, len);
608+
} else {
609+
// fieldOffset + 1 to skip header.
610+
return BinaryString.fromAddress(segments, fieldOffset + 1, len);
611+
}
612+
}
613+
}
614+
615+
/**
616+
* hash segments to int, numBytes must be aligned to 4 bytes.
617+
*
618+
* @param segments Source segments.
619+
* @param offset Source segments offset.
620+
* @param numBytes the number bytes to hash.
621+
*/
622+
public static int hashByWords(MemorySegment[] segments, int offset, int numBytes) {
623+
if (inFirstSegment(segments, offset, numBytes)) {
624+
return MurmurHashUtils.hashBytesByWords(segments[0], offset, numBytes);
625+
} else {
626+
return hashMultiSegByWords(segments, offset, numBytes);
627+
}
628+
}
629+
630+
private static int hashMultiSegByWords(MemorySegment[] segments, int offset, int numBytes) {
631+
byte[] bytes = allocateReuseBytes(numBytes);
632+
copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
633+
return MurmurHashUtils.hashUnsafeBytesByWords(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes);
634+
}
635+
636+
/**
637+
* Gets an instance of {@link TimestampLtz} from underlying {@link MemorySegment}.
638+
*
639+
* @param segments the underlying MemorySegments
640+
* @param baseOffset the base offset of current instance of {@code TimestampLtz}
641+
* @param offsetAndNanos the offset of milli-seconds part and nanoseconds
642+
* @return an instance of {@link TimestampLtz}
643+
*/
644+
public static TimestampLtz readTimestampLtzData(
645+
MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
646+
final int nanoOfMillisecond = (int) offsetAndNanos;
647+
final int subOffset = (int) (offsetAndNanos >> 32);
648+
final long millisecond = getLong(segments, baseOffset + subOffset);
649+
return TimestampLtz.fromEpochMillis(millisecond, nanoOfMillisecond);
650+
}
651+
652+
/**
653+
* Gets an instance of {@link TimestampNtz} from underlying {@link MemorySegment}.
654+
*
655+
* @param segments the underlying MemorySegments
656+
* @param baseOffset the base offset of current instance of {@code TimestampNtz}
657+
* @param offsetAndNanos the offset of milli-seconds part and nanoseconds
658+
* @return an instance of {@link TimestampNtz}
659+
*/
660+
public static TimestampNtz readTimestampNtzData(
661+
MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
662+
final int nanoOfMillisecond = (int) offsetAndNanos;
663+
final int subOffset = (int) (offsetAndNanos >> 32);
664+
final long millisecond = getLong(segments, baseOffset + subOffset);
665+
return TimestampNtz.fromMillis(millisecond, nanoOfMillisecond);
666+
}
389667
}

0 commit comments

Comments
 (0)