|  | 
|  | 1 | +/* | 
|  | 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | 
|  | 3 | + * or more contributor license agreements. Licensed under the Elastic License | 
|  | 4 | + * 2.0; you may not use this file except in compliance with the Elastic License | 
|  | 5 | + * 2.0. | 
|  | 6 | + */ | 
|  | 7 | + | 
|  | 8 | +package org.elasticsearch.xpack.esql.qa.single_node; | 
|  | 9 | + | 
|  | 10 | +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | 
|  | 11 | + | 
|  | 12 | +import org.apache.arrow.memory.RootAllocator; | 
|  | 13 | +import org.apache.arrow.vector.IntVector; | 
|  | 14 | +import org.apache.arrow.vector.VarBinaryVector; | 
|  | 15 | +import org.apache.arrow.vector.VarCharVector; | 
|  | 16 | +import org.apache.arrow.vector.VectorSchemaRoot; | 
|  | 17 | +import org.apache.arrow.vector.ipc.ArrowStreamReader; | 
|  | 18 | +import org.apache.arrow.vector.types.pojo.Field; | 
|  | 19 | +import org.apache.arrow.vector.util.VectorSchemaRootAppender; | 
|  | 20 | +import org.elasticsearch.client.Request; | 
|  | 21 | +import org.elasticsearch.client.Response; | 
|  | 22 | +import org.elasticsearch.test.TestClustersThreadFilter; | 
|  | 23 | +import org.elasticsearch.test.cluster.ElasticsearchCluster; | 
|  | 24 | +import org.elasticsearch.test.rest.ESRestTestCase; | 
|  | 25 | +import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; | 
|  | 26 | +import org.junit.After; | 
|  | 27 | +import org.junit.AfterClass; | 
|  | 28 | +import org.junit.Before; | 
|  | 29 | +import org.junit.ClassRule; | 
|  | 30 | + | 
|  | 31 | +import java.io.IOException; | 
|  | 32 | +import java.io.InputStream; | 
|  | 33 | +import java.util.List; | 
|  | 34 | + | 
|  | 35 | +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) | 
|  | 36 | +public class ArrowFormatIT extends ESRestTestCase { | 
|  | 37 | + | 
|  | 38 | +    private static final RootAllocator ALLOCATOR = new RootAllocator(); | 
|  | 39 | + | 
|  | 40 | +    @AfterClass | 
|  | 41 | +    public static void afterClass() { | 
|  | 42 | +        ALLOCATOR.close(); | 
|  | 43 | +    } | 
|  | 44 | + | 
|  | 45 | +    @ClassRule | 
|  | 46 | +    public static ElasticsearchCluster cluster = Clusters.testCluster(); | 
|  | 47 | + | 
|  | 48 | +    @Override | 
|  | 49 | +    protected String getTestRestCluster() { | 
|  | 50 | +        return cluster.getHttpAddresses(); | 
|  | 51 | +    } | 
|  | 52 | + | 
|  | 53 | +    @Before | 
|  | 54 | +    @After | 
|  | 55 | +    public void assertRequestBreakerEmpty() throws Exception { | 
|  | 56 | +        EsqlSpecTestCase.assertRequestBreakerEmpty(); | 
|  | 57 | +    } | 
|  | 58 | + | 
|  | 59 | +    @Before | 
|  | 60 | +    public void initIndex() throws IOException { | 
|  | 61 | +        Request request = new Request("PUT", "/arrow-test"); | 
|  | 62 | +        request.setJsonEntity(""" | 
|  | 63 | +            { | 
|  | 64 | +              "mappings": { | 
|  | 65 | +                "properties": { | 
|  | 66 | +                  "value": { | 
|  | 67 | +                    "type": "integer" | 
|  | 68 | +                  }, | 
|  | 69 | +                  "description": { | 
|  | 70 | +                    "type": "keyword" | 
|  | 71 | +                  }, | 
|  | 72 | +                  "ip": { | 
|  | 73 | +                    "type": "ip" | 
|  | 74 | +                  }, | 
|  | 75 | +                  "v": { | 
|  | 76 | +                    "type": "version" | 
|  | 77 | +                  } | 
|  | 78 | +                } | 
|  | 79 | +              } | 
|  | 80 | +            } | 
|  | 81 | +            """); | 
|  | 82 | +        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); | 
|  | 83 | + | 
|  | 84 | +        request = new Request("POST", "/_bulk?index=arrow-test&refresh=true"); | 
|  | 85 | +        // 4 documents with a null in the middle, leading to 3 ESQL pages and 3 Arrow batches | 
|  | 86 | +        request.setJsonEntity(""" | 
|  | 87 | +            {"index": {"_id": "1"}} | 
|  | 88 | +            {"value": 1, "ip": "192.168.0.1", "v": "1.0.1", "description": "number one"} | 
|  | 89 | +            {"index": {"_id": "2"}} | 
|  | 90 | +            {"value": 2, "ip": "192.168.0.2", "v": "1.0.2", "description": "number two"} | 
|  | 91 | +            {"index": {"_id": "3"}} | 
|  | 92 | +            {"value": 3, "ip": "2001:db8::1:0:0:1"} | 
|  | 93 | +            {"index": {"_id": "4"}} | 
|  | 94 | +            {"value": 4, "ip": "::afff:4567:890a", "v": "1.0.4", "description": "number four"} | 
|  | 95 | +            """); | 
|  | 96 | +        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); | 
|  | 97 | +    } | 
|  | 98 | + | 
|  | 99 | +    private VectorSchemaRoot esql(String query) throws IOException { | 
|  | 100 | +        Request request = new Request("POST", "/_query?format=arrow"); | 
|  | 101 | +        request.setJsonEntity(query); | 
|  | 102 | +        Response response = client().performRequest(request); | 
|  | 103 | + | 
|  | 104 | +        assertEquals("application/vnd.apache.arrow.stream", response.getEntity().getContentType().getValue()); | 
|  | 105 | +        return readArrow(response.getEntity().getContent()); | 
|  | 106 | +    } | 
|  | 107 | + | 
|  | 108 | +    public void testInteger() throws Exception { | 
|  | 109 | +        try (VectorSchemaRoot root = esql(""" | 
|  | 110 | +            { | 
|  | 111 | +                "query": "FROM arrow-test | SORT value | LIMIT 100 | KEEP value" | 
|  | 112 | +            }""")) { | 
|  | 113 | +            List<Field> fields = root.getSchema().getFields(); | 
|  | 114 | +            assertEquals(1, fields.size()); | 
|  | 115 | + | 
|  | 116 | +            assertValues(root); | 
|  | 117 | +        } | 
|  | 118 | +    } | 
|  | 119 | + | 
|  | 120 | +    public void testString() throws Exception { | 
|  | 121 | +        try (VectorSchemaRoot root = esql(""" | 
|  | 122 | +            { | 
|  | 123 | +                "query": "FROM arrow-test | SORT value | LIMIT 100 | KEEP description" | 
|  | 124 | +            }""")) { | 
|  | 125 | +            List<Field> fields = root.getSchema().getFields(); | 
|  | 126 | +            assertEquals(1, fields.size()); | 
|  | 127 | + | 
|  | 128 | +            assertDescription(root); | 
|  | 129 | +        } | 
|  | 130 | +    } | 
|  | 131 | + | 
|  | 132 | +    public void testIp() throws Exception { | 
|  | 133 | +        try (VectorSchemaRoot root = esql(""" | 
|  | 134 | +            { | 
|  | 135 | +                "query": "FROM arrow-test | SORT value | LIMIT 100 | KEEP ip" | 
|  | 136 | +            }""")) { | 
|  | 137 | +            List<Field> fields = root.getSchema().getFields(); | 
|  | 138 | +            assertEquals(1, fields.size()); | 
|  | 139 | + | 
|  | 140 | +            assertIp(root); | 
|  | 141 | +        } | 
|  | 142 | +    } | 
|  | 143 | + | 
|  | 144 | +    public void testVersion() throws Exception { | 
|  | 145 | +        try (VectorSchemaRoot root = esql(""" | 
|  | 146 | +            { | 
|  | 147 | +                "query": "FROM arrow-test | SORT value | LIMIT 100 | KEEP v" | 
|  | 148 | +            }""")) { | 
|  | 149 | +            List<Field> fields = root.getSchema().getFields(); | 
|  | 150 | +            assertEquals(1, fields.size()); | 
|  | 151 | + | 
|  | 152 | +            assertVersion(root); | 
|  | 153 | +        } | 
|  | 154 | +    } | 
|  | 155 | + | 
|  | 156 | +    public void testEverything() throws Exception { | 
|  | 157 | +        try (VectorSchemaRoot root = esql(""" | 
|  | 158 | +            { | 
|  | 159 | +                "query": "FROM arrow-test | SORT value | LIMIT 100" | 
|  | 160 | +            }""")) { | 
|  | 161 | +            List<Field> fields = root.getSchema().getFields(); | 
|  | 162 | +            assertEquals(4, fields.size()); | 
|  | 163 | + | 
|  | 164 | +            assertDescription(root); | 
|  | 165 | +            assertValues(root); | 
|  | 166 | +            assertIp(root); | 
|  | 167 | +            assertVersion(root); | 
|  | 168 | +        } | 
|  | 169 | +    } | 
|  | 170 | + | 
|  | 171 | +    private VectorSchemaRoot readArrow(InputStream input) throws IOException { | 
|  | 172 | +        try ( | 
|  | 173 | +            ArrowStreamReader reader = new ArrowStreamReader(input, ALLOCATOR); | 
|  | 174 | +            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); | 
|  | 175 | +        ) { | 
|  | 176 | +            VectorSchemaRoot root = VectorSchemaRoot.create(readerRoot.getSchema(), ALLOCATOR); | 
|  | 177 | +            root.allocateNew(); | 
|  | 178 | + | 
|  | 179 | +            while (reader.loadNextBatch()) { | 
|  | 180 | +                VectorSchemaRootAppender.append(root, readerRoot); | 
|  | 181 | +            } | 
|  | 182 | + | 
|  | 183 | +            return root; | 
|  | 184 | +        } | 
|  | 185 | +    } | 
|  | 186 | + | 
|  | 187 | +    private void assertValues(VectorSchemaRoot root) { | 
|  | 188 | +        var valueVector = (IntVector) root.getVector("value"); | 
|  | 189 | +        assertEquals(1, valueVector.get(0)); | 
|  | 190 | +        assertEquals(2, valueVector.get(1)); | 
|  | 191 | +        assertEquals(3, valueVector.get(2)); | 
|  | 192 | +        assertEquals(4, valueVector.get(3)); | 
|  | 193 | +    } | 
|  | 194 | + | 
|  | 195 | +    private void assertDescription(VectorSchemaRoot root) { | 
|  | 196 | +        var descVector = (VarCharVector) root.getVector("description"); | 
|  | 197 | +        assertEquals("number one", descVector.getObject(0).toString()); | 
|  | 198 | +        assertEquals("number two", descVector.getObject(1).toString()); | 
|  | 199 | +        assertTrue(descVector.isNull(2)); | 
|  | 200 | +        assertEquals("number four", descVector.getObject(3).toString()); | 
|  | 201 | +    } | 
|  | 202 | + | 
|  | 203 | +    private void assertIp(VectorSchemaRoot root) { | 
|  | 204 | +        // Test data that has been transformed during output (ipV4 truncated to 32bits) | 
|  | 205 | +        var ipVector = (VarBinaryVector) root.getVector("ip"); | 
|  | 206 | +        assertArrayEquals(new byte[] { (byte) 192, (byte) 168, 0, 1 }, ipVector.getObject(0)); | 
|  | 207 | +        assertArrayEquals(new byte[] { (byte) 192, (byte) 168, 0, 2 }, ipVector.getObject(1)); | 
|  | 208 | +        assertArrayEquals( | 
|  | 209 | +            new byte[] { 0x20, 0x01, 0x0d, (byte) 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01 }, | 
|  | 210 | +            ipVector.getObject(2) | 
|  | 211 | +        ); | 
|  | 212 | +        assertArrayEquals( | 
|  | 213 | +            new byte[] { | 
|  | 214 | +                0x00, | 
|  | 215 | +                0x00, | 
|  | 216 | +                0x00, | 
|  | 217 | +                0x00, | 
|  | 218 | +                0x00, | 
|  | 219 | +                0x00, | 
|  | 220 | +                0x00, | 
|  | 221 | +                0x00, | 
|  | 222 | +                0x00, | 
|  | 223 | +                0x00, | 
|  | 224 | +                (byte) 0xaf, | 
|  | 225 | +                (byte) 0xff, | 
|  | 226 | +                0x45, | 
|  | 227 | +                0x67, | 
|  | 228 | +                (byte) 0x89, | 
|  | 229 | +                0x0A }, | 
|  | 230 | +            ipVector.getObject(3) | 
|  | 231 | +        ); | 
|  | 232 | +    } | 
|  | 233 | + | 
|  | 234 | +    private void assertVersion(VectorSchemaRoot root) { | 
|  | 235 | +        // Version is binary-encoded in ESQL vectors, turned into a string in arrow output | 
|  | 236 | +        var versionVector = (VarCharVector) root.getVector("v"); | 
|  | 237 | +        assertEquals("1.0.1", versionVector.getObject(0).toString()); | 
|  | 238 | +        assertEquals("1.0.2", versionVector.getObject(1).toString()); | 
|  | 239 | +        assertTrue(versionVector.isNull(2)); | 
|  | 240 | +        assertEquals("1.0.4", versionVector.getObject(3).toString()); | 
|  | 241 | +    } | 
|  | 242 | +} | 
0 commit comments