diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 39c2011bf486..03642b0e5547 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -53,6 +53,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import java.nio.file.Files; @@ -356,6 +357,7 @@ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") + @DisabledIfSystemProperty(named = "python.version", matches = "3.6") public void testReadPkTable() throws Exception { Identifier identifier = identifier("mixed_test_pk_tablep_parquet"); Table table = catalog.getTable(identifier); diff --git a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java index 290193340a42..a431657b95e4 100644 --- a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java +++ b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java @@ -51,6 +51,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import java.nio.file.Files; @@ -260,19 +261,9 @@ public void testJavaWriteReadPkTableLance() throws Exception { @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") + @DisabledIfSystemProperty(named = "python.version", matches = "3.6") public void testReadPkTableLance() throws Exception { try { - // Known issue: Reading Python-written Lance files in Java causes JVM crash due to - // missing Tokio runtime. The error is: - // "there is no reactor running, must be called from the context of a Tokio 1.x runtime" - // - // This is a limitation of lance-core Java bindings. The Rust native library requires - // Tokio runtime for certain operations when reading files written by Python (which may - // use different encoding formats). Java-written files can be read successfully because - // they use synchronous APIs that don't require Tokio. - // - // Workaround: Try to "warm up" Tokio runtime by reading a Java-written file first. - // This may initialize the Tokio runtime if it's created on first use. try { Identifier warmupIdentifier = identifier("mixed_test_pk_tablej_lance"); try { diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh index 469ee56c9d95..d174b120ad4f 100755 --- a/paimon-python/dev/lint-python.sh +++ b/paimon-python/dev/lint-python.sh @@ -203,10 +203,6 @@ function mixed_check() { # Get Python version PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") echo "Detected Python version: $PYTHON_VERSION" - if [ "$PYTHON_VERSION" = "3.6" ]; then - print_function "STAGE" "mixed tests checks... [SKIPPED]" - return - fi print_function "STAGE" "mixed tests checks" # Path to the mixed tests script diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh index 337b694e0c7c..38a0ada6c228 100755 --- a/paimon-python/dev/run_mixed_tests.sh +++ b/paimon-python/dev/run_mixed_tests.sh @@ -138,11 +138,14 @@ run_java_read_test() { cd "$PROJECT_ROOT" + PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown") + echo "Detected Python version: $PYTHON_VERSION" + # Run Java test for parquet format in paimon-core echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read Parquet)..." echo "Note: Maven may download dependencies on first run, this may take a while..." local parquet_result=0 - if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true; then + if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}" else echo -e "${RED}✗ Java read parquet test failed${NC}" @@ -155,7 +158,7 @@ run_java_read_test() { echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..." echo "Note: Maven may download dependencies on first run, this may take a while..." local lance_result=0 - if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true; then + if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then echo -e "${GREEN}✓ Java read lance test completed successfully${NC}" else echo -e "${RED}✗ Java read lance test failed${NC}" diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py index 5313e8e18aaf..024f8b732bf3 100644 --- a/paimon-python/pypaimon/__init__.py +++ b/paimon-python/pypaimon/__init__.py @@ -15,6 +15,13 @@ # specific language governing permissions and limitations # under the License. +import sys +if sys.version_info[:2] == (3, 6): + try: + from pypaimon.manifest import fastavro_py36_compat # noqa: F401 + except ImportError: + pass + from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem from pypaimon.schema.schema import Schema diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py index 65b48d4d79b4..4b50145a36c5 100644 --- a/paimon-python/pypaimon/manifest/__init__.py +++ b/paimon-python/pypaimon/manifest/__init__.py @@ -15,3 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ + +# Apply fastavro Python 3.6 compatibility patch early, before any other +# manifest modules are imported that might use fastavro +import sys +if sys.version_info[:2] == (3, 6): + try: + from pypaimon.manifest import fastavro_py36_compat # noqa: F401 + except ImportError: + pass diff --git a/paimon-python/pypaimon/manifest/fastavro_py36_compat.py b/paimon-python/pypaimon/manifest/fastavro_py36_compat.py new file mode 100644 index 000000000000..f6f509fc0ec5 --- /dev/null +++ b/paimon-python/pypaimon/manifest/fastavro_py36_compat.py @@ -0,0 +1,77 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +""" +Provides compatibility patches for fastavro on Python 3.6, +specifically for handling zstd-compressed Avro files. + +The main issue addressed is: +- On Python 3.6, fastavro's zstd decompression may fail with: + "zstd.ZstdError: could not determine content size in frame header" + +This module patches fastavro's zstd handling to use a more compatible +decompression method that works on Python 3.6. +""" + +import sys + +_patch_applied = False + + +def _apply_zstd_patch(): + global _patch_applied + if _patch_applied or sys.version_info[:2] != (3, 6): + return + + try: + import fastavro._read as fastavro_read + import zstandard as zstd + from io import BytesIO + except (ImportError, AttributeError): + return + + def _fixed_zstandard_read_block(decoder): + from fastavro._read import read_long + + length = read_long(decoder) + + if hasattr(decoder, 'read_fixed'): + data = decoder.read_fixed(length) + elif hasattr(decoder, 'read'): + data = decoder.read(length) + else: + from fastavro._read import read_fixed + data = read_fixed(decoder, length) + + decompressor = zstd.ZstdDecompressor() + with decompressor.stream_reader(BytesIO(data)) as reader: + decompressed = reader.read() + return BytesIO(decompressed) + + if hasattr(fastavro_read, 'BLOCK_READERS'): + block_readers = fastavro_read.BLOCK_READERS + block_readers['zstandard'] = _fixed_zstandard_read_block + block_readers['zstd'] = _fixed_zstandard_read_block + _patch_applied = True + + +if sys.version_info[:2] == (3, 6): + try: + _apply_zstd_patch() + except ImportError: + pass diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index b88224784c4a..7a194574f5e5 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -17,6 +17,7 @@ ################################################################################ import os +import sys import unittest import pandas as pd @@ -25,6 +26,19 @@ from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.schema.schema import Schema +if sys.version_info[:2] == (3, 6): + from pypaimon.tests.py36.pyarrow_compat import table_sort_by +else: + def table_sort_by(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table: + return table.sort_by([(column_name, order)]) + + +def get_file_format_params(): + if sys.version_info[:2] == (3, 6): + return [('parquet',)] + else: + return [('parquet',), ('lance',)] + class JavaPyReadWriteTest(unittest.TestCase): @classmethod @@ -89,11 +103,13 @@ def test_read_append_table(self): res = table_read.to_pandas(table_scan.plan().splits()) print(res) - @parameterized.expand([ - ('parquet',), - ('lance',), - ]) + @parameterized.expand(get_file_format_params()) def test_py_write_read_pk_table(self, file_format): + if sys.version_info[:2] == (3, 6): + self.skipTest( + "Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). " + "Will be fixed in next PR." + ) pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -150,10 +166,7 @@ def test_py_write_read_pk_table(self, file_format): actual_names = set(initial_result['name'].tolist()) self.assertEqual(actual_names, expected_names) - @parameterized.expand([ - ('parquet',), - ('lance',), - ]) + @parameterized.expand(get_file_format_params()) def test_read_pk_table(self, file_format): # For parquet, read from Java-written table (no format suffix) # For lance, read from Java-written table (with format suffix) @@ -196,7 +209,7 @@ def test_pk_dv_read(self): read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() - actual = table_read.to_arrow(splits).sort_by('pt') + actual = table_sort_by(table_read.to_arrow(splits), 'pt') expected = pa.Table.from_pydict({ 'pt': [1, 2, 2], 'a': [10, 21, 22], @@ -219,7 +232,7 @@ def test_pk_dv_read_multi_batch(self): read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() - actual = table_read.to_arrow(splits).sort_by('pt') + actual = table_sort_by(table_read.to_arrow(splits), 'pt') expected = pa.Table.from_pydict({ 'pt': [1] * 9999, 'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930], @@ -242,7 +255,7 @@ def test_pk_dv_read_multi_batch_raw_convertable(self): read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() - actual = table_read.to_arrow(splits).sort_by('pt') + actual = table_sort_by(table_read.to_arrow(splits), 'pt') expected = pa.Table.from_pydict({ 'pt': [1] * 9999, 'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],