|
27 | 27 | import org.elasticsearch.compute.operator.AbstractPageMappingOperator; |
28 | 28 | import org.elasticsearch.compute.operator.DriverContext; |
29 | 29 | import org.elasticsearch.compute.operator.Operator; |
30 | | -import org.elasticsearch.core.Assertions; |
31 | 30 | import org.elasticsearch.core.Releasable; |
32 | 31 | import org.elasticsearch.core.Releasables; |
33 | 32 | import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; |
@@ -158,12 +157,6 @@ public int get(int i) { |
158 | 157 | many.run(); |
159 | 158 | } |
160 | 159 | } |
161 | | - if (Assertions.ENABLED) { |
162 | | - for (int f = 0; f < fields.length; f++) { |
163 | | - assert blocks[f].elementType() == ElementType.NULL || blocks[f].elementType() == fields[f].info.type |
164 | | - : blocks[f].elementType() + " NOT IN (NULL, " + fields[f].info.type + ")"; |
165 | | - } |
166 | | - } |
167 | 160 | success = true; |
168 | 161 | return page.appendBlocks(blocks); |
169 | 162 | } catch (IOException e) { |
@@ -227,6 +220,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa |
227 | 220 | BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx); |
228 | 221 | if (columnAtATime != null) { |
229 | 222 | blocks[f] = (Block) columnAtATime.read(loaderBlockFactory, docs); |
| 223 | + sanityCheckBlock(columnAtATime, docs.count(), blocks[f], f); |
230 | 224 | } else { |
231 | 225 | rowStrideReaders.add( |
232 | 226 | new RowStrideReaderWork( |
@@ -276,6 +270,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa |
276 | 270 | } |
277 | 271 | for (RowStrideReaderWork work : rowStrideReaders) { |
278 | 272 | blocks[work.offset] = work.build(); |
| 273 | + sanityCheckBlock(work.reader, docs.count(), blocks[work.offset], work.offset); |
279 | 274 | } |
280 | 275 | } finally { |
281 | 276 | Releasables.close(rowStrideReaders); |
@@ -379,6 +374,7 @@ void run() throws IOException { |
379 | 374 | try (Block targetBlock = fieldTypeBuilders[f].build()) { |
380 | 375 | target[f] = targetBlock.filter(backwards); |
381 | 376 | } |
| 377 | + sanityCheckBlock(rowStride[f], docs.getPositionCount(), target[f], f); |
382 | 378 | } |
383 | 379 | } |
384 | 380 |
|
@@ -555,6 +551,40 @@ protected Status status(long processNanos, int pagesProcessed, long rowsReceived |
555 | 551 | return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted); |
556 | 552 | } |
557 | 553 |
|
| 554 | + /** |
| 555 | + * Quick checks for on the loaded block to make sure it looks reasonable. |
| 556 | + * @param loader the object that did the loading - we use it to make error messages if the block is busted |
| 557 | + * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has |
| 558 | + * @param block the block to sanity check |
| 559 | + * @param field offset into the {@link #fields} array for the block being loaded |
| 560 | + */ |
| 561 | + private void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { |
| 562 | + if (block.getPositionCount() != expectedPositions) { |
| 563 | + throw new IllegalStateException( |
| 564 | + sanityCheckBlockErrorPrefix(loader, block, field) |
| 565 | + + " has [" |
| 566 | + + block.getPositionCount() |
| 567 | + + "] positions instead of [" |
| 568 | + + expectedPositions |
| 569 | + + "]" |
| 570 | + ); |
| 571 | + } |
| 572 | + if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { |
| 573 | + throw new IllegalStateException( |
| 574 | + sanityCheckBlockErrorPrefix(loader, block, field) |
| 575 | + + "'s element_type [" |
| 576 | + + block.elementType() |
| 577 | + + "] NOT IN (NULL, " |
| 578 | + + fields[field].info.type |
| 579 | + + ")" |
| 580 | + ); |
| 581 | + } |
| 582 | + } |
| 583 | + |
| 584 | + private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { |
| 585 | + return fields[field].info.name + "[" + loader + "]: " + block; |
| 586 | + } |
| 587 | + |
558 | 588 | public static class Status extends AbstractPageMappingOperator.Status { |
559 | 589 | public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( |
560 | 590 | Operator.Status.class, |
|
0 commit comments