Skip to content

Conversation

Xiao-zhen-Liu
Copy link
Contributor

@Xiao-zhen-Liu Xiao-zhen-Liu commented Jan 22, 2025

This PR adds a storage layer implementation on the Python side of Texera's codebase, mirroring the implementation of our Java-based storage layer.

Motivation

  • The primary motivation of having a storage layer in Python so that we can let Python UDF operators' ports write directly to result tables without needing to send the results back to Java.
  • In the future we will also use the Python storage layer for UDF logs and workflow runtime statistics.

Storage APIs

  • There are 3 abstract classes in Java's storage implementation:
    • ReadOnlyVirtualDocument for read-only tables
    • VirtualDocument for tables supporting both read and write operations.
    • BufferedItemWriter as a writer class of VirtualDocument
  • We mirror the implementation in Python, but keep only the APIs relevant to table storage (e.g., APIs related to dataset storage are not kept in Python.)

Iceberg Document

Following #3147, we add a table-storage implementation based on Apache Iceberg (pyiceberg), including IcebergDocument, IcebergTableWriter, IcebergCatalogInstance, and related util functions and tests.

Limitations of / TODOs for python implementation

pyiceberg is less mature than its java-based counterpart. As a result there are a few functionalities not supported in our current Python storage implementation.

Incremental Read

Incremental Read is not supported by pyiceberg. It will be supported in the future. Before then we will not include incremental read in our Python codebase (it is also not currently needed)

Concurrent writers

Iceberg uses optimistic concurrency control for concurrent writers. Java Iceberg natively supports retry with configurable retry parameters, using exponential backoff (without randomness). However pyiceberg does not currently support retry. We implemented an ad-hoc custom retry mechanism in IcebergTableWriter, using exponential random backoff based on the tenacity library. It has a good speed (~0.6s for 10 concurrent writers writing 20K tuples) and is faster than Java’s iceberg-native retry (~6 seconds for the same test). We may need to re-evaluate this custom implementation if pyiceberg supports retry natively in the future.

Iceberg Catalog

pyiceberg only supports SQL catalog (postgreSQL to be specific) and REST catalog for production. We use postgresql based SQL catalog in this implementation for the following reasons:

  • It supports local storage.
  • We tested that it is works with both Java and Python iceberg storage.
  • It is easier to set up for developers (compared to REST services).

PostgreSQL setup

Python storage layer requires a running postgreSQL service in the environment, and an empty database for iceberg to work.

  • A script to set up a new postgres database for Texera's iceberg storage has been added for CI tests.
  • The database will be used by pyiceberg to manage the catalog.
  • The logic to setup the database is added in GitHub CI config.
  • Java side can continue using Hadoop-based catalog for now until we add storage on operator ports for both Java and Python.
  • As the Python storage is not currently used by Python workers, no action is required for developers for now.

REST catalogs (feel free to skip this section)

I also explored 3 major REST catalog implementations (lakekeeper, polaris, and gravitino) and here are some observations:

  • REST catalogs are the trend primarily because different query engines (Spark, Flink, Snowflake, etc.) relying on iceberg need a central place to keep and manage the catalogs. Under the hood they all still use some database as their storage layer.
  • Most of them support / recommend cloud storage only in production and do not support local storage.
  • They are incubating projects and lack documentation. For example I find it very hard to set up authentication (as pyiceberg requires authentication to work with REST catalogs) using gravitino, and using them will add a lot more burden to our developers.
  • I have successfully made polaris work with our implementation after setting up auth, but somehow it was very very slow.
  • As postgres catalog is working, we will explore more about REST catalog in the future if have migrated to cloud storage and have scalability issues.

Storage configurations

A static class StorageConfigs is added to manage storage-related configurations. We do NOT read the configs from files. Instead we will let Java pass the configs to Python worker, and the config will be filled when initializing the worker. The storage config is hardcoded in CI tests.

Other items

VFSURIFactory and DocumentFactory are added in Python storage layer mirroring the Java implementations.

TODO for Java Storage

  • Add SQL catalog as another type of iceberg catalog

@Xiao-zhen-Liu Xiao-zhen-Liu self-assigned this Jan 29, 2025
@Xiao-zhen-Liu Xiao-zhen-Liu marked this pull request as ready for review January 30, 2025 05:52
Copy link
Contributor

@shengquan-ni shengquan-ni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm.

