
Conversation

@swallez (Contributor) commented Mar 17, 2025

This PR adds support for bulk-ingesting data using the Apache Arrow format. Arrow is an efficient column-oriented data format that allows zero-copy reads and efficient processing. It is used in data science as a backend for the Python Pandas library, and in data analytics, where column orientation allows efficient processing (like we do in ES|QL).

We already have Arrow as an output format for ES|QL (/cc @nik9000); this adds Arrow as an input format.

It's a baby step: converting Arrow dataframes (i.e. column-oriented data) to documents (see below) is clearly suboptimal, even more so considering that Lucene and ES|QL are heavily column-oriented.

This PR should be considered a step towards a possible larger "columnization" of ES. It addresses only the outer layer (the API), with two major goals:

  • make users' lives easier by being a better ecosystem citizen: they can just send Arrow data from their existing tools.
  • improve performance (benchmarks still missing) by
    • using a binary format and removing the need for serialization on the client side
    • saving some parsing time on the server by creating bulk operations with CBOR buffers instead of parsing JSON text (some old benchmarks that I cannot find showed 2 to 4 times lower CPU and memory usage when parsing CBOR compared to JSON).

Architecture and implementation

This PR leverages the streaming incremental bulk ingestion added recently. Arrow decoding happens on new /_arrow/_bulk and /_arrow/{index}/_bulk endpoints. The request's content-type must be application/vnd.apache.arrow.stream.

(note: it was initially an additional mime-type accepted by the /_bulk endpoint but adding a dependency on Arrow and its own dependencies in the server module proved to be too difficult because of version conflicts – see details in the module-info in the libs/arrow module)

The Arrow IPC format splits a large data stream into smaller "record batches". The ArrowBulkIncrementalParser buffers these batches, and translates them into a stream of individual DocWriteRequest (Arrow columns are "rotated" into record operations).
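
For intuition, here is a minimal client-side Python sketch of that column-to-row rotation (illustrative only, using pyarrow; it is not the server-side Java parser):

import pyarrow

# One record batch with two columns.
batch = pyarrow.record_batch(
    [pyarrow.array(["a", "b"]), pyarrow.array([38, 31])],
    names=["_id", "age"],
)

# "Rotate" the columns into one dict per row; in the PR, each row becomes a DocWriteRequest.
for row in batch.to_pylist():
    doc_id = row.pop("_id")  # reserved column, see below
    print(doc_id, row)       # row is the document body, e.g. {'age': 38}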

Decoding Arrow dataframes into ES operations and documents

The bulk endpoint can receive arbitrary create/index/update/delete operations. Although the Arrow format still allows this (see below), it is better suited to applying the same operation to homogeneous data records, which leverages the column-oriented structure of Arrow dataframes.

The operation type is defined by the op_type request parameter and defaults to create, which is also the only operation available on data streams. For update operations, only partial updates are supported, not script updates.

All columns of the dataframe are converted to JSON, except for three optional columns (see the sketch after the list):

  • _id: defines the document id.
  • _index: defines the target index, if different from the default index set in the request path.
  • _bulk_action: an Arrow struct or map that defines the bulk action, using the same properties as "classic bulk". Allows for advanced use cases and heterogeneous operations.
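
For illustration, a dataframe using the reserved _id and _index columns could look like this (the message field is made up, and the fallback behaviour for a null _index is an assumption based on the description above):

import pyarrow

table = pyarrow.table(
    {
        "_id": pyarrow.array(["doc-1", "doc-2"]),
        # Presumably a null _index falls back to the default index from the request path.
        "_index": pyarrow.array(["logs-other", None]),
        "message": pyarrow.array(["hello", "world"]),  # regular data column
    }
)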

Arrow has a rich set of timestamp types while ES only handles UTC timestamps in milliseconds or nanoseconds. Arrow values in seconds are converted to milliseconds, and values in microseconds are converted to nanoseconds. Timestamps with a timezone are converted to UTC.
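
Summarized as a small sketch (the mapping below simply restates the rules above; the example column is illustrative):

import pyarrow

# Arrow timestamp unit -> how the value is sent to ES, per the rules above.
unit_to_es = {
    "s":  "milliseconds (value * 1000)",
    "ms": "milliseconds (as-is)",
    "us": "nanoseconds (value * 1000)",
    "ns": "nanoseconds (as-is)",
}

# A timezone-aware microsecond column; its values end up as UTC nanoseconds.
ts = pyarrow.array([1_700_000_000_000_000], type=pyarrow.timestamp("us", tz="Europe/Paris"))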

The documents created from Arrow dataframes are encoded in CBOR, which is more compact and more efficient to parse than JSON text. This reduces storage space and improves performance when documents are parsed in the indexing phase.
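
As a rough client-side illustration (assuming the third-party cbor2 package; the actual conversion happens on the server), CBOR encoding of the same document is noticeably smaller than its JSON text:

import json

import cbor2  # third-party package, used here only to compare sizes

doc = {"name": "Jessica", "age": 31, "active": True}
json_bytes = json.dumps(doc, separators=(",", ":")).encode("utf-8")
cbor_bytes = cbor2.dumps(doc)
print(len(json_bytes), len(cbor_bytes))  # CBOR is the smaller of the two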

Response format

The response is an Arrow table that only contains error information. Arrow bulk ingestion is expected to be used mainly for create and index operations, where success information (index, shard, etc.) is of no use and would only increase the response size. A fully successful request therefore returns an empty Arrow table.

The fields in the result table are item_no (the position of the item in the request), _id, _index, status, type and reason, matching the fields found in a JSON error response.

Applications wanting full response details can obtain the "classic" response by sending an Accept: application/json header.
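
A hedged sketch of both behaviours (endpoint and field names as described above; the helper name and placeholder payload are made up for illustration):

import pyarrow
from urllib import request

def print_bulk_errors(response_body: bytes) -> None:
    """Prints the error rows of an Arrow bulk response (an empty table means full success)."""
    errors = pyarrow.ipc.open_stream(response_body).read_all()
    for item in errors.to_pylist():
        # Fields as described above: item_no, _id, _index, status, type, reason.
        print(item["item_no"], item["_index"], item["_id"], item["status"], item["type"], item["reason"])

# To get the classic JSON bulk response instead, add an Accept header to the request:
req = request.Request(url="http://localhost:9200/_arrow/arrow-test/_bulk", data=b"<arrow ipc stream>")
req.add_header("Content-Type", "application/vnd.apache.arrow.stream")
req.add_header("Accept", "application/json")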

Code modules

Arrow is already used by ES|QL as an output format. This PR moves the Arrow dependencies to a new libs/arrow module to avoid duplication and provide some common utilities.

Doing so caused some dependency version conflicts for a few libraries. They were solved by using the latest of the versions used, and centralizing them in the main version catalog.

Todo

  • Docs.
  • Benchmarks.
  • Memory usage limit is hard-coded to 100 MiB per record batch (still allows for very large streams). Arrow has its own memory allocator, which we should link to ES memory management (how to do that?).
  • Implement missing vector types in ArrowToXContent.
  • More tests in ArrowToXContent to cover all data types.
  • Do we really need _bulk_action? It provides equivalence with the existing bulk, but requiring a single operation type would make further optimizations easier, such as directly passing column data to the indexing phase.

Possible future developments:

  • Instead of converting columns to rows (documents), and back again to columns to store data into Lucene, we could index columns directly and avoid creating the document (and use synthetic source).
  • Implement the OTel-Arrow format to improve OpenTelemetry data ingestion (/cc @felixbarny)

Example usage

This Python script creates an Arrow dataframe, bulk-ingests it, and then uses ES|QL to retrieve it in Arrow format, which is read back as a Pandas dataframe.

Make sure pyarrow, pandas and tabulate are installed, then run ./gradlew run -Dtests.es.xpack.security.enabled=false and launch the script.

import pyarrow
import urllib.error
from urllib import request

es_url = "http://localhost:9200/"

def main():
    ids = pyarrow.array(['a', 'b', 'c', 'd'])
    names = pyarrow.array(['Tom', 'Jessica', 'Krish', 'Jack'])
    ages = pyarrow.array([38, 31, 42, 53], type=pyarrow.int8())
    table = pyarrow.table([ids, names, ages], names=["_id", "name", "age"])

    bulk_response = bulk_arrow(table, "arrow-test").read_pandas()
    if len(bulk_response) == 0:
        print("Bulk operations were all successful!")
    else:
        print("Some bulk operations failed:")
        print(bulk_response.to_markdown())

    print()
    esql_response = esql_arrow("FROM arrow-test | keep *")
    print("ES|QL response:")
    print(esql_response.read_pandas().to_markdown())


def bulk_arrow(arrow_table, index):
    """Ingests an arrow table into Elasticsearch in the provided index"""

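    # Serialize the table to the Arrow IPC streaming format expected by the endpoint.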
    with pyarrow.BufferOutputStream() as sink:
        with pyarrow.ipc.new_stream(sink, arrow_table.schema) as writer:
            writer.write(arrow_table)
        buf = sink.getvalue().to_pybytes()

    req =  request.Request(url = es_url + "_arrow/" + index + "/_bulk?refresh=wait_for", data = buf)
    req.add_header("Content-Type", "application/vnd.apache.arrow.stream")

    try:
        response = request.urlopen(req)
    except urllib.error.HTTPError as e:
        print("Bulk request failed: ", e.status)
        print(e.read().decode())
        raise e

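    # The response body is itself an Arrow IPC stream that only lists failed items.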
    return pyarrow.ipc.open_stream(response.read())

def esql_arrow(query):
    """Runs an ES|QL query and returns the result as an arrow table."""

    data = '{"query": "' + query + '"}' # don't do that in prod
    req =  request.Request(url = es_url + "_query", data = data.encode("utf-8"))
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/vnd.apache.arrow.stream")

    response = request.urlopen(req)
    return pyarrow.ipc.open_stream(response.read())

if __name__ == "__main__":
    main()

Running it yields the following output (the name.keyword field was created because we didn't define an index mapping):

Bulk operations were all successful!
ES|QL response:
|    |   age | name    | name.keyword   |
|---:|------:|:--------|:---------------|
|  0 |    38 | Tom     | Tom            |
|  1 |    31 | Jessica | Jessica        |
|  2 |    42 | Krish   | Krish          |
|  3 |    53 | Jack    | Jack           |

@swallez swallez added >enhancement :Distributed Indexing/CRUD A catch all label for issues around indexing, updating and getting a doc by id. Not search. labels Mar 17, 2025
@swallez swallez requested review from a team as code owners March 17, 2025 16:28
@elasticsearchmachine elasticsearchmachine added v9.1.0 external-contributor Pull request authored by a developer outside the Elasticsearch team Team:Distributed Indexing Meta label for Distributed Indexing team labels Mar 17, 2025
@elasticsearchmachine (Collaborator)

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@elasticsearchmachine (Collaborator)

Hi @swallez, I've created a changelog YAML for you.

@smith (Contributor) commented Mar 17, 2025

> Arrow has a rich set of date/time types. They're all converted to milliseconds since the Epoch.

I think date_nanos would be better here, as that's what all OTel dates are.

@axw (Member) commented Apr 9, 2025

> (note: it was initially an additional mime-type accepted by the /_bulk endpoint but adding a dependency on Arrow and its own dependencies in the server module proved to be too difficult because of version conflicts – see details in the module-info in the libs/arrow module)

@swallez sorry if this is a stupid question: would it be possible to invert the dependency, and have the Arrow module inject a mime-type/parser into the server module?

@swallez (Contributor, Author) commented Apr 9, 2025

@axw that's actually an interesting suggestion! There's no infrastructure in place for that (i.e. it's not a plugin extension point), but certainly something that could be considered.

@swallez (Contributor, Author) commented Apr 12, 2025

@smith I updated the timestamp conversions from the four units in Arrow to the two units in ES:

  • seconds and milliseconds --> milliseconds
  • microseconds and nanoseconds --> nanoseconds

@rjernst (Member) commented Oct 15, 2025

> This PR leverages the streaming incremental bulk ingestion added recently. Arrow decoding happens on new /_arrow/_bulk and /_arrow/{index}/_bulk endpoints. The request's content-type must be application/vnd.apache.arrow.stream.
>
> (note: it was initially an additional mime-type accepted by the /_bulk endpoint but adding a dependency on Arrow and its own dependencies in the server module proved to be too difficult because of version conflicts – see details in the module-info in the libs/arrow module)

Having a new endpoint complicates maintaining the _bulk API. There would then be two different actions that need to be modified as changes are made to the bulk API. Supporting additional formats is great, but we need to fit this within the architecture of Elasticsearch.

We already have pluggability of xcontent formats. If arrow doesn't conform to that, we can consider abstracting bulk formats independently. But we need to think about how this will interact with other parts of bulk, for example how individual documents are separated, see #135506.

@swallez (Contributor, Author) commented Oct 20, 2025

> Having a new endpoint complicates maintaining the _bulk API.

Agree. I initially tried to implement this as an additional data type handled by the regular bulk endpoint, but ended up with a dependency version nightmare (Jackson, some Apache Commons libraries), and fixing it was beyond the scope of this PR (and my knowledge).

A way to alleviate that would be to extend the bulk system so that it accepts format plugins.

> We already have pluggability of xcontent formats [...] see #135506.

Arrow doesn't really fit in XContent: it's a dataframe/column-oriented format, where each row represents a single document in Elasticsearch. This PR has utilities to translate a single Arrow row into an XContent document, as this is what the ES ingestion pipeline (and bulk operations) expect.

#135506 is about improving the detection of document boundaries in a stream of documents. This isn't a concern for Arrow, since the Arrow IPC protocol already chunks large data streams into smaller column segments ("batches" in their terminology).
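
For illustration, here is what that chunking looks like from the client side with pyarrow (the max_chunksize value below is arbitrary and unrelated to the server-side 100 MiB limit mentioned in the todo list):

import pyarrow

table = pyarrow.table({"n": pyarrow.array(range(1_000_000))})

with pyarrow.BufferOutputStream() as sink:
    with pyarrow.ipc.new_stream(sink, table.schema) as writer:
        # The writer splits the table into record batches; the server can decode
        # and release each batch independently instead of buffering the whole stream.
        writer.write_table(table, max_chunksize=64 * 1024)
    buf = sink.getvalue()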
