Commit 17727e7

[python] Fix zstd manifest decompression error on Python 3.6 (#6982)
1 parent fa0d974 commit 17727e7

File tree: 8 files changed (+126, -28 lines)

paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
paimon-python/dev/lint-python.sh
paimon-python/dev/run_mixed_tests.sh
paimon-python/pypaimon/__init__.py
paimon-python/pypaimon/manifest/__init__.py
paimon-python/pypaimon/manifest/fastavro_py36_compat.py
paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py

paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java

Lines changed: 2 additions & 0 deletions
@@ -53,6 +53,7 @@

 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

 import java.nio.file.Files;

@@ -356,6 +357,7 @@ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception

     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
     public void testReadPkTable() throws Exception {
         Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
         Table table = catalog.getTable(identifier);

paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java

Lines changed: 2 additions & 11 deletions
@@ -51,6 +51,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

 import java.nio.file.Files;

@@ -260,19 +261,9 @@ public void testJavaWriteReadPkTableLance() throws Exception {

     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
     public void testReadPkTableLance() throws Exception {
         try {
-            // Known issue: Reading Python-written Lance files in Java causes JVM crash due to
-            // missing Tokio runtime. The error is:
-            // "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
-            //
-            // This is a limitation of lance-core Java bindings. The Rust native library requires
-            // Tokio runtime for certain operations when reading files written by Python (which may
-            // use different encoding formats). Java-written files can be read successfully because
-            // they use synchronous APIs that don't require Tokio.
-            //
-            // Workaround: Try to "warm up" Tokio runtime by reading a Java-written file first.
-            // This may initialize the Tokio runtime if it's created on first use.
             try {
                 Identifier warmupIdentifier = identifier("mixed_test_pk_tablej_lance");
                 try {

paimon-python/dev/lint-python.sh

Lines changed: 0 additions & 4 deletions
@@ -203,10 +203,6 @@ function mixed_check() {
     # Get Python version
     PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
     echo "Detected Python version: $PYTHON_VERSION"
-    if [ "$PYTHON_VERSION" = "3.6" ]; then
-        print_function "STAGE" "mixed tests checks... [SKIPPED]"
-        return
-    fi
     print_function "STAGE" "mixed tests checks"

     # Path to the mixed tests script

paimon-python/dev/run_mixed_tests.sh

Lines changed: 5 additions & 2 deletions
@@ -138,11 +138,14 @@ run_java_read_test() {

     cd "$PROJECT_ROOT"

+    PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
+    echo "Detected Python version: $PYTHON_VERSION"
+
     # Run Java test for parquet format in paimon-core
     echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read Parquet)..."
     echo "Note: Maven may download dependencies on first run, this may take a while..."
     local parquet_result=0
-    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true; then
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
         echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
     else
         echo -e "${RED}✗ Java read parquet test failed${NC}"

@@ -155,7 +158,7 @@ run_java_read_test() {
     echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..."
     echo "Note: Maven may download dependencies on first run, this may take a while..."
     local lance_result=0
-    if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true; then
+    if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
         echo -e "${GREEN}✓ Java read lance test completed successfully${NC}"
     else
         echo -e "${RED}✗ Java read lance test failed${NC}"

paimon-python/pypaimon/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -15,6 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.

+import sys
+if sys.version_info[:2] == (3, 6):
+    try:
+        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
+    except ImportError:
+        pass
+
 from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema

paimon-python/pypaimon/manifest/__init__.py

Lines changed: 9 additions & 0 deletions
@@ -15,3 +15,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+
+# Apply fastavro Python 3.6 compatibility patch early, before any other
+# manifest modules are imported that might use fastavro
+import sys
+if sys.version_info[:2] == (3, 6):
+    try:
+        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
+    except ImportError:
+        pass
paimon-python/pypaimon/manifest/fastavro_py36_compat.py (new file)

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Provides compatibility patches for fastavro on Python 3.6,
+specifically for handling zstd-compressed Avro files.
+
+The main issue addressed is:
+- On Python 3.6, fastavro's zstd decompression may fail with:
+  "zstd.ZstdError: could not determine content size in frame header"
+
+This module patches fastavro's zstd handling to use a more compatible
+decompression method that works on Python 3.6.
+"""
+
+import sys
+
+_patch_applied = False
+
+
+def _apply_zstd_patch():
+    global _patch_applied
+    if _patch_applied or sys.version_info[:2] != (3, 6):
+        return
+
+    try:
+        import fastavro._read as fastavro_read
+        import zstandard as zstd
+        from io import BytesIO
+    except (ImportError, AttributeError):
+        return
+
+    def _fixed_zstandard_read_block(decoder):
+        from fastavro._read import read_long
+
+        length = read_long(decoder)
+
+        if hasattr(decoder, 'read_fixed'):
+            data = decoder.read_fixed(length)
+        elif hasattr(decoder, 'read'):
+            data = decoder.read(length)
+        else:
+            from fastavro._read import read_fixed
+            data = read_fixed(decoder, length)
+
+        decompressor = zstd.ZstdDecompressor()
+        with decompressor.stream_reader(BytesIO(data)) as reader:
+            decompressed = reader.read()
+        return BytesIO(decompressed)
+
+    if hasattr(fastavro_read, 'BLOCK_READERS'):
+        block_readers = fastavro_read.BLOCK_READERS
+        block_readers['zstandard'] = _fixed_zstandard_read_block
+        block_readers['zstd'] = _fixed_zstandard_read_block
+        _patch_applied = True
+
+
+if sys.version_info[:2] == (3, 6):
+    try:
+        _apply_zstd_patch()
+    except ImportError:
+        pass
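Reviewer note (not part of the commit): once the module above has registered its block reader, reading a zstd-compressed Avro manifest on Python 3.6 goes through fastavro's ordinary container API. A minimal sketch, assuming Python 3.6 with fastavro and zstandard installed; the manifest file path is hypothetical, and importing pypaimon is what triggers the patch (see the __init__.py change above):

# Sketch only: the manifest path below is hypothetical.
import pypaimon  # noqa: F401  # on Python 3.6 this import applies fastavro_py36_compat

import fastavro

with open("manifest-0.avro", "rb") as fo:
    # Block decompression for the 'zstandard' codec now uses the patched reader,
    # avoiding "could not determine content size in frame header" on 3.6.
    for record in fastavro.reader(fo):
        print(record)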

paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py

Lines changed: 24 additions & 11 deletions
@@ -17,6 +17,7 @@
 ################################################################################

 import os
+import sys
 import unittest

 import pandas as pd

@@ -25,6 +26,19 @@
 from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.schema.schema import Schema

+if sys.version_info[:2] == (3, 6):
+    from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+else:
+    def table_sort_by(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
+        return table.sort_by([(column_name, order)])
+
+
+def get_file_format_params():
+    if sys.version_info[:2] == (3, 6):
+        return [('parquet',)]
+    else:
+        return [('parquet',), ('lance',)]
+

 class JavaPyReadWriteTest(unittest.TestCase):
     @classmethod

@@ -89,11 +103,13 @@ def test_read_append_table(self):
         res = table_read.to_pandas(table_scan.plan().splits())
         print(res)

-    @parameterized.expand([
-        ('parquet',),
-        ('lance',),
-    ])
+    @parameterized.expand(get_file_format_params())
     def test_py_write_read_pk_table(self, file_format):
+        if sys.version_info[:2] == (3, 6):
+            self.skipTest(
+                "Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). "
+                "Will be fixed in next PR."
+            )
         pa_schema = pa.schema([
             ('id', pa.int32()),
             ('name', pa.string()),

@@ -150,10 +166,7 @@ def test_py_write_read_pk_table(self, file_format):
         actual_names = set(initial_result['name'].tolist())
         self.assertEqual(actual_names, expected_names)

-    @parameterized.expand([
-        ('parquet',),
-        ('lance',),
-    ])
+    @parameterized.expand(get_file_format_params())
     def test_read_pk_table(self, file_format):
         # For parquet, read from Java-written table (no format suffix)
         # For lance, read from Java-written table (with format suffix)

@@ -196,7 +209,7 @@ def test_pk_dv_read(self):
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits).sort_by('pt')
+        actual = table_sort_by(table_read.to_arrow(splits), 'pt')
         expected = pa.Table.from_pydict({
             'pt': [1, 2, 2],
             'a': [10, 21, 22],

@@ -219,7 +232,7 @@ def test_pk_dv_read_multi_batch(self):
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits).sort_by('pt')
+        actual = table_sort_by(table_read.to_arrow(splits), 'pt')
         expected = pa.Table.from_pydict({
             'pt': [1] * 9999,
             'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],

@@ -242,7 +255,7 @@ def test_pk_dv_read_multi_batch_raw_convertable(self):
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits).sort_by('pt')
+        actual = table_sort_by(table_read.to_arrow(splits), 'pt')
         expected = pa.Table.from_pydict({
             'pt': [1] * 9999,
             'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
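Reviewer note (not part of the commit): pypaimon.tests.py36.pyarrow_compat is not touched here, so its table_sort_by implementation is not shown. A rough, hypothetical sketch of such a fallback, matching the signature used above: pyarrow releases old enough to install on Python 3.6 predate Table.sort_by, so the sort can be expressed via compute.sort_indices plus take():

# Hypothetical sketch, not the actual pypaimon.tests.py36.pyarrow_compat code.
import pyarrow as pa
import pyarrow.compute as pc


def table_sort_by(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
    # Compute the sorted row order explicitly, then reorder the table with take(),
    # since Table.sort_by is only available in newer pyarrow releases.
    indices = pc.sort_indices(table, sort_keys=[(column_name, order)])
    return table.take(indices)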
