1717import org .apache .arrow .vector .ipc .message .MessageSerializer ;
1818import org .apache .arrow .vector .types .Types .MinorType ;
1919import org .apache .arrow .vector .types .pojo .Field ;
20+ import org .apache .arrow .vector .types .pojo .FieldType ;
2021import org .apache .arrow .vector .types .pojo .Schema ;
2122import org .apache .lucene .util .BytesRef ;
2223import org .elasticsearch .action .ActionListener ;
@@ -44,6 +45,7 @@ public class ArrowResponse implements ChunkedRestResponseBodyPart, Releasable {
4445 public static class Column {
4546 private final BlockConverter converter ;
4647 private final String name ;
48+ private boolean multivalued ;
4749
4850 public Column (String esqlType , String name ) {
4951 this .converter = ESQL_CONVERTERS .get (esqlType );
@@ -61,20 +63,24 @@ public Column(String esqlType, String name) {
6163 public ArrowResponse (List <Column > columns , List <Page > pages ) {
6264 this .columns = columns ;
6365
66+ // Find multivalued columns
67+ int colSize = columns .size ();
68+ for (int col = 0 ; col < colSize ; col ++) {
69+ for (Page page : pages ) {
70+ if (page .getBlock (col ).mayHaveMultivaluedFields ()) {
71+ columns .get (col ).multivalued = true ;
72+ break ;
73+ }
74+ }
75+ }
76+
6477 currentSegment = new SchemaResponse (this );
6578 List <ResponseSegment > rest = new ArrayList <>(pages .size ());
66- for ( int p = 0 ; p < pages . size (); p ++) {
67- var page = pages . get ( p );
79+
80+ for ( Page page : pages ) {
6881 rest .add (new PageResponse (this , page ));
69- // Multivalued fields are not supported yet.
70- for (int b = 0 ; b < page .getBlockCount (); b ++) {
71- if (page .getBlock (b ).mayHaveMultivaluedFields ()) {
72- throw new IllegalArgumentException (
73- "ES|QL response field [" + columns .get (b ).name + "] is multi-valued. This isn't supported yet by the Arrow format"
74- );
75- }
76- }
7782 }
83+
7884 rest .add (new EndResponse (this ));
7985 segments = rest .iterator ();
8086 }
@@ -185,6 +191,9 @@ public void close() {}
185191 * @see <a href="https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format">IPC Streaming Format</a>
186192 */
187193 private static class SchemaResponse extends ResponseSegment {
194+
195+ private static final FieldType LIST_FIELD_TYPE = FieldType .nullable (MinorType .LIST .getType ());
196+
188197 private boolean done = false ;
189198
190199 SchemaResponse (ArrowResponse response ) {
@@ -204,7 +213,20 @@ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws I
204213 }
205214
206215 private Schema arrowSchema () {
207- return new Schema (response .columns .stream ().map (c -> new Field (c .name , c .converter .arrowFieldType (), List .of ())).toList ());
216+ return new Schema (response .columns .stream ().map (c -> {
217+ var fieldType = c .converter .arrowFieldType ();
218+ if (c .multivalued ) {
219+ // A variable-sized list is a vector of offsets and a child vector of values
220+ // See https://arrow.apache.org/docs/format/Columnar.html#variable-size-list-layout
221+ var listType = new FieldType (true , LIST_FIELD_TYPE .getType (), null , fieldType .getMetadata ());
222+ // Value vector is non-nullable (ES|QL multivalues cannot contain nulls).
223+ var valueType = new FieldType (false , fieldType .getType (), fieldType .getDictionary (), null );
224+ // The nested vector is named "$data$", following what the Arrow/Java library does.
225+ return new Field (c .name , listType , List .of (new Field ("$data$" , valueType , null )));
226+ } else {
227+ return new Field (c .name , fieldType , null );
228+ }
229+ }).toList ());
208230 }
209231 }
210232
@@ -257,7 +279,14 @@ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws I
257279
258280 @ Override
259281 public void write (ArrowBuf buffer ) throws IOException {
260- extraPosition += bufWriters .get (bufIdx ++).write (out );
282+ var len = bufWriters .get (bufIdx ++).write (out );
283+ // Consistency check
284+ if (len != buffer .writerIndex ()) {
285+ throw new IllegalStateException (
286+ "Buffer [" + (bufIdx - 1 ) + "]: wrote [" + len + "] bytes, but expected [" + buffer .writerIndex () + "]"
287+ );
288+ }
289+ extraPosition += len ;
261290 }
262291
263292 @ Override
@@ -277,11 +306,26 @@ public long align() throws IOException {
277306
278307 // Create Arrow buffers for each of the blocks in this page
279308 for (int b = 0 ; b < page .getBlockCount (); b ++) {
280- var converter = response .columns .get (b ).converter ;
309+ var column = response .columns .get (b );
310+ var converter = column .converter ;
281311
282312 Block block = page .getBlock (b );
283- nodes .add (new ArrowFieldNode (block .getPositionCount (), converter .nullValuesCount (block )));
284- converter .convert (block , bufs , bufWriters );
313+ if (column .multivalued ) {
314+ // List node.
315+ nodes .add (new ArrowFieldNode (block .getPositionCount (), converter .nullValuesCount (block )));
316+ // Value vector, does not contain nulls.
317+ nodes .add (new ArrowFieldNode (BlockConverter .valueCount (block ), 0 ));
318+ } else {
319+ nodes .add (new ArrowFieldNode (block .getPositionCount (), converter .nullValuesCount (block )));
320+ }
321+ converter .convert (block , column .multivalued , bufs , bufWriters );
322+ }
323+
324+ // Consistency check
325+ if (bufs .size () != bufWriters .size ()) {
326+ throw new IllegalStateException (
327+ "Inconsistent Arrow buffers: [" + bufs .size () + "] buffers and [" + bufWriters .size () + "] writers"
328+ );
285329 }
286330
287331 // Create the batch and serialize it
0 commit comments