Skip to content

Commit eb0c342

Browse files
committed
Copy consumer code for non-merging
1 parent c7d15d0 commit eb0c342

File tree

3 files changed

+340
-41
lines changed

3 files changed

+340
-41
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.compress.fsst;
11+
12+
import org.apache.lucene.store.DataOutput;
13+
14+
import java.io.Closeable;
15+
import java.io.IOException;
16+
17+
public class BulkCompressBufferer implements Closeable {
18+
private static final int MAX_LINES = 512;
19+
private static final int MAX_INPUT_DATA = 128 << 10;
20+
private static final int MAX_OUTPUT_DATA = MAX_INPUT_DATA * 2;
21+
22+
final byte[] inData = new byte[MAX_INPUT_DATA + 8];
23+
final int[] inOffsets = new int[MAX_LINES + 1]; // 1 additional space for offset where next item would have been
24+
byte[] outBuf = new byte[MAX_OUTPUT_DATA + 8];
25+
int[] outOffsets = new int[MAX_LINES + 1]; // 1 additional space for offset where next item would have been
26+
private final DataOutput finalOutput;
27+
private final FSST.SymbolTable st;
28+
private final FSST.OffsetWriter offsetWriter;
29+
private int numLines = 0;
30+
private int inOff = 0;
31+
32+
public BulkCompressBufferer(DataOutput finalOutput, FSST.SymbolTable st, FSST.OffsetWriter offsetWriter) {
33+
this.finalOutput = finalOutput;
34+
this.st = st;
35+
this.offsetWriter = offsetWriter;
36+
}
37+
38+
private void addToBuffer(byte[] bytes, int offset, int length) {
39+
System.arraycopy(bytes, offset, inData, inOff, length);
40+
int lineIdx = numLines;
41+
inOffsets[lineIdx] = inOff;
42+
inOff += length;
43+
numLines++;
44+
}
45+
46+
public void addLine(byte[] bytes, int offset, int length) throws IOException {
47+
if (inOff + length > MAX_INPUT_DATA || numLines == MAX_LINES) {
48+
// can't fit another
49+
compressAndWriteBuffer();
50+
51+
if (length > MAX_INPUT_DATA) {
52+
// new item doesn't fit by itself, so deal with it by itself
53+
compressAndWriteSingle(bytes, offset, length);
54+
} else {
55+
// does fit
56+
addToBuffer(bytes, offset, length);
57+
}
58+
} else {
59+
// does fit
60+
addToBuffer(bytes, offset, length);
61+
}
62+
}
63+
64+
private void compressAndWriteSingle(byte[] bytes, int offset, int length) throws IOException {
65+
assert numLines == 0 && inOff == 0;
66+
67+
int off = offset;
68+
int lenToWrite = length;
69+
int totalOutLen = 0;
70+
71+
while (lenToWrite > 0) {
72+
int len = Math.min(lenToWrite, MAX_INPUT_DATA);
73+
74+
// copy data into buffer
75+
numLines = 1;
76+
inOffsets[0] = off;
77+
inOffsets[1] = off + len;
78+
79+
long outLine = st.compressBulk(numLines, bytes, inOffsets, outBuf, outOffsets);
80+
assert outLine == numLines;
81+
long outLen = outOffsets[(int) outLine];
82+
totalOutLen += (int) outLen;
83+
finalOutput.writeBytes(outBuf, 0, (int) outLen);
84+
85+
off += len;
86+
lenToWrite -= len;
87+
88+
}
89+
offsetWriter.addLen(totalOutLen);
90+
91+
clear();
92+
}
93+
94+
private void compressAndWriteBuffer() throws IOException {
95+
assert numLines < MAX_LINES + 1;
96+
assert inOff <= MAX_INPUT_DATA;
97+
98+
// add a pseudo-offset to provide last line's length
99+
inOffsets[numLines] = inOff;
100+
101+
long outLines = st.compressBulk(numLines, inData, inOffsets, outBuf, outOffsets);
102+
assert outLines == numLines;
103+
long fullOutLen = outOffsets[(int) outLines];
104+
105+
finalOutput.writeBytes(outBuf, 0, (int) fullOutLen);
106+
for (int i = 0; i < numLines; ++i) {
107+
int len = outOffsets[i+1] - outOffsets[i];
108+
offsetWriter.addLen(len);
109+
}
110+
111+
clear();
112+
}
113+
114+
void clear() {
115+
numLines = inOff = 0;
116+
}
117+
118+
@Override
119+
public void close() throws IOException {
120+
if (numLines > 0) {
121+
compressAndWriteBuffer();
122+
}
123+
clear();
124+
}
125+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.compress.fsst;
11+
12+
import java.util.ArrayList;
13+
import java.util.Arrays;
14+
import java.util.List;
15+
import java.util.Random;
16+
17+
import static org.elasticsearch.common.compress.fsst.FSST.FSST_SAMPLELINE;
18+
import static org.elasticsearch.common.compress.fsst.FSST.FSST_SAMPLEMAXSZ;
19+
import static org.elasticsearch.common.compress.fsst.FSST.FSST_SAMPLETARGET;
20+
21+
public class ReservoirSampler {
22+
private static final int SAMPLE_TARGET = FSST_SAMPLETARGET;
23+
private static final int SAMPLE_MAX = FSST_SAMPLEMAXSZ;
24+
private static final int SAMPLE_LINE = FSST_SAMPLELINE;
25+
private int numBytesInSample = 0;
26+
private int numChunksSeen = 0;
27+
private final Random random = new Random(1234);
28+
private List<byte[]> sample = new ArrayList<>();
29+
30+
public List<byte[]> getSample() {
31+
return sample;
32+
}
33+
34+
// The byte array is only valid during this call, thus bytes need to be deep copied
35+
public void processLine(byte[] bytes, int offset, int length) {
36+
if (length == 0) {
37+
return;
38+
}
39+
40+
// iterate over the chunks
41+
int numChunks = length / SAMPLE_LINE + (length % SAMPLE_LINE == 0 ? 0 : 1);
42+
for (int c = 0; c < numChunks; ++c) {
43+
numChunksSeen++;
44+
int chunkOffset = c * SAMPLE_LINE;
45+
int chunkLen = c == numChunks - 1 ? length - chunkOffset : SAMPLE_LINE;
46+
47+
if (numBytesInSample < SAMPLE_TARGET + SAMPLE_LINE) {
48+
// If the reservoir isn't full, just add to it.
49+
// This will occur on startup, but also if a recent swap caused us to go below the target.
50+
// Add a buffer of an additional sample line, so that one swap doesn't cause us to fall below target.
51+
byte[] chunkBytes = Arrays.copyOfRange(bytes, offset + chunkOffset, offset + chunkOffset + chunkLen);
52+
sample.add(chunkBytes);
53+
numBytesInSample += chunkBytes.length;
54+
} else {
55+
int p = random.nextInt(numChunksSeen);
56+
if (p < sample.size()) {
57+
// swap for an existing value
58+
byte[] toAdd = Arrays.copyOfRange(bytes, offset + chunkOffset, offset + chunkOffset + chunkLen);
59+
byte[] toRemove = sample.get(p);
60+
numBytesInSample -= toRemove.length;
61+
numBytesInSample += toAdd.length;
62+
sample.set(p, toAdd);
63+
64+
// Sample could now be too small if we swapped a small chunk for a big one.
65+
// This will be rectified as the next chunk will just be added to the sample, in the if-block above
66+
67+
// But if the sample is too large (from swapping big samples for small samples),
68+
// we need to discard some
69+
while (numBytesInSample > SAMPLE_MAX) {
70+
numBytesInSample -= sample.removeLast().length;
71+
}
72+
}
73+
}
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)