|
36 | 36 | import org.elasticsearch.ResourceNotFoundException; |
37 | 37 | import org.elasticsearch.TransportVersion; |
38 | 38 | import org.elasticsearch.action.get.GetRequest; |
| 39 | +import org.elasticsearch.common.CheckedSupplier; |
39 | 40 | import org.elasticsearch.common.bytes.BytesReference; |
40 | 41 | import org.elasticsearch.common.io.stream.InputStreamStreamInput; |
41 | 42 | import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; |
|
62 | 63 | import org.elasticsearch.index.query.SearchExecutionContext; |
63 | 64 | import org.elasticsearch.indices.breaker.CircuitBreakerService; |
64 | 65 | import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; |
| 66 | +import org.elasticsearch.logging.LogManager; |
| 67 | +import org.elasticsearch.logging.Logger; |
| 68 | +import org.elasticsearch.search.lookup.Source; |
| 69 | +import org.elasticsearch.search.lookup.SourceFilter; |
| 70 | +import org.elasticsearch.search.lookup.SourceProvider; |
65 | 71 | import org.elasticsearch.xcontent.ConstructingObjectParser; |
66 | 72 | import org.elasticsearch.xcontent.NamedXContentRegistry; |
67 | 73 | import org.elasticsearch.xcontent.ParseField; |
|
85 | 91 | import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; |
86 | 92 |
|
87 | 93 | public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBuilder> { |
| 94 | + private static final Logger LOGGER = LogManager.getLogger(PercolateQueryBuilder.class); |
| 95 | + |
88 | 96 | public static final String NAME = "percolate"; |
89 | 97 |
|
90 | 98 | static final ParseField DOCUMENT_FIELD = new ParseField("document"); |
@@ -540,41 +548,81 @@ static PercolateQuery.QueryStore createStore(MappedFieldType queryBuilderFieldTy |
540 | 548 | return docId -> { |
541 | 549 | if (binaryDocValues.advanceExact(docId)) { |
542 | 550 | BytesRef qbSource = binaryDocValues.binaryValue(); |
543 | | - try ( |
544 | | - InputStream in = new ByteArrayInputStream(qbSource.bytes, qbSource.offset, qbSource.length); |
545 | | - StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, qbSource.length), registry) |
546 | | - ) { |
547 | | - // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding |
548 | | - // to encode multiple binary values into a single binary doc values field. |
549 | | - // This is the reason we need to first read the number of values and |
550 | | - // then the length of the field value in bytes. |
551 | | - int numValues = input.readVInt(); |
552 | | - assert numValues == 1; |
553 | | - int valueLength = input.readVInt(); |
554 | | - assert valueLength > 0; |
555 | | - |
556 | | - TransportVersion transportVersion; |
557 | | - if (indexVersion.before(IndexVersions.V_8_8_0)) { |
558 | | - transportVersion = TransportVersion.fromId(indexVersion.id()); |
559 | | - } else { |
560 | | - transportVersion = TransportVersion.readVersion(input); |
| 551 | + QueryBuilder queryBuilder = readQueryBuilder(qbSource, registry, indexVersion, () -> { |
| 552 | + // query builder is written in an incompatible format, fall-back to reading it from source |
| 553 | + if (context.isSourceEnabled() == false) { |
| 554 | + throw new ElasticsearchException( |
| 555 | + "Unable to read percolator query. Original transport version is incompatible and source is " |
| 556 | + + "unavailable on index [{}].", |
| 557 | + context.index().getName() |
| 558 | + ); |
561 | 559 | } |
562 | | - // set the transportversion here - only read vints so far, so can change the version freely at this point |
563 | | - input.setTransportVersion(transportVersion); |
564 | | - |
565 | | - QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class); |
566 | | - assert in.read() == -1; |
567 | | - queryBuilder = Rewriteable.rewrite(queryBuilder, context); |
568 | | - return queryBuilder.toQuery(context); |
569 | | - } |
570 | | - |
| 560 | + LOGGER.warn( |
| 561 | + "Reading percolator query from source. For best performance, reindexing of index [{}] is required.", |
| 562 | + context.index().getName() |
| 563 | + ); |
| 564 | + SourceProvider sourceProvider = context.createSourceProvider(new SourceFilter(null, null)); |
| 565 | + Source source = sourceProvider.getSource(ctx, docId); |
| 566 | + SourceToParse sourceToParse = new SourceToParse( |
| 567 | + String.valueOf(docId), |
| 568 | + source.internalSourceRef(), |
| 569 | + source.sourceContentType() |
| 570 | + ); |
| 571 | + |
| 572 | + return context.parseDocument(sourceToParse).rootDoc().getBinaryValue(queryBuilderFieldType.name()); |
| 573 | + }); |
| 574 | + |
| 575 | + queryBuilder = Rewriteable.rewrite(queryBuilder, context); |
| 576 | + return queryBuilder.toQuery(context); |
571 | 577 | } else { |
572 | 578 | return null; |
573 | 579 | } |
574 | 580 | }; |
575 | 581 | }; |
576 | 582 | } |
577 | 583 |
|
| 584 | + private static QueryBuilder readQueryBuilder( |
| 585 | + BytesRef bytesRef, |
| 586 | + NamedWriteableRegistry registry, |
| 587 | + IndexVersion indexVersion, |
| 588 | + CheckedSupplier<BytesRef, IOException> fallbackSource |
| 589 | + ) throws IOException { |
| 590 | + try ( |
| 591 | + InputStream in = new ByteArrayInputStream(bytesRef.bytes, bytesRef.offset, bytesRef.length); |
| 592 | + StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, bytesRef.length), registry) |
| 593 | + ) { |
| 594 | + // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding |
| 595 | + // to encode multiple binary values into a single binary doc values field. |
| 596 | + // This is the reason we need to first read the number of values and |
| 597 | + // then the length of the field value in bytes. |
| 598 | + int numValues = input.readVInt(); |
| 599 | + assert numValues == 1; |
| 600 | + int valueLength = input.readVInt(); |
| 601 | + assert valueLength > 0; |
| 602 | + |
| 603 | + TransportVersion transportVersion; |
| 604 | + if (indexVersion.before(IndexVersions.V_8_8_0)) { |
| 605 | + transportVersion = TransportVersion.fromId(indexVersion.id()); |
| 606 | + } else { |
| 607 | + transportVersion = TransportVersion.readVersion(input); |
| 608 | + } |
| 609 | + |
| 610 | + QueryBuilder queryBuilder; |
| 611 | + |
| 612 | + if (TransportVersion.isCompatible(transportVersion) || fallbackSource == null) { |
| 613 | + // set the transportversion here - only read vints so far, so can change the version freely at this point |
| 614 | + input.setTransportVersion(transportVersion); |
| 615 | + queryBuilder = input.readNamedWriteable(QueryBuilder.class); |
| 616 | + assert in.read() == -1; |
| 617 | + } else { |
| 618 | + // incompatible transport version, try the fallback |
| 619 | + queryBuilder = readQueryBuilder(fallbackSource.get(), registry, indexVersion, null); |
| 620 | + } |
| 621 | + |
| 622 | + return queryBuilder; |
| 623 | + } |
| 624 | + } |
| 625 | + |
578 | 626 | static SearchExecutionContext wrap(SearchExecutionContext delegate) { |
579 | 627 | return new SearchExecutionContext(delegate) { |
580 | 628 |
|
|
0 commit comments