Commit 560fc28

feat(delta-table): fix pyo3_runtime.PanicException on linux (#545)
1 parent 47e8d65 commit 560fc28

File tree: 5 files changed (+66 −44 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,7 @@
+## 1.0.46-dev0
+
+* **Fix delta-table `pyo3_runtime.PanicException: Forked process detected` on Linux**
+
 ## 1.0.45
 
 ### Fixes

requirements/connectors/delta-table.txt

Lines changed: 1 addition & 0 deletions
@@ -2,3 +2,4 @@ pandas
 deltalake
 boto3
 tenacity
+pyarrow

test/integration/connectors/test_s3.py

Lines changed: 18 additions & 30 deletions
@@ -76,18 +76,15 @@ async def test_s3_source(anon_connection_config: S3ConnectionConfig):
         ),
     )
 
+
 @pytest.mark.asyncio
 @pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, BLOB_STORAGE_TAG)
 async def test_s3_source_special_char(anon_connection_config: S3ConnectionConfig):
-    indexer_config = S3IndexerConfig(
-        remote_url="s3://utic-dev-tech-fixtures/special-characters/"
-    )
+    indexer_config = S3IndexerConfig(remote_url="s3://utic-dev-tech-fixtures/special-characters/")
     with tempfile.TemporaryDirectory() as tempdir:
         tempdir_path = Path(tempdir)
         download_config = S3DownloaderConfig(download_dir=tempdir_path)
-        indexer = S3Indexer(
-            connection_config=anon_connection_config, index_config=indexer_config
-        )
+        indexer = S3Indexer(connection_config=anon_connection_config, index_config=indexer_config)
         downloader = S3Downloader(
             connection_config=anon_connection_config, download_config=download_config
         )
@@ -108,26 +105,23 @@ async def test_s3_source_special_char(anon_connection_config: S3ConnectionConfig
         ),
     )
 
+
 @pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, BLOB_STORAGE_TAG)
 def test_s3_source_no_access(anon_connection_config: S3ConnectionConfig):
-    indexer_config = S3IndexerConfig(
-        remote_url="s3://utic-ingest-test-fixtures/destination/"
-    )
-    indexer = S3Indexer(
-        connection_config=anon_connection_config, index_config=indexer_config
-    )
+    indexer_config = S3IndexerConfig(remote_url="s3://utic-ingest-test-fixtures/destination/")
+    indexer = S3Indexer(connection_config=anon_connection_config, index_config=indexer_config)
     with pytest.raises(UserAuthError):
         indexer.precheck()
 
+
 @pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, BLOB_STORAGE_TAG)
 def test_s3_source_no_bucket(anon_connection_config: S3ConnectionConfig):
     indexer_config = S3IndexerConfig(remote_url="s3://fake-bucket")
-    indexer = S3Indexer(
-        connection_config=anon_connection_config, index_config=indexer_config
-    )
+    indexer = S3Indexer(connection_config=anon_connection_config, index_config=indexer_config)
     with pytest.raises(UserError):
         indexer.precheck()
 
+
 @pytest.mark.asyncio
 @pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, "minio", BLOB_STORAGE_TAG)
 async def test_s3_minio_source(anon_connection_config: S3ConnectionConfig):
@@ -139,9 +133,7 @@ async def test_s3_minio_source(anon_connection_config: S3ConnectionConfig):
     ):
         tempdir_path = Path(tempdir)
         download_config = S3DownloaderConfig(download_dir=tempdir_path)
-        indexer = S3Indexer(
-            connection_config=anon_connection_config, index_config=indexer_config
-        )
+        indexer = S3Indexer(connection_config=anon_connection_config, index_config=indexer_config)
         downloader = S3Downloader(
             connection_config=anon_connection_config, download_config=download_config
         )
@@ -167,13 +159,15 @@ async def test_s3_minio_source(anon_connection_config: S3ConnectionConfig):
         ),
     )
 
+
 def get_aws_credentials() -> dict:
     access_key = os.getenv("S3_INGEST_TEST_ACCESS_KEY", None)
     assert access_key
     secret_key = os.getenv("S3_INGEST_TEST_SECRET_KEY", None)
     assert secret_key
     return {"aws_access_key_id": access_key, "aws_secret_access_key": secret_key}
 
+
 @pytest.mark.asyncio
 @pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
 @requires_env("S3_INGEST_TEST_ACCESS_KEY", "S3_INGEST_TEST_SECRET_KEY")
@@ -191,9 +185,7 @@ async def test_s3_destination(upload_file: Path):
     uploader = S3Uploader(connection_config=connection_config, upload_config=upload_config)
     s3fs = uploader.fs
     file_data = FileData(
-        source_identifiers=SourceIdentifiers(
-            fullpath=upload_file.name, filename=upload_file.name
-        ),
+        source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
         connector_type=CONNECTOR_TYPE,
         identifier="mock file data",
     )
@@ -204,14 +196,13 @@ async def test_s3_destination(upload_file: Path):
         else:
             uploader.run(path=upload_file, file_data=file_data)
         uploaded_files = [
-            Path(file)
-            for file in s3fs.ls(path=destination_path)
-            if Path(file).name != "_empty"
+            Path(file) for file in s3fs.ls(path=destination_path) if Path(file).name != "_empty"
         ]
         assert len(uploaded_files) == 1
     finally:
         s3fs.rm(path=destination_path, recursive=True)
 
+
 @pytest.mark.asyncio
 @pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
 @requires_env("S3_INGEST_TEST_ACCESS_KEY", "S3_INGEST_TEST_SECRET_KEY")
@@ -252,14 +243,13 @@ async def test_s3_destination_same_filename_different_folders(upload_file: Path)
         uploader.run(path=upload_file, file_data=file_data_1)
         uploader.run(path=upload_file, file_data=file_data_2)
         uploaded_files = [
-            Path(file)
-            for file in s3fs.ls(path=destination_path)
-            if Path(file).name != "_empty"
+            Path(file) for file in s3fs.ls(path=destination_path) if Path(file).name != "_empty"
         ]
         assert len(uploaded_files) == 2
     finally:
         s3fs.rm(path=destination_path, recursive=True)
 
+
 @pytest.mark.asyncio
 @pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
 @requires_env("S3_INGEST_TEST_ACCESS_KEY", "S3_INGEST_TEST_SECRET_KEY")
@@ -292,9 +282,7 @@ async def test_s3_destination_different_relative_path_and_full_path(upload_file:
         else:
            uploader.run(path=upload_file, file_data=file_data)
         uploaded_files = [
-            Path(file)
-            for file in s3fs.ls(path=destination_path)
-            if Path(file).name != "_empty"
+            Path(file) for file in s3fs.ls(path=destination_path) if Path(file).name != "_empty"
         ]
         assert len(uploaded_files) == 1
         assert uploaded_files[0].as_posix() == f"{destination_path.lstrip('s3://')}/folder1"

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "1.0.45" # pragma: no cover
+__version__ = "1.0.46-dev0" # pragma: no cover

unstructured_ingest/processes/connectors/delta_table.py

Lines changed: 42 additions & 13 deletions
@@ -1,7 +1,7 @@
 import logging
 import traceback
 from dataclasses import dataclass, field
-from multiprocessing import Process, Queue, current_process
+from multiprocessing import Queue, current_process
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Optional
 from urllib.parse import urlparse
@@ -186,10 +186,25 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
         )
 
         def _is_commit_conflict(exc: BaseException) -> bool:  # noqa: ANN401
-            """Return True if exception looks like a Delta Lake commit conflict."""
-
-            return isinstance(exc, RuntimeError) and (
-                "CommitFailed" in str(exc) or "Metadata changed" in str(exc)
+            """Return True if exception looks like a Delta Lake commit conflict.
+
+            Besides the canonical *CommitFailed* / *Metadata changed* errors that
+            deltalake surfaces when two writers clash, we occasionally hit
+            messages such as *Delta transaction failed, version 0 already
+            exists* while multiple processes race to create the very first log
+            entry. These situations are equally safe to retry, so detect them
+            too.
+            """
+
+            return isinstance(exc, RuntimeError) and any(
+                marker in str(exc)
+                for marker in (
+                    "CommitFailed",
+                    "Metadata changed",
+                    "version 0 already exists",
+                    "version already exists",
+                    "Delta transaction failed",
+                )
             )
 
         @retry(
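
The conflict detector above feeds the tenacity-based `@retry` decorator whose opening line appears as trailing context (tenacity is already listed in the connector's requirements). As a rough, self-contained sketch of how such a predicate can be wired up, with illustrative retry limits and a placeholder writer body rather than the connector's actual values:

from tenacity import retry, retry_if_exception, stop_after_attempt, wait_random_exponential


def _is_commit_conflict(exc: BaseException) -> bool:
    # Treat known deltalake writer-clash messages as retryable.
    return isinstance(exc, RuntimeError) and any(
        marker in str(exc)
        for marker in ("CommitFailed", "Metadata changed", "version 0 already exists")
    )


@retry(
    retry=retry_if_exception(_is_commit_conflict),  # retry only on commit conflicts
    stop=stop_after_attempt(5),  # illustrative cap, not the connector's setting
    wait=wait_random_exponential(multiplier=1, max=10),
    reraise=True,
)
def write_once() -> None:
    ...  # the actual deltalake write would happen here

With `reraise=True`, a conflict that persists past the final attempt is re-raised as-is instead of being wrapped in tenacity's RetryError.
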
@@ -206,25 +221,39 @@ def _single_attempt() -> None:
             # cause ingest to fail, even though all tasks are completed normally. Putting the writer
             # into a process mitigates this issue by ensuring python interpreter waits properly for
             # deltalake's rust backend to finish
-            queue: Queue[str] = Queue()
+            # Use a multiprocessing context that relies on 'spawn' to avoid inheriting the
+            # parent process' Tokio runtime, which leads to `pyo3_runtime.PanicException`.
+            from multiprocessing import get_context
+
+            ctx = get_context("spawn")
+            queue: "Queue[str]" = ctx.Queue()
 
             if current_process().daemon:
                 # write_deltalake_with_error_handling will push any traceback to our queue
                 write_deltalake_with_error_handling(queue=queue, **writer_kwargs)
             else:
-                # On non-daemon processes we still guard against SIGABRT by running in a subprocess.
-                writer = Process(
+                # On non-daemon processes we still guard against SIGABRT by running in a
+                # dedicated subprocess created via the 'spawn' method.
+                writer = ctx.Process(
                     target=write_deltalake_with_error_handling,
                     kwargs={"queue": queue, **writer_kwargs},
                 )
                 writer.start()
                 writer.join()
 
-            # Check if the queue has any exception message
-            if not queue.empty():
-                error_message = queue.get()
-                logger.error("Exception occurred in write_deltalake: %s", error_message)
-                raise RuntimeError(f"Error in write_deltalake: {error_message}")
+            # First surface any traceback captured inside the subprocess so users see the real
+            # root-cause instead of a generic non-zero exit code.
+            if not queue.empty():
+                error_message = queue.get()
+                logger.error("Exception occurred in write_deltalake: %s", error_message)
+                raise RuntimeError(f"Error in write_deltalake: {error_message}")
+
+            # If the subprocess terminated abnormally but produced no traceback (e.g., SIGABRT),
+            # still raise a helpful error for callers.
+            if not current_process().daemon and writer.exitcode != 0:
+                raise RuntimeError(
+                    f"write_deltalake subprocess exited with code {writer.exitcode}"
+                )
 
         _single_attempt()
 
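
The substance of the fix is visible above: the writer subprocess is now created from a 'spawn' multiprocessing context instead of Linux's default 'fork', so the child does not inherit a forked copy of the Tokio runtime that deltalake's pyo3 bindings start in the parent, which is what raises `pyo3_runtime.PanicException: Forked process detected`. A minimal standalone sketch of the same pattern, with a hypothetical `write_in_subprocess` wrapper and writer body standing in for the connector's code:

import traceback
from multiprocessing import get_context


def _writer(queue, **kwargs) -> None:
    # Hypothetical writer body; report failures through the queue instead of dying silently.
    try:
        ...  # e.g. deltalake.write_deltalake(**kwargs)
    except Exception:
        queue.put(traceback.format_exc())


def write_in_subprocess(**kwargs) -> None:
    ctx = get_context("spawn")  # fresh interpreter: no inherited (forked) Tokio runtime
    queue = ctx.Queue()
    proc = ctx.Process(target=_writer, kwargs={"queue": queue, **kwargs})
    proc.start()
    proc.join()
    if not queue.empty():  # surface the child's real traceback first
        raise RuntimeError(f"Error in writer subprocess: {queue.get()}")
    if proc.exitcode != 0:  # e.g. SIGABRT with no Python traceback at all
        raise RuntimeError(f"writer subprocess exited with code {proc.exitcode}")

The trade-off is that a spawned child starts a fresh interpreter, so the target function and its keyword arguments must be importable and picklable; that constraint is what buys immunity from the fork-after-runtime-init panic.
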
