Skip to content

Commit abd9117

Browse files
committed
Added non-optimized block-based ordinal loading for tsid
1 parent 2f0330d commit abd9117

File tree

10 files changed

+296
-12
lines changed

10 files changed

+296
-12
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BlockAwareNumericDocValues.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@
1313

1414
/**
 * A {@link NumericDocValues} that can additionally expose a block loader through
 * {@link #getSingletonBlockLoader()}.
 */
public abstract class BlockAwareNumericDocValues extends NumericDocValues {
1515

16-
public abstract SingletonLongDocValuesBlockLoader getSingletonBlockLoader();
16+
// The return type follows the rename of SingletonLongDocValuesBlockLoader to SingletonDocValuesBlockLoader
// made in this same commit.
public abstract SingletonDocValuesBlockLoader getSingletonBlockLoader();
1717

1818
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.es819;
11+
12+
import org.apache.lucene.index.SortedDocValues;
13+
14+
/**
 * A {@link SortedDocValues} that can additionally expose a block loader through
 * {@link #getSingletonBlockLoader()}, mirroring {@code BlockAwareNumericDocValues} for ordinals.
 */
public abstract class BlockAwareSortedDocValues extends SortedDocValues {
15+
16+
/**
 * Returns the loader used to read values block by block.
 * NOTE(review): at least one implementation in this commit returns {@code null} when its underlying
 * ordinals are not block aware, so callers should be prepared for a {@code null} result.
 */
public abstract SingletonDocValuesBlockLoader getSingletonBlockLoader();
17+
18+
}

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -369,10 +369,19 @@ public int advance(int target) throws IOException {
369369
public long cost() {
370370
return ords.cost();
371371
}
372+
373+
@Override
374+
public SingletonDocValuesBlockLoader getSingletonBlockLoader() {
375+
if (ords instanceof BlockAwareNumericDocValues b) {
376+
return b.getSingletonBlockLoader();
377+
} else {
378+
return null;
379+
}
380+
}
372381
};
373382
}
374383

