diff --git a/server/src/main/java/org/opensearch/index/fielddata/plain/HllFieldData.java b/server/src/main/java/org/opensearch/index/fielddata/plain/HllFieldData.java new file mode 100644 index 0000000000000..43a39a814dfaf --- /dev/null +++ b/server/src/main/java/org/opensearch/index/fielddata/plain/HllFieldData.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.fielddata.plain; + +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.SortField; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.Nullable; +import org.opensearch.common.util.BigArrays; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested; +import org.opensearch.index.fielddata.IndexFieldDataCache; +import org.opensearch.index.fielddata.LeafFieldData; +import org.opensearch.index.fielddata.ScriptDocValues; +import org.opensearch.index.fielddata.SortedBinaryDocValues; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.MultiValueMode; +import org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus; +import org.opensearch.search.aggregations.support.ValuesSourceType; +import org.opensearch.search.sort.BucketedSort; +import org.opensearch.search.sort.SortOrder; + +import java.io.IOException; + +import static org.opensearch.search.aggregations.support.CoreValuesSourceType.BYTES; + +/** + * Field data implementation for HyperLogLog++ sketch fields. + * Provides access to HLL++ sketches stored as binary doc values. 
+ * + * @opensearch.internal + */ +public class HllFieldData implements IndexFieldData<HllFieldData.HllLeafFieldData> { + + /** + * Builder for HLL field data + * + * @opensearch.internal + */ + public static class Builder implements IndexFieldData.Builder { + private final String name; + private final int precision; + + public Builder(String name, int precision) { + this.name = name; + this.precision = precision; + } + + @Override + public HllFieldData build(IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new HllFieldData(name, precision); + } + } + + private final String fieldName; + private final int precision; + + private HllFieldData(String fieldName, int precision) { + this.fieldName = fieldName; + this.precision = precision; + } + + @Override + public final String getFieldName() { + return fieldName; + } + + public int getPrecision() { + return precision; + } + + @Override + public ValuesSourceType getValuesSourceType() { + // HLL fields use BYTES values source type since they store binary data + return BYTES; + } + + @Override + public HllLeafFieldData load(LeafReaderContext context) { + return new HllLeafFieldData(context.reader(), fieldName); + } + + @Override + public HllLeafFieldData loadDirect(LeafReaderContext context) throws Exception { + return load(context); + } + + @Override + public SortField sortField( + @Nullable Object missingValue, + MultiValueMode sortMode, + XFieldComparatorSource.Nested nested, + boolean reverse + ) { + throw new IllegalArgumentException("Sorting is not supported on [hll] fields"); + } + + @Override + public BucketedSort newBucketedSort( + BigArrays bigArrays, + Object missingValue, + MultiValueMode sortMode, + Nested nested, + SortOrder sortOrder, + DocValueFormat format, + int bucketSize, + BucketedSort.ExtraData extra + ) { + throw new IllegalArgumentException("Bucketed sort is not supported on [hll] fields"); + } + + /** + * Leaf-level field data for HLL++ sketches. + * + * @opensearch.internal + */ + public static class HllLeafFieldData implements LeafFieldData { + + private final LeafReader reader; + private final String fieldName; + + HllLeafFieldData(LeafReader reader, String fieldName) { + this.reader = reader; + this.fieldName = fieldName; + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public void close() { + // Nothing to close + } + + @Override + public ScriptDocValues<?> getScriptValues() { + throw new UnsupportedOperationException("HLL fields do not support getScriptValues"); + } + + @Override + public SortedBinaryDocValues getBytesValues() { + throw new UnsupportedOperationException("HLL fields do not support getBytesValues"); + } + + /** + * Get the HLL++ sketch for the given document ID.
+ * + * @param docId the document ID + * @return the HLL++ sketch, or null if the document doesn't have a value + * @throws IOException if an error occurs reading the sketch + */ + public AbstractHyperLogLogPlusPlus getSketch(int docId) throws IOException { + BinaryDocValues docValues = reader.getBinaryDocValues(fieldName); + if (docValues != null && docValues.advanceExact(docId)) { + BytesRef sketchBytes = docValues.binaryValue(); + return AbstractHyperLogLogPlusPlus.readFrom( + new BytesArray(sketchBytes.bytes, sketchBytes.offset, sketchBytes.length).streamInput(), + BigArrays.NON_RECYCLING_INSTANCE + ); + } + return null; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/mapper/HllFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/HllFieldMapper.java new file mode 100644 index 0000000000000..fa7eb1548f166 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/mapper/HllFieldMapper.java @@ -0,0 +1,234 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.mapper; + +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.search.FieldExistsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.util.BigArrays; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.fielddata.plain.HllFieldData; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.aggregations.metrics.AbstractHyperLogLog; +import org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus; +import org.opensearch.search.aggregations.metrics.HyperLogLogPlusPlus; +import org.opensearch.search.lookup.SearchLookup; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * A {@link FieldMapper} for HyperLogLog++ sketch fields. + * This field type stores pre-aggregated cardinality data using HLL++ sketch data structures. + * It is intended for internal use by OpenSearch and its plugins (such as ISM for multi-tier rollup). 
+ * + * @opensearch.internal + */ +public class HllFieldMapper extends ParametrizedFieldMapper { + + public static final String CONTENT_TYPE = "hll"; + + private static HllFieldMapper toType(FieldMapper in) { + return (HllFieldMapper) in; + } + + /** + * Builder for the HLL field mapper + * + * @opensearch.internal + */ + public static class Builder extends ParametrizedFieldMapper.Builder { + + private final Parameter<Integer> precision = Parameter.intParam( + "precision", + false, + m -> toType(m).precision, + HyperLogLogPlusPlus.DEFAULT_PRECISION + ).setValidator(Builder::validatePrecision); + + private static void validatePrecision(int precision) { + if (precision < AbstractHyperLogLog.MIN_PRECISION || precision > AbstractHyperLogLog.MAX_PRECISION) { + throw new IllegalArgumentException( + "precision must be between " + + AbstractHyperLogLog.MIN_PRECISION + + " and " + + AbstractHyperLogLog.MAX_PRECISION + + ", got: " + + precision + ); + } + } + + private final Parameter<Map<String, String>> meta = Parameter.metaParam(); + + public Builder(String name) { + super(name); + } + + @Override + protected List<Parameter<?>> getParameters() { + return Arrays.asList(precision, meta); + } + + @Override + public HllFieldMapper build(BuilderContext context) { + return new HllFieldMapper( + name, + new HllFieldType(buildFullName(context), precision.getValue(), meta.getValue()), + multiFieldsBuilder.build(this, context), + copyTo.build(), + precision.getValue() + ); + } + } + + public static final TypeParser PARSER = new TypeParser((n, c) -> { + // HLL fields are intended for internal use by OpenSearch and plugins only. + return new Builder(n); + }); + + /** + * HLL field type + * + * @opensearch.internal + */ + public static final class HllFieldType extends MappedFieldType { + + private final int precision; + + public HllFieldType(String name, int precision, Map<String, String> meta) { + super(name, false, false, true, TextSearchInfo.NONE, meta); + this.precision = precision; + } + + @Override + public String typeName() { + return CONTENT_TYPE; + } + + public int precision() { + return precision; + } + + @Override + public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) { + return new SourceValueFetcher(name(), context) { + @Override + protected Object parseSourceValue(Object value) { + // Return the binary sketch data as base64 for readability + if (value instanceof BytesRef) { + return java.util.Base64.getEncoder().encodeToString(((BytesRef) value).bytes); + } + return value; + } + }; + } + + @Override + public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName, Supplier<SearchLookup> searchLookup) { + failIfNoDocValues(); + return new HllFieldData.Builder(name(), precision); + } + + @Override + public Query existsQuery(QueryShardContext context) { + return new FieldExistsQuery(name()); + } + + @Override + public Query termQuery(Object value, QueryShardContext context) { + throw new IllegalArgumentException("Term queries are not supported on [hll] fields"); + } + } + + private final int precision; + + private HllFieldMapper(String simpleName, MappedFieldType mappedFieldType, MultiFields multiFields, CopyTo copyTo, int precision) { + super(simpleName, mappedFieldType, multiFields, copyTo); + this.precision = precision; + } + + @Override + protected void parseCreateField(ParseContext context) throws IOException { + // Parse binary HLL++ sketch data + byte[] value = context.parseExternalValue(byte[].class); + if (value == null) { + if (context.parser().currentToken() == XContentParser.Token.VALUE_NULL) {
+ return; + } else { + value = context.parser().binaryValue(); + } + } + + if (value == null) { + return; + } + + // Validate the sketch data + BytesRef sketchBytes = new BytesRef(value); + validateSketchData(sketchBytes); + + // Store as binary doc value + context.doc().add(new BinaryDocValuesField(fieldType().name(), sketchBytes)); + } + + /** + * Validates that the binary data is a valid HLL++ sketch by attempting to deserialize it. + * + * @param sketchBytes the binary sketch data to validate + * @throws MapperParsingException if the data is not a valid HLL++ sketch + */ + private void validateSketchData(BytesRef sketchBytes) throws MapperParsingException { + try (StreamInput in = new BytesArray(sketchBytes.bytes, sketchBytes.offset, sketchBytes.length).streamInput()) { + AbstractHyperLogLogPlusPlus sketch = AbstractHyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE); + + // Verify the precision matches the field's configured precision + if (sketch.precision() != precision) { + throw new MapperParsingException( + "HLL++ sketch precision mismatch for field [" + + fieldType().name() + + "]: " + + "expected " + + precision + + ", got " + + sketch.precision() + ); + } + + // Close the sketch to release resources + sketch.close(); + } catch (MapperParsingException e) { + throw e; + } catch (Exception e) { + throw new MapperParsingException("Invalid HLL++ sketch data for field [" + fieldType().name() + "]", e); + } + } + + @Override + public HllFieldType fieldType() { + return (HllFieldType) super.fieldType(); + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + @Override + public ParametrizedFieldMapper.Builder getMergeBuilder() { + return new Builder(simpleName()).init(this); + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 2192872c6c752..b3e7950020dff 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -56,6 +56,7 @@ import org.opensearch.index.mapper.FieldNamesFieldMapper; import org.opensearch.index.mapper.FlatObjectFieldMapper; import org.opensearch.index.mapper.GeoPointFieldMapper; +import org.opensearch.index.mapper.HllFieldMapper; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.IgnoredFieldMapper; import org.opensearch.index.mapper.IndexFieldMapper; @@ -160,6 +161,7 @@ public static Map getMappers(List mappe } mappers.put(BooleanFieldMapper.CONTENT_TYPE, BooleanFieldMapper.PARSER); mappers.put(BinaryFieldMapper.CONTENT_TYPE, BinaryFieldMapper.PARSER); + mappers.put(HllFieldMapper.CONTENT_TYPE, HllFieldMapper.PARSER); DateFieldMapper.Resolution milliseconds = DateFieldMapper.Resolution.MILLISECONDS; mappers.put(milliseconds.type(), DateFieldMapper.MILLIS_PARSER); DateFieldMapper.Resolution nanoseconds = DateFieldMapper.Resolution.NANOSECONDS; diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java index a889f6507b7e8..0ab2cc83ed554 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java @@ -32,6 +32,10 @@ package org.opensearch.search.aggregations.metrics; +import 
org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.fielddata.plain.HllFieldData; +import org.opensearch.index.mapper.HllFieldMapper; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -119,6 +123,17 @@ protected Aggregator doCreateInternal( CardinalityUpperBound cardinality, Map<String, Object> metadata ) throws IOException { + // Use HllCardinalityAggregator for HLL fields + if (config.fieldContext() != null) { + MappedFieldType fieldType = config.fieldContext().fieldType(); + if (fieldType instanceof HllFieldMapper.HllFieldType hllFieldType) { + IndexFieldData<?> indexFieldData = searchContext.getQueryShardContext().getForField(fieldType); + if (indexFieldData instanceof HllFieldData hllFieldData) { + return new HllCardinalityAggregator(name, hllFieldData, hllFieldType.precision(), searchContext, parent, metadata); + } + } + } + if (searchContext.isStreamSearch() && (searchContext.getFlushMode() == null || searchContext.getFlushMode() == FlushMode.PER_SEGMENT)) { return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/HllCardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/HllCardinalityAggregator.java new file mode 100644 index 0000000000000..ecca8ef3258b8 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/HllCardinalityAggregator.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LeafReaderContext; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.BigArrays; +import org.opensearch.index.fielddata.plain.HllFieldData; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; + +/** + * An aggregator that computes cardinality from pre-aggregated HLL++ sketch fields. + * This aggregator merges HLL++ sketches stored in documents to produce a combined cardinality estimate.
+ * + * @opensearch.internal + */ +public class HllCardinalityAggregator extends NumericMetricsAggregator.SingleValue { + + private static final Logger logger = LogManager.getLogger(HllCardinalityAggregator.class); + + private final HllFieldData fieldData; + private final int precision; + private HyperLogLogPlusPlus counts; + + HllCardinalityAggregator( + String name, + HllFieldData fieldData, + int precision, + SearchContext context, + Aggregator parent, + Map<String, Object> metadata + ) throws IOException { + super(name, context, parent, metadata); + this.fieldData = fieldData; + this.precision = precision; + this.counts = null; // Lazy initialization + } + + @Override + public org.apache.lucene.search.ScoreMode scoreMode() { + return org.apache.lucene.search.ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + final HllFieldData.HllLeafFieldData leafData = fieldData.load(ctx); + + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + AbstractHyperLogLogPlusPlus sketch = null; + try { + sketch = leafData.getSketch(doc); + if (sketch != null) { + // Lazy initialize counts on first sketch + // Initialize with enough capacity for the current bucket + if (counts == null) { + counts = new HyperLogLogPlusPlus(precision, context.bigArrays(), bucket + 1); + } + + // Grow if needed to accommodate this bucket + if (bucket >= counts.maxOrd()) { + HyperLogLogPlusPlus newCounts = new HyperLogLogPlusPlus(precision, context.bigArrays(), bucket + 1); + for (long i = 0; i < counts.maxOrd(); i++) { + if (counts.cardinality(i) > 0) { + newCounts.merge(i, counts, i); + } + } + counts.close(); + counts = newCounts; + } + + // Merge the stored sketch into our aggregation sketch + counts.merge(bucket, sketch, 0); + } + } catch (IllegalArgumentException e) { + // Log precision mismatch or other merge errors + logger.warn( + "Failed to merge HLL++ sketch for field [{}] in document {}: {}", + fieldData.getFieldName(), + doc, + e.getMessage() + ); + throw e; + } finally { + if (sketch != null) { + sketch.close(); + } + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + return counts == null ?
0 : counts.cardinality(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) { + if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) { + return buildEmptyAggregation(); + } + // Build a copy because the returned Aggregation needs to remain usable after + // this Aggregator (and its HLL++ counters) is released + AbstractHyperLogLogPlusPlus copy = counts.clone(owningBucketOrdinal, BigArrays.NON_RECYCLING_INSTANCE); + return new InternalCardinality(name, copy, metadata()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalCardinality(name, null, metadata()); + } + + @Override + protected void doClose() { + Releasables.close(counts); + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/HyperLogLogPlusPlus.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/HyperLogLogPlusPlus.java index 7ab35eaed785c..f5c8a9cf058bc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/HyperLogLogPlusPlus.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/HyperLogLogPlusPlus.java @@ -191,7 +191,9 @@ void upgradeToHll(long bucketOrd) { public void merge(long thisBucket, AbstractHyperLogLogPlusPlus other, long otherBucket) { if (precision() != other.precision()) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException( + "Cannot merge HLL++ sketches with different precision: " + precision() + " vs " + other.precision() + ); } hll.ensureCapacity(thisBucket + 1); if (other.getAlgorithm(otherBucket) == LINEAR_COUNTING) { diff --git a/server/src/test/java/org/opensearch/index/mapper/HllFieldMapperIntegrationTests.java b/server/src/test/java/org/opensearch/index/mapper/HllFieldMapperIntegrationTests.java new file mode 100644 index 0000000000000..a15a04c31db7f --- /dev/null +++ b/server/src/test/java/org/opensearch/index/mapper/HllFieldMapperIntegrationTests.java @@ -0,0 +1,396 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.mapper; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.indices.IndicesService; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import org.opensearch.search.aggregations.metrics.Cardinality; +import org.opensearch.search.aggregations.metrics.HyperLogLogPlusPlus; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.io.IOException; + +import static org.opensearch.common.util.BitMixer.mix64; +import static org.opensearch.search.aggregations.AggregationBuilders.cardinality; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; +import static org.opensearch.search.aggregations.bucket.terms.Terms.Bucket; + +/** + * Integration tests for HLL field mapper demonstrating programmatic field creation + * and usage patterns for ISM plugin rollup scenarios. 
+ */ +public class HllFieldMapperIntegrationTests extends OpenSearchSingleNodeTestCase { + + public void testProgrammaticFieldCreation() throws IOException { + // Test that HLL fields can be created + + String indexName = "test-hll-index"; + int precision = 12; + + // Create index with HLL field mapping + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("cardinality_sketch") + .field("type", "hll") + .field("precision", precision) + .endObject() + .endObject() + .endObject(); + + client().admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + + // Verify the field was created + MapperService mapperService = getInstanceFromNode(IndicesService.class).indexServiceSafe(resolveIndex(indexName)).mapperService(); + + MappedFieldType fieldType = mapperService.fieldType("cardinality_sketch"); + assertNotNull("HLL field should be created", fieldType); + assertTrue("Field should be HLL type", fieldType instanceof HllFieldMapper.HllFieldType); + + HllFieldMapper.HllFieldType hllFieldType = (HllFieldMapper.HllFieldType) fieldType; + assertEquals("Precision should match", precision, hllFieldType.precision()); + } + + public void testDataIngestion() throws IOException { + // Test ingesting pre-aggregated HLL sketch data (simulating rollup scenario) + + String indexName = "test-rollup-index"; + int precision = 11; + + // Create index with HLL field + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("user_cardinality") + .field("type", "hll") + .field("precision", precision) + .endObject() + .startObject("timestamp") + .field("type", "date") + .endObject() + .endObject() + .endObject(); + + client().admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + + // Create an HLL sketch with some user IDs + HyperLogLogPlusPlus sketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + // Create the sketch manually here; in practice, plugins/consumers decide + // how their sketches are built or retrieved + for (int userId = 1; userId <= 100; userId++) { + sketch.collect(0, mix64(userId)); + } + + // Serialize the sketch + BytesStreamOutput out = new BytesStreamOutput(); + sketch.writeTo(0, out); + byte[] sketchBytes = out.bytes().toBytesRef().bytes; + + // Index a document with the pre-aggregated sketch + client().prepareIndex(indexName) + .setSource( + XContentFactory.jsonBuilder() + .startObject() + .field("timestamp", "2024-01-01T00:00:00Z") + .field("user_cardinality", sketchBytes) + .endObject() + ) + .get(); + + // Refresh to make the document searchable + client().admin().indices().prepareRefresh(indexName).get(); + + // Verify the document was indexed + long docCount = client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits().value(); + assertEquals("Document should be indexed", 1L, docCount); + } finally { + sketch.close(); + } + } + + public void testMultipleRollupDocuments() throws IOException { + // Test ingesting multiple rollup documents with HLL fields + // Indexing and retrieving multiple documents mirrors a typical rollup scenario + + String indexName = "test-multi-rollup"; + int precision = 11; + + // Create index + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("hourly_users") + .field("type", "hll") + .field("precision", precision) + .endObject() + .startObject("hour") + .field("type", "date") +
.endObject() + .endObject() + .endObject(); + + client().admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + + // Index multiple hourly rollup documents + int numHours = 24; + for (int hour = 0; hour < numHours; hour++) { + HyperLogLogPlusPlus sketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + // Each hour has some unique users + for (int userId = hour * 10; userId < hour * 10 + 50; userId++) { + sketch.collect(0, mix64(userId)); + } + + BytesStreamOutput out = new BytesStreamOutput(); + sketch.writeTo(0, out); + byte[] sketchBytes = out.bytes().toBytesRef().bytes; + + client().prepareIndex(indexName) + .setSource( + XContentFactory.jsonBuilder() + .startObject() + .field("hour", "2024-01-01T" + String.format(java.util.Locale.ROOT, "%02d", hour) + ":00:00Z") + .field("hourly_users", sketchBytes) + .endObject() + ) + .get(); + } finally { + sketch.close(); + } + } + + // Refresh and verify + client().admin().indices().prepareRefresh(indexName).get(); + long docCount = client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits().value(); + assertEquals("All hourly documents should be indexed", numHours, docCount); + } + + public void testCardinalityAggregationMergesMultipleSketches() throws IOException { + // Test that cardinality aggregation merges multiple HLL sketches correctly + + String indexName = "test-hll-merge-aggregation"; + int precision = 11; + + // Create index + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("hourly_users") + .field("type", "hll") + .field("precision", precision) + .endObject() + .endObject() + .endObject(); + + client().admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + + // Create an expected merged sketch for comparison + HyperLogLogPlusPlus expectedMerged = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + long expectedCardinality = 500; + + // Create multiple sketches with non-overlapping values + int numDocs = 5; + int valuesPerDoc = 100; + + try { + for (int docIdx = 0; docIdx < numDocs; docIdx++) { + HyperLogLogPlusPlus sketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + for (int i = 0; i < valuesPerDoc; i++) { + long value = (long) docIdx * valuesPerDoc + i; + long hash = mix64(value); + sketch.collect(0, hash); + expectedMerged.collect(0, hash); // Also collect into expected + } + + BytesStreamOutput out = new BytesStreamOutput(); + sketch.writeTo(0, out); + byte[] sketchBytes = out.bytes().toBytesRef().bytes; + + client().prepareIndex(indexName) + .setSource(XContentFactory.jsonBuilder().startObject().field("hourly_users", sketchBytes).endObject()) + .get(); + } finally { + sketch.close(); + } + } + + client().admin().indices().prepareRefresh(indexName).get(); + + // Run cardinality aggregation across all documents + SearchResponse response = client().prepareSearch(indexName) + .addAggregation(cardinality("total_users").field("hourly_users")) + .get(); + + Cardinality cardinalityAgg = response.getAggregations().get("total_users"); + assertNotNull("Cardinality aggregation should return a result", cardinalityAgg); + + long aggregatedCardinality = cardinalityAgg.getValue(); + + // The aggregated cardinality should match the expected cardinality + assertEquals("Aggregation should merge sketches correctly", expectedCardinality, aggregatedCardinality); + + // The aggregated cardinality should match the cardinality from expected 
expectedMerged + assertEquals("Aggregation should merge sketches correctly", expectedMerged.cardinality(0), aggregatedCardinality); + } finally { + expectedMerged.close(); + } + } + + public void testCardinalityAggregationWithBuckets() throws IOException { + // Test that cardinality aggregation works correctly within bucketed aggregations + + String indexName = "test-hll-bucketed-aggregation"; + int precision = 11; + + // Create index with HLL field and category field + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("category") + .field("type", "keyword") + .endObject() + .startObject("user_sketch") + .field("type", "hll") + .field("precision", precision) + .endObject() + .endObject() + .endObject(); + + client().admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + + // Create documents with different categories + String[] categories = { "electronics", "books", "clothing" }; + int valuesPerCategory = 100; + + for (String category : categories) { + HyperLogLogPlusPlus sketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + // Each category gets unique user IDs + int baseUserId = category.hashCode() & 0x7FFFFFFF; // Positive hash + for (int i = 0; i < valuesPerCategory; i++) { + sketch.collect(0, mix64(baseUserId + i)); + } + + BytesStreamOutput out = new BytesStreamOutput(); + sketch.writeTo(0, out); + byte[] sketchBytes = out.bytes().toBytesRef().bytes; + + client().prepareIndex(indexName) + .setSource( + XContentFactory.jsonBuilder() + .startObject() + .field("category", category) + .field("user_sketch", sketchBytes) + .endObject() + ) + .get(); + } finally { + sketch.close(); + } + } + + client().admin().indices().prepareRefresh(indexName).get(); + + // Run terms aggregation with cardinality sub-aggregation + SearchResponse response = client().prepareSearch(indexName) + .addAggregation(terms("by_category").field("category").subAggregation(cardinality("users_per_category").field("user_sketch"))) + .get(); + + Terms termsAgg = response.getAggregations().get("by_category"); + assertNotNull("Terms aggregation should return results", termsAgg); + assertEquals("Should have 3 category buckets", 3, termsAgg.getBuckets().size()); + + // Verify each bucket has cardinality + for (Bucket bucket : termsAgg.getBuckets()) { + Cardinality cardAgg = bucket.getAggregations().get("users_per_category"); + assertNotNull("Each bucket should have cardinality", cardAgg); + + long bucketCardinality = cardAgg.getValue(); + assertTrue( + "Bucket " + bucket.getKeyAsString() + " should have cardinality around " + valuesPerCategory + ", got " + bucketCardinality, + bucketCardinality > 0 && bucketCardinality <= valuesPerCategory * 1.2 + ); + } + } + + public void testSimpleTwoSketchMerge() throws IOException { + // Simplified test to debug the merge issue + + String indexName = "test-simple-merge"; + int precision = 11; + + // Create index + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("sketch") + .field("type", "hll") + .field("precision", precision) + .endObject() + .endObject() + .endObject(); + + client().admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + + // Create two simple sketches + HyperLogLogPlusPlus sketch1 = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus sketch2 = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + 
HyperLogLogPlusPlus expected = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + + try { + // Sketch 1: values 0-99 + for (int i = 0; i < 100; i++) { + long hash = mix64(i); + sketch1.collect(0, hash); + expected.collect(0, hash); + } + + // Sketch 2: values 100-199 + for (int i = 100; i < 200; i++) { + long hash = mix64(i); + sketch2.collect(0, hash); + expected.collect(0, hash); + } + + long expectedCard = expected.cardinality(0); + + // Index both sketches + BytesStreamOutput out1 = new BytesStreamOutput(); + sketch1.writeTo(0, out1); + client().prepareIndex(indexName) + .setSource(XContentFactory.jsonBuilder().startObject().field("sketch", out1.bytes().toBytesRef().bytes).endObject()) + .get(); + + BytesStreamOutput out2 = new BytesStreamOutput(); + sketch2.writeTo(0, out2); + client().prepareIndex(indexName) + .setSource(XContentFactory.jsonBuilder().startObject().field("sketch", out2.bytes().toBytesRef().bytes).endObject()) + .get(); + + client().admin().indices().prepareRefresh(indexName).get(); + + SearchResponse response = client().prepareSearch(indexName).addAggregation(cardinality("test").field("sketch")).get(); + + Cardinality testCardinality = response.getAggregations().get("test"); + long result = testCardinality.getValue(); + + assertEquals("Should merge both sketches", expectedCard, result); + } finally { + sketch1.close(); + sketch2.close(); + expected.close(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/mapper/HllFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/HllFieldMapperTests.java new file mode 100644 index 0000000000000..84c100f5f758b --- /dev/null +++ b/server/src/test/java/org/opensearch/index/mapper/HllFieldMapperTests.java @@ -0,0 +1,552 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.index.mapper; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.BitMixer; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.aggregations.metrics.AbstractHyperLogLog; +import org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus; +import org.opensearch.search.aggregations.metrics.HyperLogLogPlusPlus; + +import java.io.IOException; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class HllFieldMapperTests extends MapperTestCase { + + @Override + protected void writeFieldValue(XContentBuilder builder) throws IOException { + // Create a simple HLL++ sketch with default precision + HyperLogLogPlusPlus sketch = new HyperLogLogPlusPlus(HyperLogLogPlusPlus.DEFAULT_PRECISION, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + // Add some values to the sketch + sketch.collect(0, 1L); + sketch.collect(0, 2L); + sketch.collect(0, 3L); + + // Serialize the sketch + BytesStreamOutput out = new BytesStreamOutput(); + sketch.writeTo(0, out); + builder.value(out.bytes().toBytesRef().bytes); + } finally { + sketch.close(); + } + } + + @Override + protected void minimalMapping(XContentBuilder b) throws IOException { + b.field("type", "hll"); + } + + @Override + protected void registerParameters(ParameterChecker checker) throws IOException { + checker.registerConflictCheck("precision", b -> b.field("precision", 12)); + } + + public void testDefaultMapping() throws Exception { + MapperService mapperService = createMapperService(fieldMapping(this::minimalMapping)); + FieldMapper mapper = (FieldMapper) mapperService.documentMapper().mappers().getMapper("field"); + + assertThat(mapper, instanceOf(HllFieldMapper.class)); + HllFieldMapper hllMapper = (HllFieldMapper) mapper; + assertThat(hllMapper.fieldType().precision(), equalTo(HyperLogLogPlusPlus.DEFAULT_PRECISION)); + } + + public void testPrecisionParameterValidation() { + // Test that precision values outside [4, 18] are rejected + + MapperParsingException e = expectThrows(MapperParsingException.class, () -> createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", 3); + }))); + assertThat( + e.getMessage(), + containsString("precision must be between " + AbstractHyperLogLog.MIN_PRECISION + " and " + AbstractHyperLogLog.MAX_PRECISION) + ); + + e = expectThrows(MapperParsingException.class, () -> createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", 19); + }))); + assertThat( + e.getMessage(), + containsString("precision must be between " + AbstractHyperLogLog.MIN_PRECISION + " and " + AbstractHyperLogLog.MAX_PRECISION) + ); + } + + public void testDefaultPrecision() throws Exception { + // Test that when no precision is specified, the default is used + MapperService mapperService = createMapperService(fieldMapping(b -> { b.field("type", "hll"); })); + + FieldMapper mapper = (FieldMapper) mapperService.documentMapper().mappers().getMapper("field"); + assertThat(mapper, instanceOf(HllFieldMapper.class)); + HllFieldMapper hllMapper = (HllFieldMapper) mapper; + assertThat(hllMapper.fieldType().precision(), equalTo(HyperLogLogPlusPlus.DEFAULT_PRECISION)); + } + + public void testCustomPrecision() throws Exception { + // Test that 
custom precision values are respected + int customPrecision = 14; + MapperService mapperService = createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", customPrecision); + })); + + FieldMapper mapper = (FieldMapper) mapperService.documentMapper().mappers().getMapper("field"); + assertThat(mapper, instanceOf(HllFieldMapper.class)); + HllFieldMapper hllMapper = (HllFieldMapper) mapper; + assertThat(hllMapper.fieldType().precision(), equalTo(customPrecision)); + } + + public void testTermQueryNotSupported() throws IOException { + MapperService mapperService = createMapperService(fieldMapping(this::minimalMapping)); + MappedFieldType fieldType = mapperService.fieldType("field"); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> fieldType.termQuery("value", null)); + assertThat(e.getMessage(), containsString("Term queries are not supported on [hll] fields")); + } + + public void testExistsQuery() throws IOException { + MapperService mapperService = createMapperService(fieldMapping(this::minimalMapping)); + assertExistsQuery(mapperService); + } + + public void testSerializationRoundTrip() throws IOException { + // Test that serializing and deserializing HLL++ sketches preserves cardinality + // This is a property-based test simulated with multiple random sketches + + int[] precisions = { 4, 8, 11, 14, 18 }; // Test various precisions + int[] cardinalities = { 0, 1, 10, 100, 1000, 10000 }; // Test various cardinalities + + for (int precision : precisions) { + for (int cardinality : cardinalities) { + // Create a sketch with the given precision + HyperLogLogPlusPlus originalSketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + // Add values to reach the target cardinality + for (int i = 0; i < cardinality; i++) { + originalSketch.collect(0, i); + } + + long originalCardinality = originalSketch.cardinality(0); + + // Serialize the sketch + BytesStreamOutput out = new BytesStreamOutput(); + originalSketch.writeTo(0, out); + byte[] serialized = out.bytes().toBytesRef().bytes; + + // Create a mapper with the same precision + MapperService mapperService = createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", precision); + })); + + // Parse a document with the serialized sketch + ParsedDocument doc = mapperService.documentMapper().parse(source(b -> b.field("field", serialized))); + + // Verify the document was created successfully + assertNotNull(doc); + assertNotNull(doc.rootDoc().getBinaryValue("field")); + + // Deserialize and verify cardinality is preserved + byte[] storedBytes = doc.rootDoc().getBinaryValue("field").bytes; + org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus deserializedSketch = + org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus.readFrom( + new org.opensearch.core.common.bytes.BytesArray(storedBytes).streamInput(), + BigArrays.NON_RECYCLING_INSTANCE + ); + + try { + long deserializedCardinality = deserializedSketch.cardinality(0); + + // Cardinality should be preserved (within HLL++ error bounds) + // For small cardinalities, they should be exact + if (cardinality <= 100) { + assertEquals( + "Cardinality mismatch for precision=" + precision + ", cardinality=" + cardinality, + originalCardinality, + deserializedCardinality + ); + } else { + // For larger cardinalities, allow for HLL++ error + double errorBound = 1.04 / Math.sqrt(1 << precision); + double relativeError = Math.abs(deserializedCardinality 
- originalCardinality) / (double) cardinality; + assertTrue( + "Relative error " + + relativeError + + " exceeds bound " + + errorBound + + " for precision=" + + precision + + ", cardinality=" + + cardinality, + relativeError <= errorBound * 3 // Allow 3x error bound for safety + ); + } + } finally { + deserializedSketch.close(); + } + } finally { + originalSketch.close(); + } + } + } + } + + public void testInvalidSketchRejection() throws IOException { + MapperService mapperService = createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", 11); + })); + + // Test 1: Completely invalid binary data + byte[] invalidData = new byte[] { 1, 2, 3, 4, 5 }; + MapperParsingException e1 = expectThrows( + MapperParsingException.class, + () -> mapperService.documentMapper().parse(source(b -> b.field("field", invalidData))) + ); + // The error gets wrapped, so check for the generic parse failure message + assertTrue(e1.getMessage().contains("failed to parse") || e1.getCause() != null); + + // Test 2: Sketch with wrong precision + HyperLogLogPlusPlus wrongPrecisionSketch = new HyperLogLogPlusPlus(14, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + wrongPrecisionSketch.collect(0, 1L); + BytesStreamOutput out = new BytesStreamOutput(); + wrongPrecisionSketch.writeTo(0, out); + byte[] wrongPrecisionData = out.bytes().toBytesRef().bytes; + + MapperParsingException e2 = expectThrows( + MapperParsingException.class, + () -> mapperService.documentMapper().parse(source(b -> b.field("field", wrongPrecisionData))) + ); + // Check the cause for the precision mismatch message + String fullMessage = e2.getMessage() + (e2.getCause() != null ? e2.getCause().getMessage() : ""); + assertThat(fullMessage, containsString("precision mismatch")); + } finally { + wrongPrecisionSketch.close(); + } + } + + public void testStoredSketchRetrieval() throws IOException { + + int[] precisions = { 4, 8, 11, 14, 18 }; + int[] cardinalities = { 0, 5, 50, 500, 5000 }; + + for (int precision : precisions) { + MapperService mapperService = createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", precision); + })); + + for (int cardinality : cardinalities) { + // Create and populate a sketch + HyperLogLogPlusPlus originalSketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + for (int i = 0; i < cardinality; i++) { + originalSketch.collect(0, i); + } + + // Serialize the sketch + BytesStreamOutput out = new BytesStreamOutput(); + originalSketch.writeTo(0, out); + byte[] serialized = out.bytes().toBytesRef().bytes; + + // Store the sketch in a document + ParsedDocument doc = mapperService.documentMapper().parse(source(b -> b.field("field", serialized))); + + // Verify the sketch can be retrieved + assertNotNull("Document should be created", doc); + BytesRef storedValue = doc.rootDoc().getBinaryValue("field"); + assertNotNull("Sketch should be stored", storedValue); + + // Verify the retrieved sketch is valid and has the correct cardinality + AbstractHyperLogLogPlusPlus retrievedSketch = AbstractHyperLogLogPlusPlus.readFrom( + new BytesArray(storedValue.bytes, storedValue.offset, storedValue.length).streamInput(), + BigArrays.NON_RECYCLING_INSTANCE + ); + + try { + assertEquals("Precision should match", precision, retrievedSketch.precision()); + long retrievedCardinality = retrievedSketch.cardinality(0); + long originalCardinality = originalSketch.cardinality(0); + assertEquals( + "Cardinality should be preserved for precision=" + precision + 
", cardinality=" + cardinality, + originalCardinality, + retrievedCardinality + ); + } finally { + retrievedSketch.close(); + } + } finally { + originalSketch.close(); + } + } + } + } + + public void testMergeProducesUnionCardinality() { + // Test that merging two sketches produces a sketch representing their union + // We test this by collecting the same values into a single sketch and into two separate sketches, + // then verifying the merged result matches the single sketch + + int precision = 11; + int numValues = 1000; + + HyperLogLogPlusPlus single = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus sketchA = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus sketchB = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus merged = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + + try { + // Collect all values into single sketch + // Split values between sketchA and sketchB + for (int i = 0; i < numValues; i++) { + long hash = BitMixer.mix64(i); + single.collect(0, hash); + if (i < numValues / 2) { + sketchA.collect(0, hash); + } else { + sketchB.collect(0, hash); + } + } + + // Merge the two sketches + merged.merge(0, sketchA, 0); + merged.merge(0, sketchB, 0); + + // Merged cardinality should equal single sketch cardinality + long singleCardinality = single.cardinality(0); + long mergedCardinality = merged.cardinality(0); + + assertEquals("Merged cardinality should equal single sketch cardinality", singleCardinality, mergedCardinality); + } finally { + single.close(); + sketchA.close(); + sketchB.close(); + merged.close(); + } + } + + public void testMergeAccuracy() { + // Test that merging multiple sketches maintains HLL++ accuracy guarantees + // Similar to testMergeProducesUnionCardinality but with more sketches + + int precision = 11; + int numSketches = 5; + int numValues = 1000; + + HyperLogLogPlusPlus single = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus[] sketches = new HyperLogLogPlusPlus[numSketches]; + HyperLogLogPlusPlus merged = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + + try { + // Initialize all sketches + for (int i = 0; i < numSketches; i++) { + sketches[i] = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + } + + // Collect same values into single sketch and distribute across multiple sketches + for (int i = 0; i < numValues; i++) { + long hash = BitMixer.mix64(i); + single.collect(0, hash); + // Distribute values across sketches + sketches[i % numSketches].collect(0, hash); + } + + // Merge all sketches + for (HyperLogLogPlusPlus sketch : sketches) { + merged.merge(0, sketch, 0); + } + + // Merged cardinality should equal single sketch cardinality + long singleCardinality = single.cardinality(0); + long mergedCardinality = merged.cardinality(0); + + assertEquals("Merged cardinality should equal single sketch cardinality", singleCardinality, mergedCardinality); + } finally { + single.close(); + for (HyperLogLogPlusPlus sketch : sketches) { + if (sketch != null) sketch.close(); + } + merged.close(); + } + } + + public void testMergeIsCommutative() { + // Test that merge(A, B) produces the same cardinality as merge(B, A) + + int[] precisions = { 8, 11, 14 }; + + for (int precision : precisions) { + HyperLogLogPlusPlus sketchA = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + 
HyperLogLogPlusPlus sketchB = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus mergedAB = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus mergedBA = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + + try { + // Add different values to each sketch + for (int i = 0; i < 100; i++) { + sketchA.collect(0, i); + } + for (int i = 50; i < 150; i++) { + sketchB.collect(0, i); + } + + // Merge in both orders + mergedAB.merge(0, sketchA, 0); + mergedAB.merge(0, sketchB, 0); + + mergedBA.merge(0, sketchB, 0); + mergedBA.merge(0, sketchA, 0); + + // Cardinality should be the same regardless of merge order + long cardinalityAB = mergedAB.cardinality(0); + long cardinalityBA = mergedBA.cardinality(0); + + assertEquals("Merge should be commutative for precision=" + precision, cardinalityAB, cardinalityBA); + } finally { + sketchA.close(); + sketchB.close(); + mergedAB.close(); + mergedBA.close(); + } + } + } + + public void testMergeIsAssociative() { + // Test that merge(merge(A, B), C) produces the same cardinality as merge(A, merge(B, C)) + + int precision = 11; + + HyperLogLogPlusPlus sketchA = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus sketchB = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus sketchC = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus leftAssoc = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus rightAssoc = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus tempLeft = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus tempRight = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + + try { + // Add different values to each sketch + for (int i = 0; i < 50; i++) { + sketchA.collect(0, i); + } + for (int i = 25; i < 75; i++) { + sketchB.collect(0, i); + } + for (int i = 50; i < 100; i++) { + sketchC.collect(0, i); + } + + // Left associative: merge(merge(A, B), C) + tempLeft.merge(0, sketchA, 0); + tempLeft.merge(0, sketchB, 0); + leftAssoc.merge(0, tempLeft, 0); + leftAssoc.merge(0, sketchC, 0); + + // Right associative: merge(A, merge(B, C)) + tempRight.merge(0, sketchB, 0); + tempRight.merge(0, sketchC, 0); + rightAssoc.merge(0, sketchA, 0); + rightAssoc.merge(0, tempRight, 0); + + // Cardinality should be the same regardless of grouping + long leftCardinality = leftAssoc.cardinality(0); + long rightCardinality = rightAssoc.cardinality(0); + + assertEquals("Merge should be associative", leftCardinality, rightCardinality); + } finally { + sketchA.close(); + sketchB.close(); + sketchC.close(); + leftAssoc.close(); + rightAssoc.close(); + tempLeft.close(); + tempRight.close(); + } + } + + public void testIncompatiblePrecisionHandling() { + // Test that merging sketches with different precision is rejected + + int precision1 = 8; + int precision2 = 14; + + HyperLogLogPlusPlus sketch1 = new HyperLogLogPlusPlus(precision1, BigArrays.NON_RECYCLING_INSTANCE, 1); + HyperLogLogPlusPlus sketch2 = new HyperLogLogPlusPlus(precision2, BigArrays.NON_RECYCLING_INSTANCE, 1); + + try { + sketch1.collect(0, 1L); + sketch2.collect(0, 2L); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> sketch1.merge(0, sketch2, 0)); + + assertThat(e.getMessage(), 
containsString("Cannot merge HLL++ sketches with different precision")); + assertThat(e.getMessage(), containsString(String.valueOf(precision1))); + assertThat(e.getMessage(), containsString(String.valueOf(precision2))); + } finally { + sketch1.close(); + sketch2.close(); + } + } + + public void testSerializationFormatCompatibility() throws IOException { + // Test that HLL++ field uses the same format as cardinality aggregation + // Sketches serialized by cardinality aggregation should be readable by HLL field and vice versa + + int precision = 11; + int cardinality = 500; + + // Create a sketch using the cardinality aggregation code + HyperLogLogPlusPlus cardinalitySketch = new HyperLogLogPlusPlus(precision, BigArrays.NON_RECYCLING_INSTANCE, 1); + try { + for (int i = 0; i < cardinality; i++) { + cardinalitySketch.collect(0, i); + } + + long originalCardinality = cardinalitySketch.cardinality(0); + + // Serialize using cardinality aggregation format + BytesStreamOutput out = new BytesStreamOutput(); + cardinalitySketch.writeTo(0, out); + byte[] serialized = out.bytes().toBytesRef().bytes; + + // Store in HLL field + MapperService mapperService = createMapperService(fieldMapping(b -> { + b.field("type", "hll"); + b.field("precision", precision); + })); + + ParsedDocument doc = mapperService.documentMapper().parse(source(b -> b.field("field", serialized))); + + // Retrieve and deserialize + BytesRef storedBytes = doc.rootDoc().getBinaryValue("field"); + AbstractHyperLogLogPlusPlus retrievedSketch = AbstractHyperLogLogPlusPlus.readFrom( + new BytesArray(storedBytes.bytes, storedBytes.offset, storedBytes.length).streamInput(), + BigArrays.NON_RECYCLING_INSTANCE + ); + + try { + long retrievedCardinality = retrievedSketch.cardinality(0); + assertEquals("Cardinality should be preserved across serialization formats", originalCardinality, retrievedCardinality); + } finally { + retrievedSketch.close(); + } + } finally { + cardinalitySketch.close(); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/HllCardinalityAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/HllCardinalityAggregatorTests.java new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/HllCardinalityAggregatorTests.java @@ -0,0 +1 @@ +