|
24 | 24 | import org.apache.lucene.index.IndexWriter; |
25 | 25 | import org.apache.lucene.index.IndexWriterConfig; |
26 | 26 | import org.apache.lucene.index.LeafReaderContext; |
| 27 | +import org.apache.lucene.index.LogByteSizeMergePolicy; |
27 | 28 | import org.apache.lucene.index.SortedDocValues; |
28 | 29 | import org.apache.lucene.index.SortedNumericDocValues; |
29 | 30 | import org.apache.lucene.index.SortedSetDocValues; |
|
38 | 39 | import org.apache.lucene.tests.index.RandomIndexWriter; |
39 | 40 | import org.apache.lucene.tests.util.TestUtil; |
40 | 41 | import org.apache.lucene.util.BytesRef; |
| 42 | +import org.elasticsearch.cluster.metadata.DataStream; |
41 | 43 |
|
42 | 44 | import java.io.IOException; |
43 | 45 | import java.util.ArrayList; |
@@ -258,14 +260,7 @@ public void testForceMergeDenseCase() throws Exception { |
258 | 260 | String hostnameField = "host.name"; |
259 | 261 | long baseTimestamp = 1704067200000L; |
260 | 262 |
|
261 | | - var config = new IndexWriterConfig(); |
262 | | - config.setIndexSort( |
263 | | - new Sort( |
264 | | - new SortField(hostnameField, SortField.Type.STRING, false), |
265 | | - new SortedNumericSortField(timestampField, SortField.Type.LONG, true) |
266 | | - ) |
267 | | - ); |
268 | | - config.setCodec(getCodec()); |
| 263 | + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); |
269 | 264 | try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { |
270 | 265 | long counter1 = 0; |
271 | 266 | long counter2 = 10_000_000; |
@@ -369,4 +364,99 @@ public void testForceMergeDenseCase() throws Exception { |
369 | 364 | } |
370 | 365 | } |
371 | 366 | } |
| 367 | + |
| 368 | + public void testWithNoValueMultiValue() throws Exception { |
| 369 | + String timestampField = "@timestamp"; |
| 370 | + String hostnameField = "host.name"; |
| 371 | + long baseTimestamp = 1704067200000L; |
| 372 | + |
| 373 | + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); |
| 374 | + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { |
| 375 | + int numRounds = 4 + random().nextInt(28); |
| 376 | + int numDocsPerRound = 8 + random().nextInt(56); |
| 377 | + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; |
| 378 | + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; |
| 379 | + { |
| 380 | + long timestamp = baseTimestamp; |
| 381 | + for (int i = 0; i < numRounds; i++) { |
| 382 | + int r = random().nextInt(10); |
| 383 | + for (int j = 0; j < numDocsPerRound; j++) { |
| 384 | + var d = new Document(); |
| 385 | + String hostName = String.format(Locale.ROOT, "host-%03d", i); |
| 386 | + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); |
| 387 | + // Index sorting doesn't work with NumericDocValuesField: |
| 388 | + d.add(new SortedNumericDocValuesField(timestampField, timestamp++)); |
| 389 | + |
| 390 | + if (r % 10 == 5) { |
| 391 | + // sometimes no values |
| 392 | + } else if (r % 10 > 5) { |
| 393 | + // often multiple values: |
| 394 | + int numValues = 2 + random().nextInt(4); |
| 395 | + for (int k = 0; k < numValues; k++) { |
| 396 | + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[(j + k) % gauge1Values.length])); |
| 397 | + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(j + k) % tags.length]))); |
| 398 | + } |
| 399 | + } else { |
| 400 | + // otherwise single value: |
| 401 | + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[j % gauge1Values.length])); |
| 402 | + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j % tags.length]))); |
| 403 | + } |
| 404 | + |
| 405 | + iw.addDocument(d); |
| 406 | + } |
| 407 | + iw.commit(); |
| 408 | + } |
| 409 | + iw.forceMerge(1); |
| 410 | + } |
| 411 | + |
| 412 | + int numDocs = numRounds * numDocsPerRound; |
| 413 | + try (var reader = DirectoryReader.open(iw)) { |
| 414 | + assertEquals(1, reader.leaves().size()); |
| 415 | + assertEquals(numDocs, reader.maxDoc()); |
| 416 | + var leaf = reader.leaves().get(0).reader(); |
| 417 | + var hostNameDV = leaf.getSortedDocValues(hostnameField); |
| 418 | + assertNotNull(hostNameDV); |
| 419 | + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); |
| 420 | + assertNotNull(timestampDV); |
| 421 | + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); |
| 422 | + assertNotNull(gaugeOneDV); |
| 423 | + for (int i = 0; i < numDocs; i++) { |
| 424 | + assertEquals(i, hostNameDV.nextDoc()); |
| 425 | + int round = i / numDocsPerRound; |
| 426 | + String expectedHostName = String.format(Locale.ROOT, "host-%03d", round); |
| 427 | + String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); |
| 428 | + assertEquals(expectedHostName, actualHostName); |
| 429 | + |
| 430 | + assertEquals(i, timestampDV.nextDoc()); |
| 431 | + long timestamp = timestampDV.longValue(); |
| 432 | + long lowerBound = baseTimestamp; |
| 433 | + long upperBound = baseTimestamp + numDocs; |
| 434 | + assertTrue( |
| 435 | + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", |
| 436 | + timestamp >= lowerBound && timestamp < upperBound |
| 437 | + ); |
| 438 | + if (gaugeOneDV.advanceExact(i)) { |
| 439 | + for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { |
| 440 | + long value = gaugeOneDV.nextValue(); |
| 441 | + assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); |
| 442 | + } |
| 443 | + } |
| 444 | + } |
| 445 | + } |
| 446 | + } |
| 447 | + } |
| 448 | + |
| 449 | + private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) { |
| 450 | + var config = new IndexWriterConfig(); |
| 451 | + config.setIndexSort( |
| 452 | + new Sort( |
| 453 | + new SortField(hostnameField, SortField.Type.STRING, false), |
| 454 | + new SortedNumericSortField(timestampField, SortField.Type.LONG, true) |
| 455 | + ) |
| 456 | + ); |
| 457 | + config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); |
| 458 | + config.setMergePolicy(new LogByteSizeMergePolicy()); |
| 459 | + config.setCodec(getCodec()); |
| 460 | + return config; |
| 461 | + } |
372 | 462 | } |
0 commit comments