375-
abstract class BaseSortedDocValues extends SortedDocValues {
384+
abstract class BaseSortedDocValues extends BlockAwareSortedDocValues {
376385

377386
final SortedEntry entry;
378387
final TermsEnum termsEnum;
@@ -1203,17 +1212,17 @@ public long longValue() throws IOException {
12031212
return currentBlock[blockInIndex];
12041213
}
12051214

1206-
private SingletonLongDocValuesBlockLoader loader;
1215+
private SingletonDocValuesBlockLoader loader;
12071216

12081217
@Override
1209-
public SingletonLongDocValuesBlockLoader getSingletonBlockLoader() {
1218+
public SingletonDocValuesBlockLoader getSingletonBlockLoader() {
12101219
if (loader == null) {
1211-
assert maxOrd == -1;
1212-
loader = new SingletonLongDocValuesBlockLoader() {
1220+
loader = new SingletonDocValuesBlockLoader() {
12131221

12141222
@Override
12151223
public void loadBlock(BlockLoader.SingletonLongBuilder builder, BlockLoader.Docs docs, int offset)
12161224
throws IOException {
1225+
assert maxOrd == -1;
12171226
doc = docs.get(docs.count() - 1);
12181227
boolean isDense = doc - docs.get(0) == docs.count() - 1;
12191228
if (isDense) {
@@ -1315,6 +1324,27 @@ public void loadBlock(BlockLoader.SingletonLongBuilder builder, BlockLoader.Docs
13151324
}
13161325
}
13171326

1327+
@Override
1328+
public void loadBlock(BlockLoader.SingletonOrdinalsBuilder builder, BlockLoader.Docs docs, int offset)
1329+
throws IOException {
1330+
assert maxOrd >= 0;
1331+
for (int i = offset; i < docs.count(); i++) {
1332+
int index = docs.get(i);
1333+
final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
1334+
final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1335+
if (blockIndex != currentBlockIndex) {
1336+
assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
1337+
// no need to seek if the loading block is the next block
1338+
if (currentBlockIndex + 1 != blockIndex) {
1339+
valuesData.seek(indexReader.get(blockIndex));
1340+
}
1341+
currentBlockIndex = blockIndex;
1342+
decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
1343+
}
1344+
builder.appendOrd(Math.toIntExact(currentBlock[blockInIndex]));
1345+
}
1346+
}
1347+
13181348
@Override
13191349
public int docID() {
13201350
return doc;
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313

1414
import java.io.IOException;
1515

16-
public interface SingletonLongDocValuesBlockLoader {
16+
/**
 * Loads doc values for a batch of documents directly into a block builder, either as long values or
 * as ordinals.
 */
public interface SingletonDocValuesBlockLoader {
1717

1818
/**
 * Loads long values for the entries of {@code docs} starting at index {@code offset} into {@code builder}.
 */
void loadBlock(BlockLoader.SingletonLongBuilder builder, BlockLoader.Docs docs, int offset) throws IOException;
1919

20+
/**
 * Loads ordinals for the entries of {@code docs} starting at index {@code offset} into {@code builder}.
 */
void loadBlock(BlockLoader.SingletonOrdinalsBuilder builder, BlockLoader.Docs docs, int offset) throws IOException;
21+
2022
// NOTE(review): presumably the doc id this loader is currently positioned on; TSIDBlockLoader.TSIDs compares
// it against the starting doc id of the next read to decide whether the reader can be reused — confirm.
int docID();
2123

2224
}

server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ public String toString() {
632632
}
633633
}
634634

635-
private static class SingletonOrdinals extends BlockDocValuesReader {
635+
public static class SingletonOrdinals extends BlockDocValuesReader {
636636
private final SortedDocValues ordinals;
637637

638638
SingletonOrdinals(SortedDocValues ordinals) {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.mapper;
11+
12+
import org.apache.lucene.index.DocValues;
13+
import org.apache.lucene.index.LeafReaderContext;
14+
import org.apache.lucene.index.SortedDocValues;
15+
import org.apache.lucene.index.SortedSetDocValues;
16+
import org.elasticsearch.index.codec.tsdb.es819.BlockAwareSortedDocValues;
17+
import org.elasticsearch.index.codec.tsdb.es819.SingletonDocValuesBlockLoader;
18+
import org.elasticsearch.search.fetch.StoredFieldsSpec;
19+
20+
import java.io.IOException;
21+
22+
public final class TSIDBlockLoader implements BlockLoader {
23+
24+
private static final String FIELD_NAME = TimeSeriesIdFieldMapper.NAME;
25+
26+
@Override
27+
public Builder builder(BlockFactory factory, int expectedCount) {
28+
return factory.bytesRefs(expectedCount);
29+
}
30+
31+
@Override
32+
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
33+
var singleton = context.reader().getSortedDocValues(FIELD_NAME);
34+
return new TSIDs((BlockAwareSortedDocValues) singleton);
35+
}
36+
37+
@Override
38+
public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
39+
var singleton = context.reader().getSortedDocValues(FIELD_NAME);
40+
return new BlockDocValuesReader.SingletonOrdinals(singleton);
41+
}
42+
43+
@Override
44+
public StoredFieldsSpec rowStrideStoredFieldSpec() {
45+
return StoredFieldsSpec.NO_REQUIREMENTS;
46+
}
47+
48+
@Override
49+
public boolean supportsOrdinals() {
50+
return true;
51+
}
52+
53+
@Override
54+
public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
55+
return DocValues.getSortedSet(context.reader(), FIELD_NAME);
56+
}
57+
58+
public static final class TSIDs implements ColumnAtATimeReader {
59+
private final Thread creationThread;
60+
private final SortedDocValues sorted;
61+
private final SingletonDocValuesBlockLoader blockLoader;
62+
63+
TSIDs(BlockAwareSortedDocValues sorted) {
64+
this.creationThread = Thread.currentThread();
65+
this.sorted = sorted;
66+
this.blockLoader = sorted.getSingletonBlockLoader();
67+
}
68+
69+
@Override
70+
public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
71+
try (SingletonOrdinalsBuilder builder = factory.singletonOrdinalsBuilder(sorted, docs.count() - offset)) {
72+
blockLoader.loadBlock(builder, docs, offset);
73+
return builder.build();
74+
}
75+
}
76+
77+
@Override
78+
public boolean canReuse(int startingDocID) {
79+
return creationThread == Thread.currentThread() && blockLoader.docID() <= startingDocID;
80+
}
81+
82+
@Override
83+
public String toString() {
84+
return "TSIDBlockLoader.TSIDs";
85+
}
86+
}
87+
}

server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public Query termQuery(Object value, SearchExecutionContext context) {
135135

136136
// Block loader for this field type. This commit replaces the generic ordinals-based
// BytesRefsFromOrdsBlockLoader with the dedicated TSIDBlockLoader; the new loader takes no
// arguments, so blContext is not consulted.
@Override
137137
public BlockLoader blockLoader(BlockLoaderContext blContext) {
138-
return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name());
138+
return new TSIDBlockLoader();
139139
}
140140
}
141141

server/src/main/java/org/elasticsearch/index/mapper/TimestampBlockLoader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.apache.lucene.index.NumericDocValues;
1515
import org.apache.lucene.index.SortedSetDocValues;
1616
import org.elasticsearch.index.codec.tsdb.es819.BlockAwareNumericDocValues;
17-
import org.elasticsearch.index.codec.tsdb.es819.SingletonLongDocValuesBlockLoader;
17+
import org.elasticsearch.index.codec.tsdb.es819.SingletonDocValuesBlockLoader;
1818
import org.elasticsearch.search.fetch.StoredFieldsSpec;
1919

2020
import java.io.IOException;
@@ -65,7 +65,7 @@ public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException
6565

6666
public static final class Timestamps implements ColumnAtATimeReader {
6767
private final Thread creationThread;
68-
private final SingletonLongDocValuesBlockLoader blockLoader;
68+
private final SingletonDocValuesBlockLoader blockLoader;
6969

7070
Timestamps(BlockAwareNumericDocValues blockAware) {
7171
this.creationThread = Thread.currentThread();
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.mapper;
11+
12+
import org.apache.lucene.document.SortedDocValuesField;
13+
import org.apache.lucene.index.DirectoryReader;
14+
import org.apache.lucene.index.IndexWriter;
15+
import org.apache.lucene.index.IndexWriterConfig;
16+
import org.apache.lucene.index.LeafReaderContext;
17+
import org.apache.lucene.search.Sort;
18+
import org.apache.lucene.search.SortField;
19+
import org.apache.lucene.store.Directory;
20+
import org.apache.lucene.tests.analysis.MockAnalyzer;
21+
import org.apache.lucene.tests.util.TestUtil;
22+
import org.apache.lucene.util.BytesRef;
23+
import org.elasticsearch.cluster.metadata.DataStream;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.index.IndexVersion;
26+
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
27+
28+
import java.util.Locale;
29+
import java.util.stream.IntStream;
30+
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.instanceOf;
33+
import static org.mockito.Mockito.mock;
34+
35+
public class TSIDBulkBlockLoadingTests extends MapperServiceTestCase {
36+
37+
public void testManyValues() throws Exception {
38+
final String mappings = """
39+
{
40+
"_doc" : {
41+
"properties": {
42+
"@timestamp": {
43+
"type": "date",
44+
"ignore_malformed": false
45+
},
46+
"host_name": {
47+
"type": "keyword",
48+
"time_series_dimension": true
49+
}
50+
}
51+
}
52+
}
53+
""";
54+
Settings settings = indexSettings(IndexVersion.current(), 1, 1).put("index.mode", "time_series")
55+
.put("index.routing_path", "host_name")
56+
.build();
57+
var mapperService = createMapperService(settings, mappings);
58+
try (Directory directory = newDirectory()) {
59+
int from = 0;
60+
int to = 10_000;
61+
int uniqueTsidEvery = 200;
62+
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
63+
iwc.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
64+
iwc.setIndexSort(new Sort(new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, false)));
65+
iwc.setCodec(TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()));
66+
try (IndexWriter iw = new IndexWriter(directory, iwc)) {
67+
for (int i = from; i < to; i++) {
68+
LuceneDocument doc = new LuceneDocument();
69+
int tsid = i / uniqueTsidEvery;
70+
doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(String.format(Locale.ROOT, "%04d", tsid))));
71+
iw.addDocument(doc);
72+
}
73+
iw.forceMerge(1);
74+
}
75+
var mockBlockContext = mock(MappedFieldType.BlockLoaderContext.class);
76+
var blockLoader = mapperService.fieldType(TimeSeriesIdFieldMapper.NAME).blockLoader(mockBlockContext);
77+
try (DirectoryReader reader = DirectoryReader.open(directory)) {
78+
LeafReaderContext context = reader.leaves().get(0);
79+
{
80+
// One big doc block
81+
var columnReader = blockLoader.columnAtATimeReader(context);
82+
assertThat(columnReader, instanceOf(TSIDBlockLoader.TSIDs.class));
83+
var docBlock = TestBlock.docs(IntStream.range(from, to).toArray());
84+
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
85+
assertThat(block.size(), equalTo(to - from));
86+
for (int i = 0; i < block.size(); i++) {
87+
String actual = ((BytesRef) block.get(i)).utf8ToString();
88+
int expectedTsid = i / uniqueTsidEvery;
89+
assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid)));
90+
}
91+
}
92+
{
93+
// Smaller doc blocks
94+
int docBlockSize = 1000;
95+
var columnReader = blockLoader.columnAtATimeReader(context);
96+
assertThat(columnReader, instanceOf(TSIDBlockLoader.TSIDs.class));
97+
for (int i = from; i < to; i += docBlockSize) {
98+
var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray());
99+
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
100+
assertThat(block.size(), equalTo(docBlockSize));
101+
for (int j = 0; j < block.size(); j++) {
102+
String actual = ((BytesRef) block.get(j)).utf8ToString();
103+
int expectedTsid = (i + j) / uniqueTsidEvery;
104+
assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid)));
105+
}
106+
}
107+
}
108+
{
109+
// One smaller doc block:
110+
var columnReader = blockLoader.columnAtATimeReader(context);
111+
assertThat(columnReader, instanceOf(TSIDBlockLoader.TSIDs.class));
112+
var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray());
113+
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
114+
assertThat(block.size(), equalTo(1010));
115+
for (int i = 0; i < block.size(); i++) {
116+
String actual = ((BytesRef) block.get(i)).utf8ToString();
117+
int expectedTsid = (1010 + i) / uniqueTsidEvery;
118+
assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid)));
119+
}
120+
}
121+
{
122+
// Read two tiny blocks:
123+
var columnReader = blockLoader.columnAtATimeReader(context);
124+
assertThat(columnReader, instanceOf(TSIDBlockLoader.TSIDs.class));
125+
var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray());
126+
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
127+
assertThat(block.size(), equalTo(32));
128+
for (int i = 0; i < block.size(); i++) {
129+
String actual = ((BytesRef) block.get(i)).utf8ToString();
130+
int expectedTsid = (32 + i) / uniqueTsidEvery;
131+
assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid)));
132+
}
133+
134+
docBlock = TestBlock.docs(IntStream.range(64, 96).toArray());
135+
block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
136+
assertThat(block.size(), equalTo(32));
137+
for (int i = 0; i < block.size(); i++) {
138+
String actual = ((BytesRef) block.get(i)).utf8ToString();
139+
int expectedTsid = (64 + i) / uniqueTsidEvery;
140+
assertThat(actual, equalTo(String.format(Locale.ROOT, "%04d", expectedTsid)));
141+
}
142+
}
143+
}
144+
}
145+
}
146+
147+
}

test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ private SingletonOrdsBuilder() {
281281
@Override
282282
public SingletonOrdsBuilder appendOrd(int value) {
283283
try {
284-
add(ordinals.lookupOrd(value));
284+
add(BytesRef.deepCopyOf(ordinals.lookupOrd(value)));
285285
return this;
286286
} catch (IOException e) {
287287
throw new UncheckedIOException(e);

0 commit comments

Comments
 (0)