Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.MANIFEST_COMPRESSION;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.table.SimpleTableTestBase.getResult;
Expand Down Expand Up @@ -379,6 +380,39 @@ public void testReadPkTable() throws Exception {
"6, Beef, Meat, 8.0");
}

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testJavaWriteZstdManifestTable() throws Exception {
    // Creates a small three-row table whose manifest compression is explicitly set to
    // "zstd", commits it, and checks that the resulting snapshot exposes data splits.
    // Only runs when the system property run.e2e.tests is "true".
    Identifier tableId = identifier("zstd_manifest_test_table");

    Schema tableSchema =
            Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .column("name", DataTypes.STRING())
                    .column("value", DataTypes.DOUBLE())
                    .option(MANIFEST_COMPRESSION.key(), "zstd")
                    .build();

    // ignoreIfExists = true: re-running the test against an existing table is not an error.
    catalog.createTable(tableId, tableSchema, true);
    FileStoreTable zstdTable = (FileStoreTable) catalog.getTable(tableId);

    // The option must be visible through the table's resolved core options.
    assertThat(zstdTable.coreOptions().manifestCompression()).isEqualTo("zstd");

    // Values are kept as literals (not computed) so the stored doubles are exactly these.
    double[] values = {1.1, 2.2, 3.3};

    try (StreamTableWrite write = zstdTable.newWrite(commitUser);
            InnerTableCommit commit = zstdTable.newCommit(commitUser)) {

        for (int i = 0; i < values.length; i++) {
            int id = i + 1;
            write.write(GenericRow.of(id, BinaryString.fromString("test" + id), values[i]));
        }

        commit.commit(0, write.prepareCommit(true, 0));
    }

    assertThat(zstdTable.newSnapshotReader().read().dataSplits()).hasSizeGreaterThan(0);
}

// Helper method from TableTestBase
protected Identifier identifier(String tableName) {
return new Identifier(database, tableName);
Expand Down
4 changes: 0 additions & 4 deletions paimon-python/dev/lint-python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,6 @@ function mixed_check() {
# Get Python version
PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
echo "Detected Python version: $PYTHON_VERSION"
if [ "$PYTHON_VERSION" = "3.6" ]; then
print_function "STAGE" "mixed tests checks... [SKIPPED]"
return
fi
print_function "STAGE" "mixed tests checks"

# Path to the mixed tests script
Expand Down
88 changes: 82 additions & 6 deletions paimon-python/dev/run_mixed_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,77 @@ run_pk_dv_test() {
return 1
fi
}

# Function to run zstd manifest e2e test
run_zstd_manifest_test() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to create this; by default, the manifest is already compressed with zstd.

echo -e "${YELLOW}=== Step 6: Running Zstd Manifest Compression Test (Python 3.6 Compatibility) ===${NC}"

cd "$PROJECT_ROOT"

echo "Running Maven test for JavaPyE2ETest.testJavaWriteZstdManifestTable..."
local java_result=0
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteZstdManifestTable -pl paimon-core -Drun.e2e.tests=true; then
echo -e "${GREEN}✓ Java zstd manifest test completed successfully${NC}"
else
echo -e "${RED}✗ Java zstd manifest test failed${NC}"
java_result=1
fi

echo ""

cd "$PAIMON_PYTHON_DIR"
echo "Running Python test for JavaPyReadWriteTest.test_read_zstd_manifest_table..."
local python_result=0
if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read_zstd_manifest_table -v; then
echo -e "${GREEN}✓ Python zstd manifest test completed successfully${NC}"
else
echo -e "${RED}✗ Python zstd manifest test failed${NC}"
python_result=1
fi

if [[ $java_result -eq 0 && $python_result -eq 0 ]]; then
return 0
else
return 1
fi
}
# Main execution
main() {
PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")

# For Python 3.6, only run zstd manifest test
if [ "$PYTHON_VERSION" = "3.6" ]; then
echo -e "${YELLOW}Python 3.6 detected: Running only Zstd Manifest Test${NC}"
echo ""

local zstd_manifest_result=0
if ! run_zstd_manifest_test; then
zstd_manifest_result=1
fi

echo ""
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $zstd_manifest_result -eq 0 ]]; then
echo -e "${GREEN}✓ Zstd Manifest Test (Python 3.6 Compatibility): PASSED${NC}"
echo ""
cleanup_warehouse
echo -e "${GREEN}🎉 All tests passed!${NC}"
return 0
else
echo -e "${RED}✗ Zstd Manifest Test (Python 3.6 Compatibility): FAILED${NC}"
echo ""
cleanup_warehouse
return 1
fi
fi

# For other Python versions, run all tests
local java_write_result=0
local python_read_result=0
local python_write_result=0
local java_read_result=0
local pk_dv_result=0
local zstd_manifest_result=0

