org.apache.paimon.JavaPyE2ETest (paimon-core)
@@ -53,6 +53,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.nio.file.Files;
@@ -356,6 +357,7 @@ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
@DisabledIfSystemProperty(named = "python.version", matches = "3.6")
Review comment (Contributor):
Why disable for 3.6?

Reply from @XiaoHongbo-Hope (Contributor, Author), Jan 9, 2026:
> Why disable for 3.6?

Compatibility issue in the PK table for py36; will create another PR to fix it.

public void testReadPkTable() throws Exception {
Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
Table table = catalog.getTable(identifier);
org.apache.paimon.JavaPyLanceE2ETest (paimon-lance)
@@ -51,6 +51,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.nio.file.Files;
@@ -260,19 +261,9 @@ public void testJavaWriteReadPkTableLance() throws Exception {

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
@DisabledIfSystemProperty(named = "python.version", matches = "3.6")
Review comment (Contributor):
Why disable for 3.6?

Reply from @XiaoHongbo-Hope (Contributor, Author), Jan 9, 2026:
> Why disable for 3.6?

py36 does not have the pylance dependency.
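As an illustration of that constraint, a minimal Python-side sketch that gates the lance format on whether pylance can actually be imported, rather than on the interpreter version (assumptions: `lance` is the import name of the pylance package, and FILE_FORMAT_PARAMS is a hypothetical name; the PR itself uses the version-checked get_file_format_params() shown later in this diff):

# Hypothetical sketch: pick test file formats by probing for pylance instead of
# checking sys.version_info; environments without pylance (such as Python 3.6)
# then exercise only the parquet path.
try:
    import lance  # noqa: F401  # module provided by the 'pylance' package
    FILE_FORMAT_PARAMS = [('parquet',), ('lance',)]
except ImportError:
    FILE_FORMAT_PARAMS = [('parquet',)]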

public void testReadPkTableLance() throws Exception {
try {
// Known issue: Reading Python-written Lance files in Java causes JVM crash due to
// missing Tokio runtime. The error is:
// "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
//
// This is a limitation of lance-core Java bindings. The Rust native library requires
// Tokio runtime for certain operations when reading files written by Python (which may
// use different encoding formats). Java-written files can be read successfully because
// they use synchronous APIs that don't require Tokio.
//
// Workaround: Try to "warm up" Tokio runtime by reading a Java-written file first.
// This may initialize the Tokio runtime if it's created on first use.
try {
Identifier warmupIdentifier = identifier("mixed_test_pk_tablej_lance");
try {
4 changes: 0 additions & 4 deletions paimon-python/dev/lint-python.sh
@@ -203,10 +203,6 @@ function mixed_check() {
# Get Python version
PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
echo "Detected Python version: $PYTHON_VERSION"
if [ "$PYTHON_VERSION" = "3.6" ]; then
print_function "STAGE" "mixed tests checks... [SKIPPED]"
return
fi
print_function "STAGE" "mixed tests checks"

# Path to the mixed tests script
7 changes: 5 additions & 2 deletions paimon-python/dev/run_mixed_tests.sh
@@ -138,11 +138,14 @@ run_java_read_test() {

cd "$PROJECT_ROOT"

PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
echo "Detected Python version: $PYTHON_VERSION"

# Run Java test for parquet format in paimon-core
echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read Parquet)..."
echo "Note: Maven may download dependencies on first run, this may take a while..."
local parquet_result=0
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true; then
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
else
echo -e "${RED}✗ Java read parquet test failed${NC}"
@@ -155,7 +158,7 @@
echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..."
echo "Note: Maven may download dependencies on first run, this may take a while..."
local lance_result=0
if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true; then
if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
echo -e "${GREEN}✓ Java read lance test completed successfully${NC}"
else
echo -e "${RED}✗ Java read lance test failed${NC}"
7 changes: 7 additions & 0 deletions paimon-python/pypaimon/__init__.py
@@ -15,6 +15,13 @@
# specific language governing permissions and limitations
# under the License.

import sys
if sys.version_info[:2] == (3, 6):
try:
from pypaimon.manifest import fastavro_py36_compat # noqa: F401
except ImportError:
pass

from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
9 changes: 9 additions & 0 deletions paimon-python/pypaimon/manifest/__init__.py
@@ -15,3 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Apply fastavro Python 3.6 compatibility patch early, before any other
# manifest modules are imported that might use fastavro
import sys
if sys.version_info[:2] == (3, 6):
try:
from pypaimon.manifest import fastavro_py36_compat # noqa: F401
except ImportError:
pass
77 changes: 77 additions & 0 deletions paimon-python/pypaimon/manifest/fastavro_py36_compat.py
@@ -0,0 +1,77 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Provides compatibility patches for fastavro on Python 3.6,
specifically for handling zstd-compressed Avro files.

The main issue addressed is:
- On Python 3.6, fastavro's zstd decompression may fail with:
"zstd.ZstdError: could not determine content size in frame header"

This module patches fastavro's zstd handling to use a more compatible
decompression method that works on Python 3.6.
"""

import sys

_patch_applied = False


def _apply_zstd_patch():
global _patch_applied
if _patch_applied or sys.version_info[:2] != (3, 6):
return

try:
import fastavro._read as fastavro_read
import zstandard as zstd
from io import BytesIO
except (ImportError, AttributeError):
return

def _fixed_zstandard_read_block(decoder):
from fastavro._read import read_long

length = read_long(decoder)

if hasattr(decoder, 'read_fixed'):
data = decoder.read_fixed(length)
elif hasattr(decoder, 'read'):
data = decoder.read(length)
else:
from fastavro._read import read_fixed
data = read_fixed(decoder, length)

decompressor = zstd.ZstdDecompressor()
with decompressor.stream_reader(BytesIO(data)) as reader:
decompressed = reader.read()
return BytesIO(decompressed)

if hasattr(fastavro_read, 'BLOCK_READERS'):
block_readers = fastavro_read.BLOCK_READERS
block_readers['zstandard'] = _fixed_zstandard_read_block
block_readers['zstd'] = _fixed_zstandard_read_block
_patch_applied = True


if sys.version_info[:2] == (3, 6):
try:
_apply_zstd_patch()
except ImportError:
pass
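A minimal usage sketch of how the patch takes effect on Python 3.6 (the manifest path is hypothetical; importing pypaimon applies the BLOCK_READERS override above as a side effect, after which fastavro's standard reader handles zstd-compressed blocks):

import fastavro
import pypaimon  # noqa: F401  # on Python 3.6 this applies fastavro_py36_compat

# Hypothetical path to a zstd-compressed Avro manifest file; with the patch in
# place, fastavro decompresses each block via zstandard's stream_reader.
with open("manifest-0.avro", "rb") as fo:
    for record in fastavro.reader(fo):
        print(record)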
35 changes: 24 additions & 11 deletions paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -17,6 +17,7 @@
################################################################################

import os
import sys
import unittest

import pandas as pd
@@ -25,6 +26,19 @@
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.schema.schema import Schema

if sys.version_info[:2] == (3, 6):
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
else:
def table_sort_by(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
return table.sort_by([(column_name, order)])


def get_file_format_params():
if sys.version_info[:2] == (3, 6):
return [('parquet',)]
else:
return [('parquet',), ('lance',)]


class JavaPyReadWriteTest(unittest.TestCase):
@classmethod
@@ -89,11 +103,13 @@ def test_read_append_table(self):
res = table_read.to_pandas(table_scan.plan().splits())
print(res)

@parameterized.expand([
('parquet',),
('lance',),
])
@parameterized.expand(get_file_format_params())
def test_py_write_read_pk_table(self, file_format):
if sys.version_info[:2] == (3, 6):
self.skipTest(
"Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). "
"Will be fixed in next PR."
)
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -150,10 +166,7 @@ def test_py_write_read_pk_table(self, file_format):
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)

@parameterized.expand([
('parquet',),
('lance',),
])
@parameterized.expand(get_file_format_params())
def test_read_pk_table(self, file_format):
# For parquet, read from Java-written table (no format suffix)
# For lance, read from Java-written table (with format suffix)
@@ -196,7 +209,7 @@ def test_pk_dv_read(self):
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('pt')
actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1, 2, 2],
'a': [10, 21, 22],
@@ -219,7 +232,7 @@ def test_pk_dv_read_multi_batch(self):
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('pt')
actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1] * 9999,
'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
@@ -242,7 +255,7 @@ def test_pk_dv_read_multi_batch_raw_convertable(self):
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('pt')
actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1] * 9999,
'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],