diff --git a/docs/changelog.asciidoc b/docs/changelog.asciidoc index 734916a27..e2fa3d194 100644 --- a/docs/changelog.asciidoc +++ b/docs/changelog.asciidoc @@ -13,6 +13,11 @@ You can find all the API changes https://www.elastic.co/guide/en/elasticsearch/reference/8.16/release-notes-8.16.0.html[here]. +[discrete] +===== Support Apache Arrow in ES|QL helper + +The ES|QL helper can now return results as an Apache Arrow `Table` or `RecordBatchReader`, which enables high-performance calculations on ES|QL results, even if the response data is larger than the system's available memory. See <> for more information. + [discrete] ==== Fixes diff --git a/docs/connecting.asciidoc b/docs/connecting.asciidoc index 15007ceb3..4646ee5f1 100644 --- a/docs/connecting.asciidoc +++ b/docs/connecting.asciidoc @@ -1,7 +1,7 @@ [[client-connecting]] -== Connecting +== Connecting -This page contains the information you need to connect and use the Client with +This page contains the information you need to connect and use the Client with {es}. **On this page** @@ -19,7 +19,7 @@ This page contains the information you need to connect and use the Client with [discrete] === Authentication -This document contains code snippets to show you how to connect to various {es} +This document contains code snippets to show you how to connect to various {es} providers. @@ -27,18 +27,18 @@ providers. [[auth-ec]] ==== Elastic Cloud -If you are using https://www.elastic.co/cloud[Elastic Cloud], the client offers -an easy way to connect to it via the `cloud` option. You must pass the Cloud ID -that you can find in the cloud console, then your username and password inside +If you are using https://www.elastic.co/cloud[Elastic Cloud], the client offers +an easy way to connect to it via the `cloud` option. You must pass the Cloud ID +that you can find in the cloud console, then your username and password inside the `auth` option. -NOTE: When connecting to Elastic Cloud, the client will automatically enable -both request and response compression by default, since it yields significant -throughput improvements. Moreover, the client will also set the tls option -`secureProtocol` to `TLSv1_2_method` unless specified otherwise. You can still +NOTE: When connecting to Elastic Cloud, the client will automatically enable +both request and response compression by default, since it yields significant +throughput improvements. Moreover, the client will also set the tls option +`secureProtocol` to `TLSv1_2_method` unless specified otherwise. You can still override this option by configuring them. -IMPORTANT: Do not enable sniffing when using Elastic Cloud, since the nodes are +IMPORTANT: Do not enable sniffing when using Elastic Cloud, since the nodes are behind a load balancer, Elastic Cloud will take care of everything for you. Take a look https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how[here] to know more. @@ -61,18 +61,18 @@ const client = new Client({ [[connect-self-managed-new]] === Connecting to a self-managed cluster -By default {es} will start with security features like authentication and TLS -enabled. To connect to the {es} cluster you'll need to configure the Node.js {es} -client to use HTTPS with the generated CA certificate in order to make requests +By default {es} will start with security features like authentication and TLS +enabled. 
To connect to the {es} cluster you'll need to configure the Node.js {es} +client to use HTTPS with the generated CA certificate in order to make requests successfully. -If you're just getting started with {es} we recommend reading the documentation -on https://www.elastic.co/guide/en/elasticsearch/reference/current/settings.html[configuring] -and -https://www.elastic.co/guide/en/elasticsearch/reference/current/starting-elasticsearch.html[starting {es}] +If you're just getting started with {es} we recommend reading the documentation +on https://www.elastic.co/guide/en/elasticsearch/reference/current/settings.html[configuring] +and +https://www.elastic.co/guide/en/elasticsearch/reference/current/starting-elasticsearch.html[starting {es}] to ensure your cluster is running as expected. -When you start {es} for the first time you'll see a distinct block like the one +When you start {es} for the first time you'll see a distinct block like the one below in the output from {es} (you may have to scroll up if it's been a while): [source,sh] @@ -90,24 +90,24 @@ below in the output from {es} (you may have to scroll up if it's been a while): ---- -Depending on the circumstances there are two options for verifying the HTTPS -connection, either verifying with the CA certificate itself or via the HTTP CA +Depending on the circumstances there are two options for verifying the HTTPS +connection, either verifying with the CA certificate itself or via the HTTP CA certificate fingerprint. [discrete] [[auth-tls]] ==== TLS configuration -The generated root CA certificate can be found in the `certs` directory in your -{es} config location (`$ES_CONF_PATH/certs/http_ca.crt`). If you're running {es} -in Docker there is +The generated root CA certificate can be found in the `certs` directory in your +{es} config location (`$ES_CONF_PATH/certs/http_ca.crt`). If you're running {es} +in Docker there is https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html[additional documentation for retrieving the CA certificate]. -Without any additional configuration you can specify `https://` node urls, and -the certificates used to sign these requests will be verified. To turn off -certificate verification, you must specify an `tls` object in the top level -config and set `rejectUnauthorized: false`. The default `tls` values are the -same that Node.js's https://nodejs.org/api/tls.html#tls_tls_connect_options_callback[`tls.connect()`] +Without any additional configuration you can specify `https://` node urls, and +the certificates used to sign these requests will be verified. To turn off +certificate verification, you must specify an `tls` object in the top level +config and set `rejectUnauthorized: false`. The default `tls` values are the +same that Node.js's https://nodejs.org/api/tls.html#tls_tls_connect_options_callback[`tls.connect()`] uses. 
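For example, a minimal configuration that turns off certificate verification could look like the following sketch (the node URL and credentials are placeholders; skipping verification is only advisable for local development):

[source,ts]
----
import { Client } from '@elastic/elasticsearch'

// Sketch only: `rejectUnauthorized: false` disables certificate verification,
// so limit it to local experiments.
const client = new Client({
  node: 'https://localhost:9200',
  auth: {
    username: 'elastic',
    password: 'changeme'
  },
  tls: {
    rejectUnauthorized: false
  }
})
----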
[source,js] @@ -152,7 +152,7 @@ const client = new Client({ }) ---- -The certificate fingerprint can be calculated using `openssl x509` with the +The certificate fingerprint can be calculated using `openssl x509` with the certificate file: [source,sh] @@ -160,8 +160,8 @@ certificate file: openssl x509 -fingerprint -sha256 -noout -in /path/to/http_ca.crt ---- -If you don't have access to the generated CA file from {es} you can use the -following script to output the root CA fingerprint of the {es} instance with +If you don't have access to the generated CA file from {es} you can use the +following script to output the root CA fingerprint of the {es} instance with `openssl s_client`: [source,sh] @@ -186,8 +186,8 @@ SHA256 Fingerprint=A5:2D:D9:35:11:E8:C6:04:5E:21:F1:66:54:B7:7C:9E:E0:F3:4A:EA:2 WARNING: Running {es} without security enabled is not recommended. -If your cluster is configured with -https://www.elastic.co/guide/en/elasticsearch/reference/current/security-settings.html[security explicitly disabled] +If your cluster is configured with +https://www.elastic.co/guide/en/elasticsearch/reference/current/security-settings.html[security explicitly disabled] then you can connect via HTTP: [source,js] @@ -208,14 +208,14 @@ Following you can find all the supported authentication strategies. [[auth-apikey]] ==== ApiKey authentication -You can use the -{ref-7x}/security-api-create-api-key.html[ApiKey] -authentication by passing the `apiKey` parameter via the `auth` option. The -`apiKey` parameter can be either a base64 encoded string or an object with the -values that you can obtain from the +You can use the +{ref-7x}/security-api-create-api-key.html[ApiKey] +authentication by passing the `apiKey` parameter via the `auth` option. The +`apiKey` parameter can be either a base64 encoded string or an object with the +values that you can obtain from the {ref-7x}/security-api-create-api-key.html[create api key endpoint]. -NOTE: If you provide both basic authentication credentials and the ApiKey +NOTE: If you provide both basic authentication credentials and the ApiKey configuration, the ApiKey takes precedence. [source,js] @@ -268,10 +268,10 @@ const client = new Client({ [[auth-basic]] ==== Basic authentication -You can provide your credentials by passing the `username` and `password` +You can provide your credentials by passing the `username` and `password` parameters via the `auth` option. -NOTE: If you provide both basic authentication credentials and the Api Key +NOTE: If you provide both basic authentication credentials and the Api Key configuration, the Api Key will take precedence. [source,js] @@ -342,14 +342,14 @@ const result = await client.search({ }, { meta: true }) ---- -In this case, the result will be: +In this case, the result will be: [source,ts] ---- { body: object | boolean statusCode: number headers: object - warnings: [string], + warnings: string[], meta: object } ---- @@ -361,7 +361,7 @@ NOTE: The body is a boolean value when you use `HEAD` APIs. If needed, you can abort a running request by using the `AbortController` standard. -CAUTION: If you abort a request, the request will fail with a +CAUTION: If you abort a request, the request will fail with a `RequestAbortedError`. @@ -410,19 +410,23 @@ The supported request specific options are: [cols=2*] |=== |`ignore` -|`[number]` -  HTTP status codes which should not be considered errors for this request. + +|`number[]` -  HTTP status codes which should not be considered errors for this request. 
+ _Default:_ `null` |`requestTimeout` -|`number` - Max request timeout for the request in milliseconds, it overrides the client default. + +|`number | string` - Max request timeout for the request in milliseconds, it overrides the client default. + _Default:_ `30000` +|`retryOnTimeout` +|`boolean` - Retry requests that have timed out. +_Default:_ `false` + |`maxRetries` |`number` - Max number of retries for the request, it overrides the client default. + _Default:_ `3` |`compression` -|`string, boolean` - Enables body compression for the request. + +|`string | boolean` - Enables body compression for the request. + _Options:_ `false`, `'gzip'` + _Default:_ `false` @@ -446,6 +450,10 @@ _Default:_ `null` |`any` - Custom object per request. _(you can use it to pass data to the clients events)_ + _Default:_ `null` +|`opaqueId` +|`string` - Set the `X-Opaque-Id` HTTP header. See {ref}/api-conventions.html#x-opaque-id +_Default:_ `null` + |`maxResponseSize` |`number` - When configured, it verifies that the uncompressed response size is lower than the configured number, if it's higher it will abort the request. It cannot be higher than buffer.constants.MAX_STRING_LENTGH + _Default:_ `null` @@ -458,6 +466,17 @@ _Default:_ `null` |`AbortSignal` - The AbortSignal instance to allow request abortion. + _Default:_ `null` +|`meta` +|`boolean` - Rather than returning the body, return an object containing `body`, `statusCode`, `headers` and `meta` keys + +_Default_: `false` + +|`redaction` +|`object` - Options for redacting potentially sensitive data from error metadata. See <>. + +|`retryBackoff` +|`(min: number, max: number, attempt: number) => number;` - A function that calculates how long to sleep, in seconds, before the next request retry + +_Default:_ A built-in function that uses exponential backoff with jitter. + |=== [discrete] @@ -537,8 +556,8 @@ Resources used to assess these recommendations: ~Added~ ~in~ ~`v7.10.0`~ -If you need to pass through an http(s) proxy for connecting to {es}, the client -out of the box offers a handy configuration for helping you with it. Under the +If you need to pass through an http(s) proxy for connecting to {es}, the client +out of the box offers a handy configuration for helping you with it. Under the hood, it uses the https://github.com/delvedor/hpagent[`hpagent`] module. IMPORTANT: In versions 8.0+ of the client, the default `Connection` type is set to `UndiciConnection`, which does not support proxy configurations. @@ -715,5 +734,5 @@ This pre-flight product check allows the client to establish the version of Elas that it is communicating with. The product check requires one additional HTTP request to be sent to the server as part of the request pipeline before the main API call is sent. In most cases, this will succeed during the very first API call that the client sends. -Once the product check completes, no further product check HTTP requests are sent for +Once the product check completes, no further product check HTTP requests are sent for subsequent API calls. diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc index fa8394a9d..cb60dbc51 100644 --- a/docs/helpers.asciidoc +++ b/docs/helpers.asciidoc @@ -1,10 +1,10 @@ [[client-helpers]] == Client helpers -The client comes with an handy collection of helpers to give you a more +The client comes with an handy collection of helpers to give you a more comfortable experience with some APIs. 
-CAUTION: The client helpers are experimental, and the API may change in the next +CAUTION: The client helpers are experimental, and the API may change in the next minor releases. The helpers will not work in any Node.js version lower than 10. @@ -14,7 +14,7 @@ minor releases. The helpers will not work in any Node.js version lower than 10. ~Added~ ~in~ ~`v7.7.0`~ -Running bulk requests can be complex due to the shape of the API, this helper +Running bulk requests can be complex due to the shape of the API, this helper aims to provide a nicer developer experience around the Bulk API. @@ -52,7 +52,7 @@ console.log(result) // } ---- -To create a new instance of the Bulk helper, access it as shown in the example +To create a new instance of the Bulk helper, access it as shown in the example above, the configuration options are: [cols=2*] |=== @@ -83,7 +83,7 @@ const b = client.helpers.bulk({ return { index: { _index: 'my-index' } } - } + } }) ---- @@ -94,7 +94,7 @@ a|A function that is called for everytime a document can't be indexed and it has const b = client.helpers.bulk({ onDrop (doc) { console.log(doc) - } + } }) ---- @@ -105,7 +105,7 @@ a|A function that is called for each successful operation in the bulk request, w const b = client.helpers.bulk({ onSuccess ({ result, document }) { console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`) - } + } }) ---- @@ -249,11 +249,11 @@ client.helpers.bulk({ [discrete] ==== Abort a bulk operation -If needed, you can abort a bulk operation at any time. The bulk helper returns a +If needed, you can abort a bulk operation at any time. The bulk helper returns a https://promisesaplus.com/[thenable], which has an `abort` method. -NOTE: The abort method stops the execution of the bulk operation, but if you -are using a concurrency higher than one, the operations that are already running +NOTE: The abort method stops the execution of the bulk operation, but if you +are using a concurrency higher than one, the operations that are already running will not be stopped. [source,js] @@ -275,7 +275,7 @@ const b = client.helpers.bulk({ }, onDrop (doc) { b.abort() - } + } }) console.log(await b) @@ -285,8 +285,8 @@ console.log(await b) [discrete] ==== Passing custom options to the Bulk API -You can pass any option supported by the link: -{ref}/docs-bulk.html#docs-bulk-api-query-params[Bulk API] to the helper, and the +You can pass any option supported by the link: +{ref}/docs-bulk.html#docs-bulk-api-query-params[Bulk API] to the helper, and the helper uses those options in conjunction with the Bulk API call. [source,js] @@ -371,10 +371,10 @@ console.log(result) ~Added~ ~in~ ~`v7.8.0`~ -If you send search request at a high rate, this helper might be useful -for you. It uses the multi search API under the hood to batch the requests -and improve the overall performances of your application. The `result` exposes a -`documents` property as well, which allows you to access directly the hits +If you send search request at a high rate, this helper might be useful +for you. It uses the multi search API under the hood to batch the requests +and improve the overall performances of your application. The `result` exposes a +`documents` property as well, which allows you to access directly the hits sources. 
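For instance, the hit sources can be read straight from `result.documents`. A minimal sketch, assuming an index named `my-index` (the node URL and API key are placeholders):

[source,ts]
----
import { Client } from '@elastic/elasticsearch'

const client = new Client({
  node: 'https://localhost:9200',
  auth: { apiKey: 'base64EncodedKey' }
})

const m = client.helpers.msearch()

// Individual calls are batched into a single multi search request under the hood.
const result = await m.search(
  { index: 'my-index' },
  { query: { match_all: {} } }
)

// `documents` exposes only the `_source` of each hit.
console.log(result.documents)

// Stop the processor once it is no longer needed.
m.stop()
----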
@@ -399,7 +399,7 @@ m.search( .catch(err => console.error(err)) ---- -To create a new instance of the multi search (msearch) helper, you should access +To create a new instance of the multi search (msearch) helper, you should access it as shown in the example above, the configuration options are: [cols=2*] |=== @@ -459,18 +459,18 @@ const m = client.helpers.msearch({ [discrete] ==== Stopping the msearch helper -If needed, you can stop an msearch processor at any time. The msearch helper +If needed, you can stop an msearch processor at any time. The msearch helper returns a https://promisesaplus.com/[thenable], which has an `stop` method. -If you are creating multiple msearch helpers instances and using them for a -limitied period of time, remember to always use the `stop` method once you have +If you are creating multiple msearch helpers instances and using them for a +limitied period of time, remember to always use the `stop` method once you have finished using them, otherwise your application will start leaking memory. -The `stop` method accepts an optional error, that will be dispatched every +The `stop` method accepts an optional error, that will be dispatched every subsequent search request. -NOTE: The stop method stops the execution of the msearch processor, but if -you are using a concurrency higher than one, the operations that are already +NOTE: The stop method stops the execution of the msearch processor, but if +you are using a concurrency higher than one, the operations that are already running will not be stopped. [source,js] @@ -507,9 +507,9 @@ setImmediate(() => m.stop()) ~Added~ ~in~ ~`v7.7.0`~ -A simple wrapper around the search API. Instead of returning the entire `result` -object it returns only the search documents source. For improving the -performances, this helper automatically adds `filter_path=hits.hits._source` to +A simple wrapper around the search API. Instead of returning the entire `result` +object it returns only the search documents source. For improving the +performances, this helper automatically adds `filter_path=hits.hits._source` to the query string. [source,js] @@ -535,10 +535,10 @@ for (const doc of documents) { ~Added~ ~in~ ~`v7.7.0`~ -This helpers offers a simple and intuitive way to use the scroll search API. -Once called, it returns an -https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function[async iterator] -which can be used in conjuction with a for-await...of. It handles automatically +This helpers offers a simple and intuitive way to use the scroll search API. +Once called, it returns an +https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function[async iterator] +which can be used in conjuction with a for-await...of. It handles automatically the `429` error and uses the `maxRetries` option of the client. [source,js] @@ -576,7 +576,7 @@ for await (const result of scrollSearch) { [discrete] ==== Quickly getting the documents -If you only need the documents from the result of a scroll search, you can +If you only need the documents from the result of a scroll search, you can access them via `result.documents`: [source,js] @@ -593,9 +593,9 @@ for await (const result of scrollSearch) { ~Added~ ~in~ ~`v7.7.0`~ -It works in the same way as the scroll search helper, but it returns only the -documents instead. Note, every loop cycle returns a single document, and you -can't use the `clear` method. 
For improving the performances, this helper +It works in the same way as the scroll search helper, but it returns only the +documents instead. Note, every loop cycle returns a single document, and you +can't use the `clear` method. For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the query string. [source,js] @@ -707,3 +707,42 @@ const result = await client.helpers .esql({ query: 'FROM sample_data | LIMIT 2' }) .toRecords() ---- + +[discrete] +===== `toArrowReader` + +~Added~ ~in~ ~`v8.16.0`~ + +ES|QL can return results in multiple binary formats, including https://arrow.apache.org/[Apache Arrow]'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets. + +`toArrowReader` returns a https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.html[`RecordBatchStreamReader`]. + +[source,ts] +---- +const reader = await client.helpers + .esql({ query: 'FROM sample_data' }) + .toArrowReader() + +// print each record as JSON +for (const recordBatch of reader) { + for (const record of recordBatch) { + console.log(record.toJSON()) + } +} +---- + +[discrete] +===== `toArrowTable` + +~Added~ ~in~ ~`v8.16.0`~ + +If you would like to pull the entire data set in Arrow format but without streaming, you can use the `toArrowTable` helper to get a https://arrow.apache.org/docs/js/classes/Arrow_dom.Table.html[Table] back instead. + +[source,ts] +---- +const table = await client.helpers + .esql({ query: 'FROM sample_data' }) + .toArrowTable() + +console.log(table.toArray()) +---- diff --git a/docs/transport.asciidoc b/docs/transport.asciidoc index 5096616ea..d32606b63 100644 --- a/docs/transport.asciidoc +++ b/docs/transport.asciidoc @@ -1,7 +1,7 @@ [[transport]] === Transport -This class is responsible for performing the request to {es} and handling +This class is responsible for performing the request to {es} and handling errors, it also handles sniffing. [source,js] @@ -20,7 +20,7 @@ const client = new Client({ }) ---- -Sometimes you need to inject a small snippet of your code and then continue to +Sometimes you need to inject a small snippet of your code and then continue to use the usual client code. 
In such cases, call `super.method`: [source,js] @@ -35,8 +35,39 @@ class MyTransport extends Transport { ==== Supported content types -- `application/json`, in this case the transport will return a plain JavaScript object -- `text/plain`, in this case the transport will return a plain string -- `application/vnd.mapbox-vector-tile`, in this case the transport will return a Buffer -- `application/vnd.elasticsearch+json`, in this case the transport will return a plain JavaScript object +Depending on the `content-type` of the response, the transport will return the body as different types: +[cols="1,1"] +|=== +|Content-Type |JavaScript type + +|`application/json` +|`object` + +|`text/plain` +|`string` + +|`application/vnd.elasticsearch+json` +|`object` + +|`application/vnd.mapbox-vector-tile` +|`Buffer` + +|`application/vnd.apache.arrow.stream` +|`Buffer` + +|`application/vnd.elasticsearch+arrow+stream` +|`Buffer` + +|`application/smile` +|`Buffer` + +|`application/vnd.elasticsearch+smile` +|`Buffer` + +|`application/cbor` +|`Buffer` + +|`application/vnd.elasticsearch+cbor` +|`Buffer` +|=== diff --git a/package.json b/package.json index b562b73a3..001c92e1e 100644 --- a/package.json +++ b/package.json @@ -87,8 +87,8 @@ "zx": "7.2.3" }, "dependencies": { - "@elastic/transport": "^8.9.0", - "@apache-arrow/esnext-cjs": "^17.0.0", + "@elastic/transport": "^8.9.1", + "apache-arrow": "^18.0.0", "tslib": "^2.4.0" }, "tap": { diff --git a/src/helpers.ts b/src/helpers.ts index a54ee0964..16af051b6 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -25,7 +25,7 @@ import assert from 'node:assert' import * as timersPromises from 'node:timers/promises' import { Readable } from 'node:stream' import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport' -import { Table, TypeMap, tableFromIPC } from '@apache-arrow/esnext-cjs' +import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node' import Client from './client' import * as T from './api/types' @@ -156,7 +156,8 @@ export interface EsqlResponse { export interface EsqlHelper { toRecords: () => Promise> - toArrow: () => Promise> + toArrowTable: () => Promise> + toArrowReader: () => Promise } export interface EsqlToRecords { @@ -1003,7 +1004,7 @@ export default class Helpers { return { records, columns } }, - async toArrow (): Promise> { + async toArrowTable (): Promise> { if (metaHeader !== null) { reqOptions.headers = reqOptions.headers ?? {} reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa` @@ -1013,6 +1014,19 @@ export default class Helpers { const response = await client.esql.query(params, reqOptions) return tableFromIPC(response) + }, + + async toArrowReader (): Promise { + if (metaHeader !== null) { + reqOptions.headers = reqOptions.headers ?? 
{} + reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa` + reqOptions.asStream = true + } + + params.format = 'arrow' + + const response = await client.esql.query(params, reqOptions) + return RecordBatchStreamReader.from(response) } } diff --git a/test/unit/helpers/esql.test.ts b/test/unit/helpers/esql.test.ts index 3685b7c53..c91e3cb03 100644 --- a/test/unit/helpers/esql.test.ts +++ b/test/unit/helpers/esql.test.ts @@ -18,7 +18,7 @@ */ import { test } from 'tap' -import { Table } from '@apache-arrow/esnext-cjs' +import * as arrow from 'apache-arrow' import { connection } from '../../utils' import { Client } from '../../../' @@ -111,7 +111,7 @@ test('ES|QL helper', t => { t.end() }) - test('toArrow', t => { + test('toArrowTable', t => { t.test('Parses a binary response into an Arrow table', async t => { const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' @@ -132,8 +132,8 @@ test('ES|QL helper', t => { Connection: MockConnection }) - const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrow() - t.ok(result instanceof Table) + const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable() + t.ok(result instanceof arrow.Table) const table = [...result] t.same(table[0], [ @@ -165,7 +165,125 @@ test('ES|QL helper', t => { Connection: MockConnection }) - await client.helpers.esql({ query: 'FROM sample_data' }).toArrow() + await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable() + t.end() + }) + + t.end() + }) + + test('toArrowReader', t => { + t.test('Parses a binary response into an Arrow stream reader', async t => { + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + const MockConnection = connection.buildMockConnection({ + onRequest (_params) { + return { + body: Buffer.from(binaryContent, 'base64'), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 
'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() + t.ok(result.isStream()) + + const recordBatch = result.next().value + t.same(recordBatch.get(0)?.toJSON(), { + amount: 4.900000095367432, + date: 1729532586965, + }) + t.end() + }) + + t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => { + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + const header = params.headers?.['x-elastic-client-meta'] ?? '' + t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`) + return { + body: Buffer.from(binaryContent, 'base64'), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() + t.end() + }) + + t.test('multi-batch support', async t => { + const intType = new arrow.Uint32 + const floatType = new arrow.Float32 + const schema = new arrow.Schema([ + arrow.Field.new('id', intType), + arrow.Field.new('val', floatType) + ]) + + function getBatch(ids: number[], vals: number[]) { + const id = arrow.makeData({ type: intType, data: ids }) + const val = arrow.makeData({ type: floatType, data: vals }) + return new arrow.RecordBatch({ id, val }) + } + + const batch1 = getBatch([1, 2, 3], [0.1, 0.2, 0.3]) + const batch2 = getBatch([4, 5, 6], [0.4, 0.5, 0.6]) + const batch3 = getBatch([7, 8, 9], [0.7, 0.8, 0.9]) + + const table = new arrow.Table(schema, [ + new arrow.RecordBatch(schema, batch1.data), + new arrow.RecordBatch(schema, batch2.data), + new arrow.RecordBatch(schema, batch3.data), + ]) + + const MockConnection = connection.buildMockConnection({ + onRequest (_params) { + return { + body: Buffer.from(arrow.tableToIPC(table, "stream")), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() + t.ok(result.isStream()) + + let counter = 0 + for (const batch of result) { + for (const row of batch) { + counter++ + const { id, val } = row.toJSON() + t.equal(id, counter) + // floating points are hard in JS + t.equal((Math.round(val * 10) / 10).toFixed(1), (counter * 0.1).toFixed(1)) + } + } t.end() })
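Taken together, the reader-based helper lets an application aggregate over an ES|QL result one record batch at a time, without holding the whole result set in memory. A minimal consumption sketch, assuming a reachable cluster and a `sample_data` index with an `amount` column like the one used in the examples above (node URL and API key are placeholders):

[source,ts]
----
import { Client } from '@elastic/elasticsearch'

const client = new Client({
  node: 'https://localhost:9200',
  auth: { apiKey: 'base64EncodedKey' }
})

// Sum the `amount` column batch by batch; only one RecordBatch is
// materialized in memory at a time.
const reader = await client.helpers
  .esql({ query: 'FROM sample_data' })
  .toArrowReader()

let total = 0
for (const batch of reader) {
  for (const row of batch) {
    const { amount } = row.toJSON() as { amount: number }
    total += amount
  }
}

console.log(total)
----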