Commit 23ff705

committed
Add streaming support to Arrow helper
1 parent 6722177 commit 23ff705

4 files changed

+154
-42
lines changed

docs/helpers.asciidoc

Lines changed: 72 additions & 35 deletions
@@ -1,10 +1,10 @@
11
[[client-helpers]]
22
== Client helpers
33

4-
The client comes with a handy collection of helpers to give you a more
4+
The client comes with a handy collection of helpers to give you a more
55
comfortable experience with some APIs.
66

7-
CAUTION: The client helpers are experimental, and the API may change in the next
7+
CAUTION: The client helpers are experimental, and the API may change in the next
88
minor releases. The helpers will not work in any Node.js version lower than 10.
99

1010

@@ -14,7 +14,7 @@ minor releases. The helpers will not work in any Node.js version lower than 10.
1414

1515
~Added~ ~in~ ~`v7.7.0`~
1616

17-
Running bulk requests can be complex due to the shape of the API, this helper
17+
Running bulk requests can be complex due to the shape of the API, this helper
1818
aims to provide a nicer developer experience around the Bulk API.
1919

2020

@@ -52,7 +52,7 @@ console.log(result)
5252
// }
5353
----
5454

55-
To create a new instance of the Bulk helper, access it as shown in the example
55+
To create a new instance of the Bulk helper, access it as shown in the example
5656
above, the configuration options are:
5757
[cols=2*]
5858
|===
@@ -83,7 +83,7 @@ const b = client.helpers.bulk({
8383
return {
8484
index: { _index: 'my-index' }
8585
}
86-
}
86+
}
8787
})
8888
----
8989

@@ -94,7 +94,7 @@ a|A function that is called for everytime a document can't be indexed and it has
9494
const b = client.helpers.bulk({
9595
onDrop (doc) {
9696
console.log(doc)
97-
}
97+
}
9898
})
9999
----
100100

@@ -105,7 +105,7 @@ a|A function that is called for each successful operation in the bulk request, w
105105
const b = client.helpers.bulk({
106106
onSuccess ({ result, document }) {
107107
console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
108-
}
108+
}
109109
})
110110
----
111111

@@ -249,11 +249,11 @@ client.helpers.bulk({
249249
[discrete]
250250
==== Abort a bulk operation
251251

252-
If needed, you can abort a bulk operation at any time. The bulk helper returns a
252+
If needed, you can abort a bulk operation at any time. The bulk helper returns a
253253
https://promisesaplus.com/[thenable], which has an `abort` method.
254254

255-
NOTE: The abort method stops the execution of the bulk operation, but if you
256-
are using a concurrency higher than one, the operations that are already running
255+
NOTE: The abort method stops the execution of the bulk operation, but if you
256+
are using a concurrency higher than one, the operations that are already running
257257
will not be stopped.
258258

259259
[source,js]
@@ -275,7 +275,7 @@ const b = client.helpers.bulk({
275275
},
276276
onDrop (doc) {
277277
b.abort()
278-
}
278+
}
279279
})
280280
281281
console.log(await b)
@@ -285,8 +285,8 @@ console.log(await b)
285285
[discrete]
286286
==== Passing custom options to the Bulk API
287287

288-
You can pass any option supported by the link:
289-
{ref}/docs-bulk.html#docs-bulk-api-query-params[Bulk API] to the helper, and the
288+
You can pass any option supported by the link:
289+
{ref}/docs-bulk.html#docs-bulk-api-query-params[Bulk API] to the helper, and the
290290
helper uses those options in conjunction with the Bulk API call.
291291

292292
[source,js]
@@ -371,10 +371,10 @@ console.log(result)
371371

372372
~Added~ ~in~ ~`v7.8.0`~
373373

374-
If you send search requests at a high rate, this helper might be useful
375-
for you. It uses the multi search API under the hood to batch the requests
376-
and improve the overall performances of your application. The `result` exposes a
377-
`documents` property as well, which allows you to access directly the hits
374+
If you send search requests at a high rate, this helper might be useful
375+
for you. It uses the multi search API under the hood to batch the requests
376+
and improve the overall performances of your application. The `result` exposes a
377+
`documents` property as well, which allows you to access directly the hits
378378
sources.
379379

380380

@@ -399,7 +399,7 @@ m.search(
399399
.catch(err => console.error(err))
400400
----
401401

402-
To create a new instance of the multi search (msearch) helper, you should access
402+
To create a new instance of the multi search (msearch) helper, you should access
403403
it as shown in the example above, the configuration options are:
404404
[cols=2*]
405405
|===
@@ -459,18 +459,18 @@ const m = client.helpers.msearch({
459459
[discrete]
460460
==== Stopping the msearch helper
461461

462-
If needed, you can stop an msearch processor at any time. The msearch helper
462+
If needed, you can stop an msearch processor at any time. The msearch helper
463463
returns a https://promisesaplus.com/[thenable], which has a `stop` method.
464464

465-
If you are creating multiple msearch helpers instances and using them for a
466-
limited period of time, remember to always use the `stop` method once you have
465+
If you are creating multiple msearch helpers instances and using them for a
466+
limited period of time, remember to always use the `stop` method once you have
467467
finished using them, otherwise your application will start leaking memory.
468468

469-
The `stop` method accepts an optional error, that will be dispatched every
469+
The `stop` method accepts an optional error, that will be dispatched every
470470
subsequent search request.
471471

472-
NOTE: The stop method stops the execution of the msearch processor, but if
473-
you are using a concurrency higher than one, the operations that are already
472+
NOTE: The stop method stops the execution of the msearch processor, but if
473+
you are using a concurrency higher than one, the operations that are already
474474
running will not be stopped.
475475

476476
[source,js]
@@ -507,9 +507,9 @@ setImmediate(() => m.stop())
507507

508508
~Added~ ~in~ ~`v7.7.0`~
509509

510-
A simple wrapper around the search API. Instead of returning the entire `result`
511-
object it returns only the search documents source. For improving the
512-
performances, this helper automatically adds `filter_path=hits.hits._source` to
510+
A simple wrapper around the search API. Instead of returning the entire `result`
511+
object it returns only the search documents source. For improving the
512+
performances, this helper automatically adds `filter_path=hits.hits._source` to
513513
the query string.
514514

515515
[source,js]
@@ -535,10 +535,10 @@ for (const doc of documents) {
535535

536536
~Added~ ~in~ ~`v7.7.0`~
537537

538-
This helper offers a simple and intuitive way to use the scroll search API.
539-
Once called, it returns an
540-
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function[async iterator]
541-
which can be used in conjunction with a for-await...of. It handles automatically
538+
This helper offers a simple and intuitive way to use the scroll search API.
539+
Once called, it returns an
540+
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function[async iterator]
541+
which can be used in conjunction with a for-await...of. It handles automatically
542542
the `429` error and uses the `maxRetries` option of the client.
543543

544544
[source,js]
@@ -576,7 +576,7 @@ for await (const result of scrollSearch) {
576576
[discrete]
577577
==== Quickly getting the documents
578578

579-
If you only need the documents from the result of a scroll search, you can
579+
If you only need the documents from the result of a scroll search, you can
580580
access them via `result.documents`:
581581

582582
[source,js]
@@ -593,9 +593,9 @@ for await (const result of scrollSearch) {
593593

594594
~Added~ ~in~ ~`v7.7.0`~
595595

596-
It works in the same way as the scroll search helper, but it returns only the
597-
documents instead. Note, every loop cycle returns a single document, and you
598-
can't use the `clear` method. For improving the performances, this helper
596+
It works in the same way as the scroll search helper, but it returns only the
597+
documents instead. Note, every loop cycle returns a single document, and you
598+
can't use the `clear` method. For improving the performances, this helper
599599
automatically adds `filter_path=hits.hits._source` to the query string.
600600

601601
[source,js]
@@ -707,3 +707,40 @@ const result = await client.helpers
707707
.esql({ query: 'FROM sample_data | LIMIT 2' })
708708
.toRecords<EventLog>()
709709
----
710+
711+
[discrete]
712+
===== `toArrowReader`
713+
714+
~Added~ ~in~ ~`v8.16.0`~
715+
716+
ES|QL can return results in multiple binary formats, including https://arrow.apache.org/[Apache Arrow]'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets.
717+
718+
`toArrowReader` returns a https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.html[`RecordBatchStreamReader`].
719+
720+
[source,ts]
721+
----
722+
const reader = await client.helpers
723+
.esql({ query: 'FROM sample_data' })
724+
.toArrowReader()
725+
726+
for await (const recordBatch of reader) {
727+
// prints the first record in each record batch as JSON
728+
console.log(recordBatch.get(0).toJSON())
729+
}
730+
----
731+
732+
[discrete]
733+
===== `toArrowTable`
734+
735+
~Added~ ~in~ ~`v8.16.0`~
736+
737+
If you would like to pull the entire data set in Arrow format but without streaming, you can use the `toArrowTable` helper to get a https://arrow.apache.org/docs/js/classes/Arrow_dom.Table.html[Table] back instead.
738+
739+
[source,ts]
740+
----
741+
const table = await client.helpers
742+
.esql({ query: 'FROM sample_data' })
743+
.toArrowTable()
744+
745+
console.log(table.toArray())
746+
----
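
The `toArrowReader` documentation above says that streamed record batches make it possible to aggregate larger-than-memory data sets. The sketch below illustrates that idea without depending on Apache Arrow: `ColumnBatch`, `RunningStats`, `aggregateColumn`, and `fakeBatches` are hypothetical names invented for this example, and `ColumnBatch` is only a simplified stand-in for a real record batch. The point is that each batch is folded into a small running aggregate and can then be discarded.

```typescript
// Hypothetical stand-in for an Arrow record batch: one array of values per column.
interface ColumnBatch {
  numRows: number
  columns: Record<string, number[]>
}

// Running aggregate whose size does not grow with the number of batches.
interface RunningStats {
  count: number
  sum: number
}

// Consume batches one at a time and fold them into a running sum and count.
// `for await` accepts both async and sync iterables, so the same loop works
// whether the batches are streamed or already in memory.
async function aggregateColumn (
  batches: AsyncIterable<ColumnBatch> | Iterable<ColumnBatch>,
  column: string
): Promise<RunningStats> {
  const stats: RunningStats = { count: 0, sum: 0 }
  for await (const batch of batches) {
    const values = batch.columns[column] ?? []
    for (const value of values) {
      stats.count += 1
      stats.sum += value
    }
  }
  return stats
}

// Simulated stream (assumption): yields two small batches, as a server might send them.
async function * fakeBatches (): AsyncGenerator<ColumnBatch> {
  yield { numRows: 2, columns: { amount: [1, 2] } }
  yield { numRows: 1, columns: { amount: [3] } }
}
```

With a real reader, the inner loop would read values from the record batch instead of a plain array, but the memory profile is the same: only one batch plus the running totals is held at a time.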

package.json

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@
8787
"zx": "^7.2.2"
8888
},
8989
"dependencies": {
90-
"@elastic/transport": "^8.9.0",
90+
"@elastic/transport": "^8.9.1",
9191
"@apache-arrow/esnext-cjs": "^17.0.0",
9292
"tslib": "^2.4.0"
9393
},

src/helpers.ts

Lines changed: 17 additions & 3 deletions
@@ -25,7 +25,7 @@ import assert from 'node:assert'
2525
import * as timersPromises from 'node:timers/promises'
2626
import { Readable } from 'node:stream'
2727
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
28-
import { Table, TypeMap, tableFromIPC } from '@apache-arrow/esnext-cjs'
28+
import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from '@apache-arrow/esnext-cjs/Arrow.node'
2929
import Client from './client'
3030
import * as T from './api/types'
3131

@@ -156,7 +156,8 @@ export interface EsqlResponse {
156156

157157
export interface EsqlHelper {
158158
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
159-
toArrow: () => Promise<Table<TypeMap>>
159+
toArrowTable: () => Promise<Table<TypeMap>>
160+
toArrowReader: () => Promise<RecordBatchStreamReader>
160161
}
161162

162163
export interface EsqlToRecords<TDocument> {
@@ -1003,7 +1004,7 @@ export default class Helpers {
10031004
return { records, columns }
10041005
},
10051006

1006-
async toArrow (): Promise<Table<TypeMap>> {
1007+
async toArrowTable (): Promise<Table<TypeMap>> {
10071008
if (metaHeader !== null) {
10081009
reqOptions.headers = reqOptions.headers ?? {}
10091010
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
@@ -1013,6 +1014,19 @@ export default class Helpers {
10131014

10141015
const response = await client.esql.query(params, reqOptions)
10151016
return tableFromIPC(response)
1017+
},
1018+
1019+
async toArrowReader (): Promise<RecordBatchStreamReader> {
1020+
if (metaHeader !== null) {
1021+
reqOptions.headers = reqOptions.headers ?? {}
1022+
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
1023+
reqOptions.asStream = true
1024+
}
1025+
1026+
params.format = 'arrow'
1027+
1028+
const response = await client.esql.query(params, reqOptions)
1029+
return RecordBatchStreamReader.from(response)
10161030
}
10171031
}
10181032

test/unit/helpers/esql.test.ts

Lines changed: 64 additions & 3 deletions
@@ -111,7 +111,7 @@ test('ES|QL helper', t => {
111111
t.end()
112112
})
113113

114-
test('toArrow', t => {
114+
test('toArrowTable', t => {
115115
t.test('Parses a binary response into an Arrow table', async t => {
116116
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
117117

@@ -132,7 +132,7 @@ test('ES|QL helper', t => {
132132
Connection: MockConnection
133133
})
134134

135-
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrow()
135+
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
136136
t.ok(result instanceof Table)
137137

138138
const table = [...result]
@@ -165,7 +165,68 @@ test('ES|QL helper', t => {
165165
Connection: MockConnection
166166
})
167167

168-
await client.helpers.esql({ query: 'FROM sample_data' }).toArrow()
168+
await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
169+
t.end()
170+
})
171+
172+
t.end()
173+
})
174+
175+
test('toArrowReader', t => {
176+
t.test('Parses a binary response into an Arrow stream reader', async t => {
177+
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
178+
179+
const MockConnection = connection.buildMockConnection({
180+
onRequest (_params) {
181+
return {
182+
body: Buffer.from(binaryContent, 'base64'),
183+
statusCode: 200,
184+
headers: {
185+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
186+
}
187+
}
188+
}
189+
})
190+
191+
const client = new Client({
192+
node: 'http://localhost:9200',
193+
Connection: MockConnection
194+
})
195+
196+
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
197+
t.ok(result.isStream())
198+
199+
const recordBatch = result.next().value
200+
t.same(recordBatch.get(0)?.toJSON(), {
201+
amount: 4.900000095367432,
202+
date: 1729532586965,
203+
})
204+
t.end()
205+
})
206+
207+
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
208+
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
209+
210+
const MockConnection = connection.buildMockConnection({
211+
onRequest (params) {
212+
const header = params.headers?.['x-elastic-client-meta'] ?? ''
213+
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
214+
return {
215+
body: Buffer.from(binaryContent, 'base64'),
216+
statusCode: 200,
217+
headers: {
218+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
219+
}
220+
}
221+
}
222+
})
223+
224+
const client = new Client({
225+
node: 'http://localhost:9200',
226+
Connection: MockConnection
227+
})
228+
229+
await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
169230
t.end()
170231
})
171232
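
The tests above feed the helpers a base64 fixture that begins with `/////zAB`, which decodes to the bytes `FF FF FF FF 30 01 00 00`. In the Arrow IPC streaming format, each encapsulated message starts with a `0xFFFFFFFF` continuation marker followed by a little-endian 32-bit metadata length. The sketch below reads just that prefix; `readMessagePrefix` and `MessagePrefix` are hypothetical names for this example, and it is not a substitute for a real Arrow reader.

```typescript
// Result of inspecting the first 8 bytes of an Arrow IPC stream message.
interface MessagePrefix {
  hasContinuation: boolean
  metadataLength: number
}

// Read the continuation marker and metadata length from the start of a buffer.
// Throws if fewer than 8 bytes are available.
function readMessagePrefix (bytes: Uint8Array): MessagePrefix {
  if (bytes.byteLength < 8) {
    throw new RangeError('need at least 8 bytes to read a message prefix')
  }
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
  return {
    // 0xFFFFFFFF marks the start of an encapsulated message.
    hasContinuation: view.getUint32(0, true) === 0xffffffff,
    // Size of the metadata that follows, stored as a little-endian int32.
    metadataLength: view.getInt32(4, true)
  }
}

// First eight bytes of the fixture used in the tests above.
const fixturePrefix = new Uint8Array([0xff, 0xff, 0xff, 0xff, 0x30, 0x01, 0x00, 0x00])
const prefix = readMessagePrefix(fixturePrefix)
```

For the fixture bytes, `prefix.hasContinuation` is `true` and `prefix.metadataLength` is `304` (`0x130`).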
