Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
},
"dependencies": {
"@elastic/transport": "^8.9.1",
"apache-arrow": "^18.0.0",
"tslib": "^2.5.0"
},
"tap": {
Expand Down
41 changes: 36 additions & 5 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import assert from 'node:assert'
import * as timersPromises from 'node:timers/promises'
import { Readable } from 'node:stream'
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
import Client from './client'
import * as T from './api/types'

Expand Down Expand Up @@ -154,6 +155,8 @@ export interface EsqlResponse {

export interface EsqlHelper {
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
toArrowTable: () => Promise<Table<TypeMap>>
toArrowReader: () => Promise<RecordBatchStreamReader>
}

export interface EsqlToRecords<TDocument> {
Expand Down Expand Up @@ -955,11 +958,6 @@ export default class Helpers {
* @returns {object} EsqlHelper instance
*/
esql (params: T.EsqlQueryRequest, reqOptions: TransportRequestOptions = {}): EsqlHelper {
if (this[kMetaHeader] !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${this[kMetaHeader] as string},h=qo`
}

const client = this[kClient]

function toRecords<TDocument> (response: EsqlResponse): TDocument[] {
Expand All @@ -975,17 +973,50 @@ export default class Helpers {
})
}

const metaHeader = this[kMetaHeader]

const helper: EsqlHelper = {
/**
* Pivots ES|QL query results into an array of row objects, rather than the default format where each row is an array of values.
*/
async toRecords<TDocument>(): Promise<EsqlToRecords<TDocument>> {
if (metaHeader !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qo`
}

params.format = 'json'
params.columnar = false
// @ts-expect-error it's typed as ArrayBuffer but we know it will be JSON
const response: EsqlResponse = await client.esql.query(params, reqOptions)
const records: TDocument[] = toRecords(response)
const { columns } = response
return { records, columns }
},

async toArrowTable (): Promise<Table<TypeMap>> {
if (metaHeader !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
}

params.format = 'arrow'

const response = await client.esql.query(params, reqOptions)
return tableFromIPC(response)
},

async toArrowReader (): Promise<RecordBatchStreamReader> {
if (metaHeader !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
reqOptions.asStream = true
}

params.format = 'arrow'

const response = await client.esql.query(params, reqOptions)
return RecordBatchStreamReader.from(response)
}
}

Expand Down
180 changes: 180 additions & 0 deletions test/unit/helpers/esql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

import { test } from 'tap'
import * as arrow from 'apache-arrow'
import { connection } from '../../utils'
import { Client } from '../../../'

Expand Down Expand Up @@ -109,5 +110,184 @@ test('ES|QL helper', t => {

t.end()
})

test('toArrowTable', t => {
t.test('Parses a binary response into an Arrow table', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
body: Buffer.from(binaryContent, 'base64'),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
t.ok(result instanceof arrow.Table)

const table = [...result]
t.same(table[0], [
["amount", 4.900000095367432],
["date", 1729532586965],
])
t.end()
})

t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

const MockConnection = connection.buildMockConnection({
onRequest (params) {
const header = params.headers?.['x-elastic-client-meta'] ?? ''
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
return {
body: Buffer.from(binaryContent, 'base64'),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
t.end()
})

t.end()
})

test('toArrowReader', t => {
t.test('Parses a binary response into an Arrow stream reader', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
body: Buffer.from(binaryContent, 'base64'),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
t.ok(result.isStream())

const recordBatch = result.next().value
t.same(recordBatch.get(0)?.toJSON(), {
amount: 4.900000095367432,
date: 1729532586965,
})
t.end()
})

t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

const MockConnection = connection.buildMockConnection({
onRequest (params) {
const header = params.headers?.['x-elastic-client-meta'] ?? ''
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
return {
body: Buffer.from(binaryContent, 'base64'),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
t.end()
})

t.test('multi-batch support', async t => {
const intType = new arrow.Uint32
const floatType = new arrow.Float32
const schema = new arrow.Schema([
arrow.Field.new('id', intType),
arrow.Field.new('val', floatType)
])

function getBatch(ids: number[], vals: number[]) {
const id = arrow.makeData({ type: intType, data: ids })
const val = arrow.makeData({ type: floatType, data: vals })
return new arrow.RecordBatch({ id, val })
}

const batch1 = getBatch([1, 2, 3], [0.1, 0.2, 0.3])
const batch2 = getBatch([4, 5, 6], [0.4, 0.5, 0.6])
const batch3 = getBatch([7, 8, 9], [0.7, 0.8, 0.9])

const table = new arrow.Table(schema, [
new arrow.RecordBatch(schema, batch1.data),
new arrow.RecordBatch(schema, batch2.data),
new arrow.RecordBatch(schema, batch3.data),
])

const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
body: Buffer.from(arrow.tableToIPC(table, "stream")),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
t.ok(result.isStream())

let counter = 0
for (const batch of result) {
for (const row of batch) {
counter++
const { id, val } = row.toJSON()
t.equal(id, counter)
// floating points are hard in JS
t.equal((Math.round(val * 10) / 10).toFixed(1), (counter * 0.1).toFixed(1))
}
}
t.end()
})

t.end()
})
t.end()
})
5 changes: 3 additions & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2019",
"target": "ES2019",
"module": "commonjs",
"moduleResolution": "node",
"declaration": true,
Expand All @@ -21,7 +21,8 @@
"importHelpers": true,
"outDir": "lib",
"lib": [
"esnext"
"ES2019",
"dom"
]
},
"formatCodeOptions": {
Expand Down