Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public enum BuiltinFunctionName {
MVAPPEND(FunctionName.of("mvappend")),
MVJOIN(FunctionName.of("mvjoin")),
MVINDEX(FunctionName.of("mvindex")),
MVZIP(FunctionName.of("mvzip")),
FORALL(FunctionName.of("forall")),
EXISTS(FunctionName.of("exists")),
FILTER(FunctionName.of("filter")),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.CollectionUDF;

import java.util.ArrayList;
import java.util.List;

/** Core logic for `mvzip` command to combine two multivalue fields pairwise */
public class MVZipCore {

/**
* Combines values from two multivalue fields pairwise with a delimiter.
*
* <p>This function zips together two fields by combining the first value of left with the first
* value of right, the second with the second, and so on, up to the length of the shorter field.
*
* <p>Behavior:
*
* <ul>
* <li>Returns null if either left or right is null
* <li>Treats scalar values as single-element arrays
* <li>Stops at the length of the shorter field (like Python's zip)
* <li>Uses the provided delimiter to join values (default: ",")
* </ul>
*
* @param left The left multivalue field or scalar value
* @param right The right multivalue field or scalar value
* @param delimiter The delimiter to use for joining values
* @return A list of combined values, or null if either input is null
*/
public static List<Object> zipElements(Object left, Object right, String delimiter) {
// Return null if either field is null
if (left == null || right == null) {
return null;
}

// Convert inputs to lists (treating scalars as single-element arrays)
List<?> leftList = toList(left);
List<?> rightList = toList(right);

// Create result list
List<Object> result = new ArrayList<>();

// Zip up to the shorter length
int minLength = Math.min(leftList.size(), rightList.size());
for (int i = 0; i < minLength; i++) {
Object leftValue = leftList.get(i);
Object rightValue = rightList.get(i);

// Combine the values with the delimiter
String combined = leftValue + delimiter + rightValue;
result.add(combined);
}

return result.isEmpty() ? null : result;
}

/**
* Converts an object to a list. If the object is already a list, returns it as-is. Otherwise,
* wraps the object in a single-element list.
*/
private static List<?> toList(Object obj) {
if (obj instanceof List) {
return (List<?>) obj;
} else {
return List.of(obj);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.CollectionUDF;

import static org.apache.calcite.sql.type.SqlTypeUtil.createArrayType;

import java.util.List;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.ImplementorUDF;
import org.opensearch.sql.expression.function.UDFOperandMetadata;

/**
* MVZip function that combines two multivalue fields pairwise with a delimiter. Returns an array of
* strings or null if either input is null.
*/
public class MVZipFunctionImpl extends ImplementorUDF {

public MVZipFunctionImpl() {
super(new MVZipImplementor(), NullPolicy.ALL);
}

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return sqlOperatorBinding -> {
RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory();

// mvzip returns an array of VARCHAR (strings)
RelDataType elementType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
return createArrayType(
typeFactory, typeFactory.createTypeWithNullability(elementType, true), true);
};
}

@Override
public UDFOperandMetadata getOperandMetadata() {
return null;
}

public static class MVZipImplementor implements NotNullImplementor {
@Override
public Expression implement(
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
// Handle both 2-argument (with default delimiter) and 3-argument cases
if (translatedOperands.size() == 2) {
// mvzip(left, right) - use default delimiter ","
return Expressions.call(
Types.lookupMethod(
MVZipFunctionImpl.class, "mvzip", Object.class, Object.class, String.class),
translatedOperands.get(0),
translatedOperands.get(1),
Expressions.constant(","));
} else if (translatedOperands.size() == 3) {
// mvzip(left, right, delimiter)
return Expressions.call(
Types.lookupMethod(
MVZipFunctionImpl.class, "mvzip", Object.class, Object.class, String.class),
translatedOperands.get(0),
translatedOperands.get(1),
translatedOperands.get(2));
} else {
throw new IllegalArgumentException(
"mvzip expects 2 or 3 arguments, got " + translatedOperands.size());
}
}
}

/**
* Entry point for mvzip function.
*
* @param left The left multivalue field or scalar value
* @param right The right multivalue field or scalar value
* @param delimiter The delimiter to use for joining values (default: ",")
* @return A list of combined values, or null if either input is null
*/
public static Object mvzip(Object left, Object right, String delimiter) {
return MVZipCore.zipElements(left, right, delimiter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.ForallFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MVAppendFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MVZipFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MapAppendFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MapRemoveFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.ReduceFunctionImpl;
Expand Down Expand Up @@ -391,6 +392,7 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("map_append");
public static final SqlOperator MAP_REMOVE = new MapRemoveFunctionImpl().toUDF("MAP_REMOVE");
public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend");
public static final SqlOperator MVZIP = new MVZipFunctionImpl().toUDF("mvzip");
public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter");
public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform");
public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVAPPEND;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVZIP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW;
Expand Down Expand Up @@ -989,6 +990,7 @@ void populate() {

registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
registerOperator(MVZIP, PPLBuiltinOperators.MVZIP);
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);
Expand Down
42 changes: 42 additions & 0 deletions docs/user/ppl/functions/collection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,45 @@ Example::
| [alex,celestino,claudia] |
+--------------------------+


MVZIP
-----

Description
>>>>>>>>>>>

Usage: mvzip(mv_left, mv_right, [delim]) combines the values in two multivalue fields. The delimiter is used to specify a delimiting character to join the two values. This is similar to the Python zip command.

The values are stitched together combining the first value of mv_left with the first value of mv_right, then the second with the second, and so on. The function stops at the length of the shorter field.

The delimiter is optional. When specified, it must be enclosed in quotation marks. The default delimiter is a comma ( , ).

Argument type: mv_left: ANY, mv_right: ANY, delim: STRING (optional)

Return type: ARRAY

Example::

os> source=people | eval hosts = array('host1', 'host2'), ports = array(80, 443), nserver = mvzip(hosts, ports) | fields nserver | head 1
fetched rows / total rows = 1/1
+----------------------+
| nserver |
|----------------------|
| [host1,80,host2,443] |
+----------------------+

os> source=people | eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), result = mvzip(arr1, arr2, '|') | fields result | head 1
fetched rows / total rows = 1/1
+---------------+
| result |
|---------------|
| [a|x,b|y,c|z] |
+---------------+

os> source=people | eval arr1 = array(1, 2, 3), arr2 = array('a', 'b'), result = mvzip(arr1, arr2) | fields result | head 1
fetched rows / total rows = 1/1
+-----------+
| result |
|-----------|
| [1,a,2,b] |
+-----------+
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,60 @@ public void testMvindexRangeSingleElement() throws IOException {
verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of(3)));
}

@Test
public void testMvzipBasic() throws IOException {
// Basic example from spec: eval nserver=mvzip(hosts,ports)
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval hosts = array('host1', 'host2'), ports = array(80, 443), nserver"
+ " = mvzip(hosts, ports) | head 1 | fields nserver",
TEST_INDEX_BANK));

verifySchema(actual, schema("nserver", "array"));
verifyDataRows(actual, rows(List.of("host1,80", "host2,443")));
}

@Test
public void testMvzipWithCustomDelimiter() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), result"
+ " = mvzip(arr1, arr2, '|') | head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of("a|x", "b|y", "c|z")));
}

@Test
public void testMvzipNested() throws IOException {
// Example from spec: mvzip(mvzip(field1,field2,"|"),field3,"|")
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval field1 = array('a', 'b'), field2 = array('c', 'd'), field3 ="
+ " array('e', 'f'), result = mvzip(mvzip(field1, field2, '|'), field3, '|') |"
+ " head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of("a|c|e", "b|d|f")));
}

@Test
public void testMvzipWithNull() throws IOException {
// When either input is null, result should be null
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvzip(nullif(1, 1), array('test')) | head 1 | fields"
+ " result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows((Object) null));
}
}
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ ARRAY_LENGTH: 'ARRAY_LENGTH';
MVAPPEND: 'MVAPPEND';
MVJOIN: 'MVJOIN';
MVINDEX: 'MVINDEX';
MVZIP: 'MVZIP';
FORALL: 'FORALL';
FILTER: 'FILTER';
TRANSFORM: 'TRANSFORM';
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ collectionFunctionName
| MVAPPEND
| MVJOIN
| MVINDEX
| MVZIP
| FORALL
| EXISTS
| FILTER
Expand Down
Loading
Loading