Commit 44d2b0a

Xiao-zhen-Liu authored and bobbai00 committed
Add Storage Layer In Python (#3224)
This PR adds a storage layer implementation on the Python side of Texera's codebase, mirroring the implementation of our Java-based storage layer.

## Motivation

- The primary motivation for having a storage layer in Python is to let Python UDF operators' ports write directly to result tables without needing to send the results back to Java.
- In the future we will also use the Python storage layer for UDF logs and workflow runtime statistics.

## Storage APIs

There are 3 abstract classes in Java's storage implementation:

- `ReadOnlyVirtualDocument` for read-only tables.
- `VirtualDocument` for tables supporting both read and write operations.
- `BufferedItemWriter` as the writer class of `VirtualDocument`.

We mirror the implementation in Python, but keep only the APIs relevant to table storage (e.g., APIs related to dataset storage are not kept in Python). A hedged sketch of this mirrored API surface appears at the end of this section.

## Iceberg Document

Following #3147, we add a table-storage implementation based on Apache Iceberg (pyiceberg), including `IcebergDocument`, `IcebergTableWriter`, `IcebergCatalogInstance`, and related util functions and tests.

### Limitations of / TODOs for the Python implementation

pyiceberg is less mature than its Java-based counterpart. As a result, a few functionalities are not supported in our current Python storage implementation.

#### Incremental Read

Incremental read is not supported by pyiceberg. It will be supported [in the future](apache/iceberg-python#533). Until then we will not include incremental read in our Python codebase (it is also not currently needed).

#### Concurrent writers

Iceberg uses optimistic concurrency control for concurrent writers. Java Iceberg natively supports retry with configurable retry parameters, using exponential backoff (without randomness). However, pyiceberg does not currently support retry. We implemented an ad-hoc custom retry mechanism in `IcebergTableWriter`, using exponential random backoff based on the [tenacity](https://tenacity.readthedocs.io/en/latest/) library; a sketch of this retry pattern likewise appears at the end of this section. It performs well (~0.6 s for 10 concurrent writers writing 20K tuples) and is faster than Java's iceberg-native retry (~6 s for the same test). We may need to re-evaluate this custom implementation if pyiceberg supports retry natively in the future.

## Iceberg Catalog

pyiceberg only supports the SQL catalog (PostgreSQL, specifically) and REST catalogs for production use. We use the PostgreSQL-based SQL catalog in this implementation for the following reasons:

- It supports local storage.
- We tested that it works with both the Java and Python Iceberg storage.
- It is easier to set up for developers (compared to REST services).

### PostgreSQL setup

The Python storage layer requires a running PostgreSQL service in the environment and an empty database for Iceberg to work.

- **A script to set up a new postgres database for Texera's iceberg storage has been added for CI tests.**
- The database will be used by pyiceberg to manage the catalog.
- The logic to set up the database is added in the GitHub CI config.
- The Java side can continue using the Hadoop-based catalog for now, until we add storage on operator ports for both Java and Python.
- As the Python storage is not currently used by Python workers, no action is required from developers for now.
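To make the mirrored API surface in the Storage APIs section concrete, here is a hedged sketch of the three abstract classes. The class names come from the description above, but every method name and signature below is an assumption for illustration, not the PR's actual interface:

```python
# Hedged sketch of the mirrored storage API surface; method names and
# signatures are assumptions based on the Java description, not the PR code.
from abc import ABC, abstractmethod
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")


class ReadOnlyVirtualDocument(ABC, Generic[T]):
    """A read-only table."""

    @abstractmethod
    def get(self) -> Iterator[T]:
        """Iterate over all items in the table."""


class BufferedItemWriter(ABC, Generic[T]):
    """A buffered writer attached to a VirtualDocument."""

    @abstractmethod
    def put_one(self, item: T) -> None:
        """Buffer one item for writing."""

    @abstractmethod
    def close(self) -> None:
        """Flush remaining buffered items and commit."""


class VirtualDocument(ReadOnlyVirtualDocument[T]):
    """A table supporting both read and write operations."""

    @abstractmethod
    def writer(self, writer_id: str) -> BufferedItemWriter[T]:
        """Create a BufferedItemWriter for this document."""

    @abstractmethod
    def clear(self) -> None:
        """Remove all items from the table."""
```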
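The Concurrent writers section above describes an ad-hoc retry in `IcebergTableWriter`. Below is a minimal sketch of that pattern using tenacity; the wrapper class, method names, and retry parameters are illustrative assumptions, not the PR's code:

```python
# Minimal sketch (not the PR's code) of retrying an Iceberg commit with
# tenacity's exponential random backoff. Retry parameters are illustrative.
import pyarrow as pa
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.table import Table
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)


class RetryingWriter:
    def __init__(self, table: Table):
        self._table = table

    # Re-invoke _append after a randomized exponential backoff whenever a
    # concurrent writer wins the optimistic-concurrency commit race.
    @retry(
        retry=retry_if_exception_type(CommitFailedException),
        stop=stop_after_attempt(10),          # illustrative bound
        wait=wait_random_exponential(multiplier=0.05, max=2.0),
        reraise=True,
    )
    def _append(self, buffer: pa.Table) -> None:
        # table.append() raises CommitFailedException when another writer
        # committed first; tenacity retries this call after backing off.
        self._table.append(buffer)
```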
### REST catalogs (feel free to skip this section)

I also explored 3 major REST catalog implementations ([lakekeeper](https://lakekeeper.io), [polaris](https://polaris.apache.org), and [gravitino](https://gravitino.apache.org)), and here are some observations:

- REST catalogs are the trend primarily because different query engines (Spark, Flink, Snowflake, etc.) relying on Iceberg need a central place to keep and manage the catalogs. Under the hood they all still use some database as their storage layer.
- Most of them support or recommend only cloud storage in production and do not support local storage.
- They are incubating projects and lack documentation. For example, I found it very hard to set up authentication using gravitino (pyiceberg requires authentication to work with REST catalogs), and using them would add a lot more burden for our developers.
- I successfully made polaris work with our implementation after setting up auth, but somehow it was very slow.
- As the postgres catalog is working, we will explore REST catalogs further in the future if we have migrated to cloud storage and run into scalability issues.

## Storage configurations

A static class `StorageConfig` is added to manage storage-related configurations. We do NOT read the configs from files. Instead, we let Java pass the configs to the Python worker, and the config will be filled when initializing the worker (a sketch of this pattern is shown at the end of this description). The storage config is hardcoded in CI tests.

## Other items

`VFSURIFactory` and `DocumentFactory` are added to the Python storage layer, mirroring the Java implementations.

## TODO for Java Storage

- Add SQL catalog as another type of Iceberg catalog

---------

Co-authored-by: Jiadong Bai <[email protected]>
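As a concrete illustration of the storage-configurations design above, here is a minimal sketch of a static config holder filled once at worker initialization. The field names follow the `StorageConfig` references elsewhere in this diff, while the `initialize()` method and its signature are assumptions:

```python
# Sketch of a static config holder populated once at worker startup with
# values passed from Java; nothing is read from config files on the Python
# side. The initialize() signature is an assumption, not the PR's API.
class StorageConfig:
    ICEBERG_TABLE_NAMESPACE: str = ""
    ICEBERG_FILE_STORAGE_DIRECTORY_PATH: str = ""
    ICEBERG_POSTGRES_CATALOG_USERNAME: str = ""
    ICEBERG_POSTGRES_CATALOG_PASSWORD: str = ""
    _initialized: bool = False

    @classmethod
    def initialize(
        cls, namespace: str, directory: str, username: str, password: str
    ) -> None:
        # Called exactly once while the Python worker is being initialized,
        # with config values sent over from the Java side.
        if cls._initialized:
            raise RuntimeError("storage config is already initialized")
        cls.ICEBERG_TABLE_NAMESPACE = namespace
        cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory
        cls.ICEBERG_POSTGRES_CATALOG_USERNAME = username
        cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = password
        cls._initialized = True
```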
1 parent 60d9d3e commit 44d2b0a

19 files changed: +1328 -3 lines

.github/workflows/github-action-build.yml

Lines changed: 7 additions & 0 deletions
@@ -111,6 +111,13 @@ jobs:
           if [ -f core/amber/requirements.txt ]; then pip install -r core/amber/requirements.txt; fi
           if [ -f core/amber/r-requirements.txt ]; then pip install -r core/amber/r-requirements.txt; fi
           if [ -f core/amber/operator-requirements.txt ]; then pip install -r core/amber/operator-requirements.txt; fi
+      - name: Install PostgreSQL
+        run: sudo apt-get update && sudo apt-get install -y postgresql
+      - name: Start PostgreSQL Service
+        run: sudo systemctl start postgresql
+      - name: Create Database and User
+        run: |
+          cd core/scripts/sql && sudo -u postgres psql -f iceberg_postgres_catalog.sql
       - name: Lint with flake8 and black
         run: |
           cd core/amber/src/main/python && flake8 && black . --check

core/amber/requirements.txt

Lines changed: 6 additions & 1 deletion
@@ -26,4 +26,9 @@ bidict==0.22.0
 cached_property==1.5.2
 psutil==5.9.0
 transformers==4.44.2
-tzlocal==2.1
+tzlocal==2.1
+pyiceberg==0.8.1
+readerwriterlock==1.0.9
+tenacity==8.5.0
+SQLAlchemy==2.0.37
+psycopg2==2.9.10

core/amber/src/main/python/core/models/schema/attribute_type.py

Lines changed: 3 additions & 1 deletion
@@ -42,16 +42,18 @@ class AttributeType(Enum):
     AttributeType.DOUBLE: pa.float64(),
     AttributeType.BOOL: pa.bool_(),
     AttributeType.BINARY: pa.binary(),
-    AttributeType.TIMESTAMP: pa.timestamp("ms", tz="UTC"),
+    AttributeType.TIMESTAMP: pa.timestamp("us"),
 }

 FROM_ARROW_MAPPING = {
     lib.Type_INT32: AttributeType.INT,
     lib.Type_INT64: AttributeType.LONG,
     lib.Type_STRING: AttributeType.STRING,
+    lib.Type_LARGE_STRING: AttributeType.STRING,
     lib.Type_DOUBLE: AttributeType.DOUBLE,
     lib.Type_BOOL: AttributeType.BOOL,
     lib.Type_BINARY: AttributeType.BINARY,
+    lib.Type_LARGE_BINARY: AttributeType.BINARY,
     lib.Type_TIMESTAMP: AttributeType.TIMESTAMP,
 }

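A note on the `pa.timestamp("ms", tz="UTC")` to `pa.timestamp("us")` change above (and in `test_schema.py` below): Iceberg's `timestamp` type is timezone-free with microsecond precision, so the Arrow schema has to match for pyiceberg to accept it. A small illustration, not code from the PR:

```python
# Illustration only (not from the PR): pyarrow timestamps must be
# microsecond-precision and timezone-free to line up with Iceberg's
# `timestamp` type.
from datetime import datetime

import pyarrow as pa

arrow_type = pa.timestamp("us")  # microsecond precision, no timezone
col = pa.array([datetime(2025, 1, 1, 12, 0, 0, 123456)], type=arrow_type)
print(col.type)  # timestamp[us]
print(col[0])    # 2025-01-01 12:00:00.123456
```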
core/amber/src/main/python/core/models/schema/test_schema.py

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ def arrow_schema(self):
                 pa.field("field-3", pa.int64()),
                 pa.field("field-4", pa.float64()),
                 pa.field("field-5", pa.bool_()),
-                pa.field("field-6", pa.timestamp("ms", tz="UTC")),
+                pa.field("field-6", pa.timestamp("us")),
                 pa.field("field-7", pa.binary()),
             ]
         )

core/amber/src/main/python/core/storage/__init__.py

Whitespace-only changes.
core/amber/src/main/python/core/storage/document_factory.py

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+from urllib.parse import urlparse
+
+from typing import Optional
+
+from core.models import Schema, Tuple
+from core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance
+from core.storage.iceberg.iceberg_document import IcebergDocument
+from core.storage.iceberg.iceberg_utils import (
+    create_table,
+    amber_tuples_to_arrow_table,
+    arrow_table_to_amber_tuples,
+    load_table_metadata,
+)
+from core.storage.model.virtual_document import VirtualDocument
+from core.storage.storage_config import StorageConfig
+from core.storage.vfs_uri_factory import VFSURIFactory, VFSResourceType
+
+
+class DocumentFactory:
+    """
+    Factory class to create and open documents.
+    Currently only iceberg documents are supported.
+    """
+
+    ICEBERG = "iceberg"
+
+    @staticmethod
+    def sanitize_uri_path(uri):
+        return uri.path.lstrip("/").replace("/", "_")
+
+    @staticmethod
+    def create_document(uri: str, schema: Schema) -> VirtualDocument:
+        parsed_uri = urlparse(uri)
+        if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
+            _, _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+
+            if resource_type in {
+                VFSResourceType.RESULT,
+                VFSResourceType.MATERIALIZED_RESULT,
+            }:
+                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+                iceberg_schema = Schema.as_arrow_schema(schema)
+
+                create_table(
+                    IcebergCatalogInstance.get_instance(),
+                    StorageConfig.ICEBERG_TABLE_NAMESPACE,
+                    storage_key,
+                    iceberg_schema,
+                    override_if_exists=True,
+                )
+
+                return IcebergDocument[Tuple](
+                    StorageConfig.ICEBERG_TABLE_NAMESPACE,
+                    storage_key,
+                    iceberg_schema,
+                    amber_tuples_to_arrow_table,
+                    arrow_table_to_amber_tuples,
+                )
+            else:
+                raise ValueError(f"Resource type {resource_type} is not supported")
+        else:
+            raise NotImplementedError(
+                f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
+            )
+
+    @staticmethod
+    def open_document(uri: str) -> (VirtualDocument, Optional[Schema]):
+        parsed_uri = urlparse(uri)
+        if parsed_uri.scheme == "vfs":
+            _, _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+
+            if resource_type in {
+                VFSResourceType.RESULT,
+                VFSResourceType.MATERIALIZED_RESULT,
+            }:
+                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+                table = load_table_metadata(
+                    IcebergCatalogInstance.get_instance(),
+                    StorageConfig.ICEBERG_TABLE_NAMESPACE,
+                    storage_key,
+                )
+
+                if table is None:
+                    raise ValueError("No storage is found for the given URI")
+
+                amber_schema = Schema(table.schema().as_arrow())
+
+                document = IcebergDocument(
+                    StorageConfig.ICEBERG_TABLE_NAMESPACE,
+                    storage_key,
+                    table.schema(),
+                    amber_tuples_to_arrow_table,
+                    arrow_table_to_amber_tuples,
+                )
+                return document, amber_schema
+            else:
+                raise ValueError(f"Resource type {resource_type} is not supported")
+        else:
+            raise NotImplementedError(
+                f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
+            )

core/amber/src/main/python/core/storage/iceberg/__init__.py

Whitespace-only changes.
core/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+from pyiceberg.catalog import Catalog
+from typing import Optional
+
+from core.storage.iceberg.iceberg_utils import create_postgres_catalog
+from core.storage.storage_config import StorageConfig
+
+
+class IcebergCatalogInstance:
+    """
+    IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
+    Currently only postgres SQL catalog is supported.
+    - Provides a single shared catalog for all Iceberg table-related operations.
+    - Lazily initializes the catalog on first access.
+    - Supports replacing the catalog instance for testing or reconfiguration.
+    """
+
+    _instance: Optional[Catalog] = None
+
+    @classmethod
+    def get_instance(cls):
+        """
+        Retrieves the singleton Iceberg catalog instance.
+        - If the catalog is not initialized, it is lazily created using the configured
+          properties.
+        :return: the Iceberg catalog instance.
+        """
+        if cls._instance is None:
+            cls._instance = create_postgres_catalog(
+                "texera_iceberg",
+                StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
+                StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
+                StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
+            )
+        return cls._instance
+
+    @classmethod
+    def replace_instance(cls, catalog: Catalog):
+        """
+        Replaces the existing Iceberg catalog instance.
+        - This method is useful for testing or dynamically updating the catalog.
+        :param catalog: the new Iceberg catalog instance to replace the current one.
+        """
+        cls._instance = catalog

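The docstring above notes that `replace_instance` exists for testing or reconfiguration. One possible test usage, sketched as an assumption rather than code from the PR, is to swap in pyiceberg's `SqlCatalog` backed by an in-memory SQLite database so tests do not need a running PostgreSQL; the catalog name and warehouse path below are arbitrary:

```python
# Illustration (not from the PR): replace the shared catalog with an
# in-memory SQLite-backed SqlCatalog for tests.
from pyiceberg.catalog.sql import SqlCatalog

test_catalog = SqlCatalog(
    "texera_iceberg_test",
    uri="sqlite:///:memory:",                     # catalog metadata in memory
    warehouse="file:///tmp/texera-iceberg-test",  # table data on local disk
)
IcebergCatalogInstance.replace_instance(test_catalog)
assert IcebergCatalogInstance.get_instance() is test_catalog
```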