echo -e "${YELLOW}Starting mixed language test execution...${NC}"
echo ""
Expand Down Expand Up @@ -241,6 +305,12 @@ main() {
if ! run_pk_dv_test; then
pk_dv_result=1
fi

# Run zstd manifest test
if ! run_zstd_manifest_test; then
zstd_manifest_result=1
fi

echo ""
echo -e "${YELLOW}=== Test Results Summary ===${NC}"

Expand Down Expand Up @@ -268,18 +338,24 @@ main() {
echo -e "${RED}✗ Java Read Test (Parquet + Lance): FAILED${NC}"
fi

if [[ $pk_dv_result -eq 0 ]]; then
echo -e "${GREEN}✓ PK DV Test (JavaPyReadWriteTest.testPKDeletionVectorWriteRead): PASSED${NC}"
else
echo -e "${RED}✗ PK DV Test (JavaPyReadWriteTest.testPKDeletionVectorWriteRead): FAILED${NC}"
fi
if [[ $pk_dv_result -eq 0 ]]; then
echo -e "${GREEN}✓ PK DV Test (JavaPyReadWriteTest.testPKDeletionVectorWriteRead): PASSED${NC}"
else
echo -e "${RED}✗ PK DV Test (JavaPyReadWriteTest.testPKDeletionVectorWriteRead): FAILED${NC}"
fi

if [[ $zstd_manifest_result -eq 0 ]]; then
echo -e "${GREEN}✓ Zstd Manifest Test (Python 3.6 Compatibility): PASSED${NC}"
else
echo -e "${RED}✗ Zstd Manifest Test (Python 3.6 Compatibility): FAILED${NC}"
fi

echo ""

# Clean up warehouse directory after all tests
cleanup_warehouse

if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 ]]; then
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $zstd_manifest_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
return 0
else
Expand Down
7 changes: 7 additions & 0 deletions paimon-python/pypaimon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
# specific language governing permissions and limitations
# under the License.

import sys
# On Python 3.6 only, import the fastavro compatibility module for its
# import-time side effect. A failed import is ignored on purpose so that the
# package still loads when the module cannot be imported.
if sys.version_info[0] == 3 and sys.version_info[1] == 6:
    try:
        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
    except ImportError:
        pass

from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
Expand Down
9 changes: 9 additions & 0 deletions paimon-python/pypaimon/manifest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Apply fastavro Python 3.6 compatibility patch early, before any other
# manifest modules are imported that might use fastavro
import sys
# Python 3.6 only: pull in the compatibility module for its import-time side
# effect. An ImportError is swallowed deliberately so this package remains
# importable when the module cannot be loaded.
if sys.version_info[0] == 3 and sys.version_info[1] == 6:
    try:
        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
    except ImportError:
        pass
77 changes: 77 additions & 0 deletions paimon-python/pypaimon/manifest/fastavro_py36_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Provides compatibility patches for fastavro on Python 3.6,
specifically for handling zstd-compressed Avro files.

The main issue addressed is:
- On Python 3.6, fastavro's zstd decompression may fail with:
"zstd.ZstdError: could not determine content size in frame header"

This module patches fastavro's zstd handling to use a more compatible
decompression method that works on Python 3.6.
"""

import sys

_patch_applied = False


def _apply_zstd_patch():
    """Replace fastavro's zstd block readers with a stream-based variant.

    The function is a no-op unless the interpreter is Python 3.6 and the
    patch has not been applied before. It also returns silently when
    ``fastavro._read`` or ``zstandard`` cannot be imported, or when
    ``fastavro._read`` has no ``BLOCK_READERS`` attribute. The module-level
    ``_patch_applied`` flag is set only after the readers were replaced.
    """
    global _patch_applied

    if sys.version_info[:2] != (3, 6):
        return
    if _patch_applied:
        return

    try:
        import fastavro._read as fastavro_read
        import zstandard as zstd
        from io import BytesIO
    except (ImportError, AttributeError):
        # Best effort: without these modules there is nothing to patch.
        return

    def _fixed_zstandard_read_block(decoder):
        # A block is read as a long length prefix followed by that many
        # bytes of compressed payload.
        from fastavro._read import read_long

        size = read_long(decoder)

        # Fetch the payload through whichever interface the decoder offers.
        if hasattr(decoder, 'read_fixed'):
            compressed = decoder.read_fixed(size)
        elif hasattr(decoder, 'read'):
            compressed = decoder.read(size)
        else:
            from fastavro._read import read_fixed
            compressed = read_fixed(decoder, size)

        # NOTE(review): stream_reader is presumably used because it does not
        # need the content size from the frame header - confirm against the
        # zstandard documentation.
        with zstd.ZstdDecompressor().stream_reader(BytesIO(compressed)) as stream:
            return BytesIO(stream.read())

    if not hasattr(fastavro_read, 'BLOCK_READERS'):
        return

    readers = fastavro_read.BLOCK_READERS
    # Register the replacement under both codec names.
    for codec in ('zstandard', 'zstd'):
        readers[codec] = _fixed_zstandard_read_block
    _patch_applied = True


# Apply the patch as a side effect of importing this module, on Python 3.6
# only. An ImportError raised while patching is ignored so that importing
# this module never fails because of the patch.
if sys.version_info[0] == 3 and sys.version_info[1] == 6:
    try:
        _apply_zstd_patch()
    except ImportError:
        pass
24 changes: 24 additions & 0 deletions paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,27 @@ def test_pk_dv_read_multi_batch_raw_convertable(self):
'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
}, schema=pa_schema)
self.assertEqual(expected, actual)

def test_read_zstd_manifest_table(self):
    """Read 'default.zstd_manifest_test_table' and check its three rows.

    The table is looked up in ``self.catalog``; if the lookup raises, the
    test fails with a hint to run the Java writer test first. The table is
    then scanned into pandas and must contain exactly three rows whose
    ``id`` values are 1, 2 and 3.
    """
    table_name = 'default.zstd_manifest_test_table'

    try:
        table = self.catalog.get_table(table_name)
    except Exception as e:
        self.fail(
            f"Failed to get table {table_name}. "
            f"Make sure Java test (JavaPyE2ETest.testJavaWriteZstdManifestTable) "
            f"has been run first. Error: {e}"
        )

    builder = table.new_read_builder()
    scan = builder.new_scan()
    reader = builder.new_read()

    # Plan the scan, then materialise every split into one DataFrame.
    frame = reader.to_pandas(scan.plan().splits())

    self.assertEqual(len(frame), 3)
    self.assertEqual(set(frame['id'].tolist()), {1, 2, 3})
Loading