@Xiao-zhen-Liu Xiao-zhen-Liu merged commit 753c299 into master Jan 30, 2025
8 checks passed
@Xiao-zhen-Liu Xiao-zhen-Liu deleted the xiaozhen-add-python-storage branch January 30, 2025 18:33
Ma77Ball pushed a commit that referenced this pull request Apr 2, 2025
This PR adds a storage layer implementation on the Python side of
Texera's codebase, mirroring the implementation of our Java-based
storage layer.

## Motivation
- The primary motivation of having a storage layer in Python so that we
can let Python UDF operators' ports write directly to result tables
without needing to send the results back to Java.
- In the future we will also use the Python storage layer for UDF logs
and workflow runtime statistics.

## Storage APIs
- There are 3 abstract classes in Java's storage implementation:
  - `ReadOnlyVirtualDocument` for read-only tables
- `VirtualDocument` for tables supporting both read and write
operations.
  - `BufferedItemWriter` as a writer class of `VirtualDocument`
- We mirror the implementation in Python, but keep only the APIs
relevant to table storage (e.g., APIs related to dataset storage are not
kept in Python.)

## Iceberg Document
Following #3147, we add a table-storage implementation based on Apache
Iceberg (pyiceberg), including `IcebergDocument`, `IcebergTableWriter`,
`IcebergCatalogInstance`, and related util functions and tests.

### Limitations of / TODOs for python implementation 
pyiceberg is less mature than its java-based counterpart. As a result
there are a few functionalities not supported in our current Python
storage implementation.

#### Incremental Read
Incremental Read is not supported by pyiceberg. It will be supported [in
the future](apache/iceberg-python#533). Before
then we will not include incremental read in our Python codebase (it is
also not currently needed)
#### Concurrent writers
Iceberg uses optimistic concurrency control for concurrent writers. Java
Iceberg natively supports retry with configurable retry parameters,
using exponential backoff (without randomness). However pyiceberg does
not currently support retry. We implemented an ad-hoc custom retry
mechanism in `IcebergTableWriter`, using exponential random backoff
based on the [tenacity](https://tenacity.readthedocs.io/en/latest/)
library. It has a good speed (~0.6s for 10 concurrent writers writing
20K tuples) and is faster than Java’s iceberg-native retry (~6 seconds
for the same test). We may need to re-evaluate this custom
implementation if pyiceberg supports retry natively in the future.

## Iceberg Catalog
pyiceberg only supports SQL catalog (postgreSQL to be specific) and REST
catalog for production. We use postgresql based SQL catalog in this
implementation for the following reasons:
- It supports local storage.
- We tested that it is works with both Java and Python iceberg storage.
- It is easier to set up for developers (compared to REST services).

### PostgreSQL setup
Python storage layer requires a running postgreSQL service in the
environment, and an empty database for iceberg to work.
- **A script to set up a new postgres database for Texera's iceberg
storage has been added for CI tests.**
- The database will be used by pyiceberg to manage the catalog.
- The logic to setup the database is added in GitHub CI config.
- Java side can continue using Hadoop-based catalog for now until we add
storage on operator ports for both Java and Python.
- As the Python storage is not currently used by Python workers, no
action is required for developers for now.

### REST catalogs (feel free to skip this section)
I also explored 3 major REST catalog implementations
([lakekeeper](https://lakekeeper.io),
[polaris](https://polaris.apache.org), and
[gravitino](https://gravitino.apache.org)) and here are some
observations:
- REST catalogs are the trend primarily because different query engines
(Spark, Flink, Snowflake, etc.) relying on iceberg need a central place
to keep and manage the catalogs. Under the hood they all still use some
database as their storage layer.
- Most of them support / recommend cloud storage only in production and
do not support local storage.
- They are incubating projects and lack documentation. For example I
find it very hard to set up authentication (as pyiceberg requires
authentication to work with REST catalogs) using gravitino, and using
them will add a lot more burden to our developers.
- I have successfully made polaris work with our implementation after
setting up auth, but somehow it was very very slow.
- As postgres catalog is working, we will explore more about REST
catalog in the future if have migrated to cloud storage and have
scalability issues.

## Storage configurations

A static class `StorageConfigs` is added to manage storage-related
configurations. We do NOT read the configs from files. Instead we will
let Java pass the configs to Python worker, and the config will be
filled when initializing the worker. The storage config is hardcoded in
CI tests.

## Other items

`VFSURIFactory` and `DocumentFactory` are added in Python storage layer
mirroring the Java implementations.

## TODO for Java Storage
- Add SQL catalog as another type of iceberg catalog

---------

Co-authored-by: Jiadong Bai <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants