Skip to content

Commit e370f43

Browse files
committed
IndexedDISIBuilder
1 parent 44e8bb4 commit e370f43

File tree

2 files changed

+192
-14
lines changed

2 files changed

+192
-14
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.lucene.util.BytesRef;
4040
import org.apache.lucene.util.BytesRefBuilder;
4141
import org.apache.lucene.util.LongsRef;
42-
import org.apache.lucene.util.RoaringDocIdSet;
4342
import org.apache.lucene.util.StringHelper;
4443
import org.apache.lucene.util.compress.LZ4;
4544
import org.apache.lucene.util.packed.DirectMonotonicWriter;
@@ -159,8 +158,9 @@ private long[] writeField(
159158
meta.writeLong(numValues);
160159
meta.writeInt(numDocsWithValue);
161160

162-
// TODO: write DISI to temp file and append it later to data part:
163-
var docIdSetBuilder = new RoaringDocIdSet.Builder(maxDoc);
161+
// TODO: which IOContext should be used here?
162+
IndexOutput disiTempOutput = null;
163+
IndexedDISIBuilder docIdSetBuilder = null;
164164
if (numValues > 0) {
165165
// Special case for maxOrd of 1, signal -1 that no blocks will be written
166166
meta.writeInt(maxOrd != 1 ? ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
@@ -181,13 +181,15 @@ private long[] writeField(
181181
values = valuesProducer.getSortedNumeric(field);
182182
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
183183

184-
// Reset and recompute. The value gathered from TsdbDocValuesProducer may not be accurate if one of the leaves was singleton
185-
// This could cause failures when writing addresses in writeSortedNumericField(...)
186-
numDocsWithValue = 0;
184+
if (numDocsWithValue != 0 && numDocsWithValue != maxDoc) {
185+
disiTempOutput = dir.createTempOutput(data.getName(), "disi", IOContext.DEFAULT);
186+
docIdSetBuilder = new IndexedDISIBuilder(disiTempOutput, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
187+
}
187188

188189
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
189-
numDocsWithValue++;
190-
docIdSetBuilder.add(doc);
190+
if (docIdSetBuilder != null) {
191+
docIdSetBuilder.addDocId(doc);
192+
}
191193
final int count = values.docValueCount();
192194
if (docCountConsumer != null) {
193195
docCountConsumer.accept(count);
@@ -244,13 +246,17 @@ private long[] writeField(
244246
long offset = data.getFilePointer();
245247
meta.writeLong(offset); // docsWithFieldOffset
246248
final short jumpTableEntryCount;
247-
if (maxOrd != 1) {
248-
var bitSet = docIdSetBuilder.build();
249-
var iterator = bitSet.iterator();
250-
if (iterator == null) {
251-
iterator = DocIdSetIterator.empty();
249+
if (maxOrd != 1 && docIdSetBuilder != null) {
250+
jumpTableEntryCount = docIdSetBuilder.build();
251+
String skipListTempFileName = disiTempOutput.getName();
252+
disiTempOutput.close();
253+
try (
254+
// TODO: which IOContext should be used here?
255+
var addressDataInput = dir.openInput(skipListTempFileName, IOContext.DEFAULT)
256+
) {
257+
data.copyBytes(addressDataInput, addressDataInput.length());
252258
}
253-
jumpTableEntryCount = IndexedDISI.writeBitSet(iterator, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
259+
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(dir, skipListTempFileName);
254260
} else {
255261
values = valuesProducer.getSortedNumeric(field);
256262
jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb;
11+
12+
import org.apache.lucene.search.DocIdSetIterator;
13+
import org.apache.lucene.store.IndexOutput;
14+
import org.apache.lucene.util.ArrayUtil;
15+
import org.apache.lucene.util.BitSetIterator;
16+
import org.apache.lucene.util.FixedBitSet;
17+
18+
import java.io.IOException;
19+
20+
/**
 * Fork of {@link org.apache.lucene.codecs.lucene90.IndexedDISI#writeBitSet(DocIdSetIterator, IndexOutput)} but that allows
 * building jump list iteratively with one docid at a time instead of relying on docidset iterator.
 *
 * <p>Usage: feed docIDs via {@link #addDocId(int)}, then call {@link #build()} once to append the
 * terminating NO_MORE_DOCS block and the jump table to {@code out} and obtain the jump-table entry count.
 * NOTE(review): docIDs are presumably required to be strictly increasing (as a DocIdSetIterator would
 * produce) and {@code build()} must not be called twice — neither is checked here; confirm with callers.
 */
final class IndexedDISIBuilder {

    // The number of docIDs that a single block represents (one block covers doc >>> 16)
    private static final int BLOCK_SIZE = 65536;

    private static final int DENSE_BLOCK_LONGS = BLOCK_SIZE / Long.SIZE; // 1024
    public static final byte DEFAULT_DENSE_RANK_POWER = 9; // Every 512 docIDs / 8 longs

    // Blocks with cardinality above this are flushed as DENSE (bitmap); at or below, as SPARSE (list of shorts)
    static final int MAX_ARRAY_LENGTH = (1 << 12) - 1;

    final IndexOutput out;
    final byte denseRankPower;
    final long origo;

    // Number of docs flushed in all completed blocks so far (excludes the block currently buffered)
    int totalCardinality = 0;
    // Number of docs set in the currently buffered block
    int blockCardinality = 0;
    // Buffer for the current 65536-doc block; cleared after each flush
    final FixedBitSet buffer = new FixedBitSet(1 << 16);
    // Interleaved (index, offset) jump-table entries, two ints per block
    int[] jumps = new int[ArrayUtil.oversize(1, Integer.BYTES * 2)];
    int prevBlock = -1;
    // First block index not yet covered by a jump-table entry
    int jumpBlockIndex = 0;

    /**
     * @param out destination for the serialized DISI; bytes are appended starting at its current file pointer.
     * @param denseRankPower 7..15 to write a rank structure every 2^denseRankPower docIDs inside DENSE blocks,
     *     or -1 to disable ranks.
     * @throws IllegalArgumentException if denseRankPower is outside 7..15 and not -1.
     */
    IndexedDISIBuilder(IndexOutput out, byte denseRankPower) {
        this.out = out;
        this.denseRankPower = denseRankPower;

        this.origo = out.getFilePointer(); // All jumps are relative to the origo
        if ((denseRankPower < 7 || denseRankPower > 15) && denseRankPower != -1) {
            throw new IllegalArgumentException(
                "Acceptable values for denseRankPower are 7-15 (every 128-32768 docIDs). "
                    + "The provided power was "
                    + denseRankPower
                    + " (every "
                    + (int) Math.pow(2, denseRankPower)
                    + " docIDs)"
            );
        }
    }

    /**
     * Records one docID with a value. When the doc crosses into a new 65536-doc block, the previous
     * block is flushed to {@code out} and its jump-table entries are recorded first.
     */
    void addDocId(int doc) throws IOException {
        final int block = doc >>> 16;
        if (prevBlock != -1 && block != prevBlock) {
            // Track offset+index from previous block up to current
            jumps = addJumps(jumps, out.getFilePointer() - origo, totalCardinality, jumpBlockIndex, prevBlock + 1);
            jumpBlockIndex = prevBlock + 1;
            // Flush block
            flush(prevBlock, buffer, blockCardinality, denseRankPower, out);
            // Reset for next block
            buffer.clear();
            totalCardinality += blockCardinality;
            blockCardinality = 0;
        }
        buffer.set(doc & 0xFFFF);
        blockCardinality++;
        prevBlock = block;
    }

    /**
     * Flushes the last partial block, appends the terminating NO_MORE_DOCS block, and writes the
     * offset+index jump table at the end of {@code out}.
     *
     * @return the number of jump-table entries written (the value IndexedDISI readers need), or 0
     *     when the jump table was elided — see {@link #flushBlockJumps}.
     */
    short build() throws IOException {
        if (blockCardinality > 0) {
            jumps = addJumps(jumps, out.getFilePointer() - origo, totalCardinality, jumpBlockIndex, prevBlock + 1);
            totalCardinality += blockCardinality;
            flush(prevBlock, buffer, blockCardinality, denseRankPower, out);
            buffer.clear();
            prevBlock++;
        }
        final int lastBlock = prevBlock == -1 ? 0 : prevBlock; // There will always be at least 1 block (NO_MORE_DOCS)
        // Last entry is a SPARSE with blockIndex == 32767 and the single entry 65535, which becomes
        // the docID NO_MORE_DOCS. To avoid creating 65K jump-table entries, only a single entry is
        // created pointing to the offset of the NO_MORE_DOCS block, with the jumpBlockIndex set to
        // the logical EMPTY block after all real blocks.
        jumps = addJumps(jumps, out.getFilePointer() - origo, totalCardinality, lastBlock, lastBlock + 1);
        buffer.set(DocIdSetIterator.NO_MORE_DOCS & 0xFFFF);
        flush(DocIdSetIterator.NO_MORE_DOCS >>> 16, buffer, 1, denseRankPower, out);
        // offset+index jump-table stored at the end
        return flushBlockJumps(jumps, lastBlock + 1, out);
    }

    // Adds entries to the offset & index jump-table for blocks. Blocks [startBlock, endBlock) all
    // point at the same (index, offset) pair: empty blocks share the entry of the next real block.
    private static int[] addJumps(int[] jumps, long offset, int index, int startBlock, int endBlock) {
        assert offset < Integer.MAX_VALUE : "Logically the offset should not exceed 2^30 but was >= Integer.MAX_VALUE";
        jumps = ArrayUtil.grow(jumps, (endBlock + 1) * 2);
        for (int b = startBlock; b < endBlock; b++) {
            jumps[b * 2] = index;
            jumps[b * 2 + 1] = (int) offset;
        }
        return jumps;
    }

    // Serializes one block: a 2-byte block id, a 2-byte (cardinality - 1), then either a SPARSE list
    // of 2-byte doc offsets, a DENSE bitmap (optionally preceded by a rank structure), or — for a
    // fully-set block (cardinality == BLOCK_SIZE) — nothing, which readers treat as ALL.
    private static void flush(int block, FixedBitSet buffer, int cardinality, byte denseRankPower, IndexOutput out) throws IOException {
        assert block >= 0 && block < BLOCK_SIZE;
        out.writeShort((short) block);
        assert cardinality > 0 && cardinality <= BLOCK_SIZE;
        out.writeShort((short) (cardinality - 1));
        if (cardinality > MAX_ARRAY_LENGTH) {
            if (cardinality != BLOCK_SIZE) { // all docs are set
                if (denseRankPower != -1) {
                    final byte[] rank = createRank(buffer, denseRankPower);
                    out.writeBytes(rank, rank.length);
                }
                for (long word : buffer.getBits()) {
                    out.writeLong(word);
                }
            }
        } else {
            BitSetIterator it = new BitSetIterator(buffer, cardinality);
            for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
                out.writeShort((short) doc);
            }
        }
    }

    // Flushes the offset & index jump-table for blocks. This should be the last data written to out.
    // This method returns the blockCount for the blocks reachable for the jump_table or -1 for no
    // jump-table. NOTE(review): despite that inherited wording, this implementation returns 0 (not -1)
    // when the table is elided.
    private static short flushBlockJumps(int[] jumps, int blockCount, IndexOutput out) throws IOException {
        if (blockCount == 2) { // Jumps with a single real entry + NO_MORE_DOCS is just wasted space so we ignore
            // that
            blockCount = 0;
        }
        for (int i = 0; i < blockCount; i++) {
            out.writeInt(jumps[i * 2]); // index
            out.writeInt(jumps[i * 2 + 1]); // offset
        }
        // As there are at most 32k blocks, the count is a short
        // The jumpTableOffset will be at lastPos - (blockCount * Long.BYTES)
        return (short) blockCount;
    }

    // Creates a DENSE rank-entry (the number of set bits up to a given point) for the buffer.
    // One rank-entry for every {@code 2^denseRankPower} bits, with each rank-entry using 2 bytes.
    // Represented as a byte[] for fast flushing and mirroring of the retrieval representation.
    private static byte[] createRank(FixedBitSet buffer, byte denseRankPower) {
        final int longsPerRank = 1 << (denseRankPower - 6);
        final int rankMark = longsPerRank - 1;
        final int rankIndexShift = denseRankPower - 7; // 6 for the long (2^6) + 1 for 2 bytes/entry
        final byte[] rank = new byte[DENSE_BLOCK_LONGS >> rankIndexShift];
        final long[] bits = buffer.getBits();
        int bitCount = 0;
        for (int word = 0; word < DENSE_BLOCK_LONGS; word++) {
            if ((word & rankMark) == 0) { // Every longsPerRank longs
                rank[word >> rankIndexShift] = (byte) (bitCount >> 8);
                rank[(word >> rankIndexShift) + 1] = (byte) (bitCount & 0xFF);
            }
            bitCount += Long.bitCount(bits[word]);
        }
        return rank;
    }

}

0 commit comments

Comments
 (0)