Skip to content

Commit f62d73a

Browse files
authored
Merge branch 'main' into prototype-poison
2 parents 8e015b0 + 490538c commit f62d73a

File tree

4 files changed

+220
-7
lines changed

4 files changed

+220
-7
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
},
8282
"dependencies": {
8383
"@elastic/transport": "^8.9.1",
84+
"apache-arrow": "^18.0.0",
8485
"tslib": "^2.5.0"
8586
},
8687
"tap": {

src/helpers.ts

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import assert from 'node:assert'
2525
import * as timersPromises from 'node:timers/promises'
2626
import { Readable } from 'node:stream'
2727
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
28+
import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
2829
import Client from './client'
2930
import * as T from './api/types'
3031

@@ -154,6 +155,8 @@ export interface EsqlResponse {
154155

155156
export interface EsqlHelper {
156157
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
158+
toArrowTable: () => Promise<Table<TypeMap>>
159+
toArrowReader: () => Promise<RecordBatchStreamReader>
157160
}
158161

159162
export interface EsqlToRecords<TDocument> {
@@ -955,11 +958,6 @@ export default class Helpers {
955958
* @returns {object} EsqlHelper instance
956959
*/
957960
esql (params: T.EsqlQueryRequest, reqOptions: TransportRequestOptions = {}): EsqlHelper {
958-
if (this[kMetaHeader] !== null) {
959-
reqOptions.headers = reqOptions.headers ?? {}
960-
reqOptions.headers['x-elastic-client-meta'] = `${this[kMetaHeader] as string},h=qo`
961-
}
962-
963961
const client = this[kClient]
964962

965963
function toRecords<TDocument> (response: EsqlResponse): TDocument[] {
@@ -975,17 +973,50 @@ export default class Helpers {
975973
})
976974
}
977975

976+
const metaHeader = this[kMetaHeader]
977+
978978
const helper: EsqlHelper = {
979979
/**
980980
* Pivots ES|QL query results into an array of row objects, rather than the default format where each row is an array of values.
981981
*/
982982
async toRecords<TDocument>(): Promise<EsqlToRecords<TDocument>> {
983+
if (metaHeader !== null) {
984+
reqOptions.headers = reqOptions.headers ?? {}
985+
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qo`
986+
}
987+
983988
params.format = 'json'
989+
params.columnar = false
984990
// @ts-expect-error it's typed as ArrayBuffer but we know it will be JSON
985991
const response: EsqlResponse = await client.esql.query(params, reqOptions)
986992
const records: TDocument[] = toRecords(response)
987993
const { columns } = response
988994
return { records, columns }
995+
},
996+
997+
async toArrowTable (): Promise<Table<TypeMap>> {
998+
if (metaHeader !== null) {
999+
reqOptions.headers = reqOptions.headers ?? {}
1000+
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
1001+
}
1002+
1003+
params.format = 'arrow'
1004+
1005+
const response = await client.esql.query(params, reqOptions)
1006+
return tableFromIPC(response)
1007+
},
1008+
1009+
async toArrowReader (): Promise<RecordBatchStreamReader> {
1010+
if (metaHeader !== null) {
1011+
reqOptions.headers = reqOptions.headers ?? {}
1012+
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
1013+
reqOptions.asStream = true
1014+
}
1015+
1016+
params.format = 'arrow'
1017+
1018+
const response = await client.esql.query(params, reqOptions)
1019+
return RecordBatchStreamReader.from(response)
9891020
}
9901021
}
9911022

test/unit/helpers/esql.test.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
import { test } from 'tap'
21+
import * as arrow from 'apache-arrow'
2122
import { connection } from '../../utils'
2223
import { Client } from '../../../'
2324

@@ -109,5 +110,184 @@ test('ES|QL helper', t => {
109110

110111
t.end()
111112
})
113+
114+
test('toArrowTable', t => {
115+
t.test('Parses a binary response into an Arrow table', async t => {
116+
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
117+
118+
const MockConnection = connection.buildMockConnection({
119+
onRequest (_params) {
120+
return {
121+
body: Buffer.from(binaryContent, 'base64'),
122+
statusCode: 200,
123+
headers: {
124+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
125+
}
126+
}
127+
}
128+
})
129+
130+
const client = new Client({
131+
node: 'http://localhost:9200',
132+
Connection: MockConnection
133+
})
134+
135+
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
136+
t.ok(result instanceof arrow.Table)
137+
138+
const table = [...result]
139+
t.same(table[0], [
140+
["amount", 4.900000095367432],
141+
["date", 1729532586965],
142+
])
143+
t.end()
144+
})
145+
146+
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
147+
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
148+
149+
const MockConnection = connection.buildMockConnection({
150+
onRequest (params) {
151+
const header = params.headers?.['x-elastic-client-meta'] ?? ''
152+
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
153+
return {
154+
body: Buffer.from(binaryContent, 'base64'),
155+
statusCode: 200,
156+
headers: {
157+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
158+
}
159+
}
160+
}
161+
})
162+
163+
const client = new Client({
164+
node: 'http://localhost:9200',
165+
Connection: MockConnection
166+
})
167+
168+
await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
169+
t.end()
170+
})
171+
172+
t.end()
173+
})
174+
175+
test('toArrowReader', t => {
176+
t.test('Parses a binary response into an Arrow stream reader', async t => {
177+
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
178+
179+
const MockConnection = connection.buildMockConnection({
180+
onRequest (_params) {
181+
return {
182+
body: Buffer.from(binaryContent, 'base64'),
183+
statusCode: 200,
184+
headers: {
185+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
186+
}
187+
}
188+
}
189+
})
190+
191+
const client = new Client({
192+
node: 'http://localhost:9200',
193+
Connection: MockConnection
194+
})
195+
196+
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
197+
t.ok(result.isStream())
198+
199+
const recordBatch = result.next().value
200+
t.same(recordBatch.get(0)?.toJSON(), {
201+
amount: 4.900000095367432,
202+
date: 1729532586965,
203+
})
204+
t.end()
205+
})
206+
207+
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
208+
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
209+
210+
const MockConnection = connection.buildMockConnection({
211+
onRequest (params) {
212+
const header = params.headers?.['x-elastic-client-meta'] ?? ''
213+
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
214+
return {
215+
body: Buffer.from(binaryContent, 'base64'),
216+
statusCode: 200,
217+
headers: {
218+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
219+
}
220+
}
221+
}
222+
})
223+
224+
const client = new Client({
225+
node: 'http://localhost:9200',
226+
Connection: MockConnection
227+
})
228+
229+
await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
230+
t.end()
231+
})
232+
233+
t.test('multi-batch support', async t => {
234+
const intType = new arrow.Uint32
235+
const floatType = new arrow.Float32
236+
const schema = new arrow.Schema([
237+
arrow.Field.new('id', intType),
238+
arrow.Field.new('val', floatType)
239+
])
240+
241+
function getBatch(ids: number[], vals: number[]) {
242+
const id = arrow.makeData({ type: intType, data: ids })
243+
const val = arrow.makeData({ type: floatType, data: vals })
244+
return new arrow.RecordBatch({ id, val })
245+
}
246+
247+
const batch1 = getBatch([1, 2, 3], [0.1, 0.2, 0.3])
248+
const batch2 = getBatch([4, 5, 6], [0.4, 0.5, 0.6])
249+
const batch3 = getBatch([7, 8, 9], [0.7, 0.8, 0.9])
250+
251+
const table = new arrow.Table(schema, [
252+
new arrow.RecordBatch(schema, batch1.data),
253+
new arrow.RecordBatch(schema, batch2.data),
254+
new arrow.RecordBatch(schema, batch3.data),
255+
])
256+
257+
const MockConnection = connection.buildMockConnection({
258+
onRequest (_params) {
259+
return {
260+
body: Buffer.from(arrow.tableToIPC(table, "stream")),
261+
statusCode: 200,
262+
headers: {
263+
'content-type': 'application/vnd.elasticsearch+arrow+stream'
264+
}
265+
}
266+
}
267+
})
268+
269+
const client = new Client({
270+
node: 'http://localhost:9200',
271+
Connection: MockConnection
272+
})
273+
274+
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
275+
t.ok(result.isStream())
276+
277+
let counter = 0
278+
for (const batch of result) {
279+
for (const row of batch) {
280+
counter++
281+
const { id, val } = row.toJSON()
282+
t.equal(id, counter)
283+
// floating points are hard in JS
284+
t.equal((Math.round(val * 10) / 10).toFixed(1), (counter * 0.1).toFixed(1))
285+
}
286+
}
287+
t.end()
288+
})
289+
290+
t.end()
291+
})
112292
t.end()
113293
})

tsconfig.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"compilerOptions": {
3-
"target": "es2019",
3+
"target": "ES2019",
44
"module": "commonjs",
55
"moduleResolution": "node",
66
"declaration": true,
@@ -21,7 +21,8 @@
2121
"importHelpers": true,
2222
"outDir": "lib",
2323
"lib": [
24-
"esnext"
24+
"ES2019",
25+
"dom"
2526
]
2627
},
2728
"formatCodeOptions": {

0 commit comments

Comments
 (0)