Skip to content

Commit 878ca74

Browse files
fix: mitigate async logging problems (#307)
* Added better error catching in the async environment
1 parent 33ea2de commit 878ca74

File tree

3 files changed

+85
-26
lines changed

3 files changed

+85
-26
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.3.12-dev1
2+
3+
* **Bypass asyncio exception grouping to return more meaningful errors from the OneDrive indexer**
4+
5+
16
## 0.3.12-dev0
27

38
### Fixes

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.3.12-dev0" # pragma: no cover
1+
__version__ = "0.3.12-dev1" # pragma: no cover

unstructured_ingest/v2/processes/connectors/onedrive.py

Lines changed: 79 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import json
45
from dataclasses import dataclass
56
from pathlib import Path
67
from time import time
7-
from typing import TYPE_CHECKING, Any, Generator, Optional
8+
from typing import TYPE_CHECKING, Any, AsyncIterator, Generator, Iterator, Optional, TypeVar
89

910
from dateutil import parser
1011
from pydantic import Field, Secret
@@ -100,6 +101,27 @@ class OnedriveIndexerConfig(IndexerConfig):
100101
recursive: bool = False
101102

102103

104+
T = TypeVar("T")


def async_iterable_to_sync_iterable(iterator: AsyncIterator[T]) -> Iterator[T]:
    """Drive an async iterator from synchronous code, yielding items lazily.

    A private event loop is created for the lifetime of the returned generator,
    and each item is obtained by running ``iterator.__anext__()`` to completion
    on that loop. ``__anext__`` is called directly instead of the ``anext()``
    builtin so the helper also works on Python 3.9.

    Args:
        iterator: The async iterator to consume.

    Yields:
        Each item produced by ``iterator``, in order.

    Raises:
        Any exception raised by ``iterator`` other than ``StopAsyncIteration``
        propagates unchanged to the caller.
    """
    # The loop is deliberately not installed via asyncio.set_event_loop():
    # run_until_complete() does not require it, and installing it would leave
    # a closed loop registered as the thread's current loop afterwards.
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                item = loop.run_until_complete(iterator.__anext__())
            except StopAsyncIteration:
                break
            # Yield outside the inner try so an exception thrown in by the
            # consumer is never mistaken for exhaustion of the source.
            yield item
    finally:
        try:
            # If the consumer stopped early (or an error occurred), finalize
            # the source while the loop can still run its cleanup code.
            aclose = getattr(iterator, "aclose", None)
            if aclose is not None:
                loop.run_until_complete(aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
123+
124+
103125
@dataclass
104126
class OnedriveIndexer(Indexer):
105127
connection_config: OnedriveConnectionConfig
@@ -116,25 +138,32 @@ def precheck(self) -> None:
116138
logger.error(f"failed to validate connection: {e}", exc_info=True)
117139
raise SourceConnectionError(f"failed to validate connection: {e}")
118140

119-
def list_objects(self, folder: DriveItem, recursive: bool) -> list["DriveItem"]:
141+
def list_objects_sync(self, folder: DriveItem, recursive: bool) -> list["DriveItem"]:
120142
drive_items = folder.children.get().execute_query()
121143
files = [d for d in drive_items if d.is_file]
122144
if not recursive:
123145
return files
146+
124147
folders = [d for d in drive_items if d.is_folder]
125148
for f in folders:
126-
files.extend(self.list_objects(f, recursive))
149+
files.extend(self.list_objects_sync(f, recursive))
127150
return files
128151

129-
def get_root(self, client: "GraphClient") -> "DriveItem":
152+
async def list_objects(self, folder: "DriveItem", recursive: bool) -> list["DriveItem"]:
153+
return await asyncio.to_thread(self.list_objects_sync, folder, recursive)
154+
155+
def get_root_sync(self, client: "GraphClient") -> "DriveItem":
130156
root = client.users[self.connection_config.user_pname].drive.get().execute_query().root
131157
if fpath := self.index_config.path:
132158
root = root.get_by_path(fpath).get().execute_query()
133159
if root is None or not root.is_folder:
134160
raise ValueError(f"Unable to find directory, given: {fpath}")
135161
return root
136162

137-
def get_properties(self, drive_item: "DriveItem") -> dict:
163+
async def get_root(self, client: "GraphClient") -> "DriveItem":
164+
return await asyncio.to_thread(self.get_root_sync, client)
165+
166+
def get_properties_sync(self, drive_item: "DriveItem") -> dict:
138167
properties = drive_item.properties
139168
filtered_properties = {}
140169
for k, v in properties.items():
@@ -145,7 +174,10 @@ def get_properties(self, drive_item: "DriveItem") -> dict:
145174
pass
146175
return filtered_properties
147176

148-
def drive_item_to_file_data(self, drive_item: "DriveItem") -> FileData:
177+
async def get_properties(self, drive_item: "DriveItem") -> dict:
178+
return await asyncio.to_thread(self.get_properties_sync, drive_item)
179+
180+
def drive_item_to_file_data_sync(self, drive_item: "DriveItem") -> FileData:
149181
file_path = drive_item.parent_reference.path.split(":")[-1]
150182
file_path = file_path[1:] if file_path and file_path[0] == "/" else file_path
151183
filename = drive_item.name
@@ -176,17 +208,34 @@ def drive_item_to_file_data(self, drive_item: "DriveItem") -> FileData:
176208
"server_relative_path": server_path,
177209
},
178210
),
179-
additional_metadata=self.get_properties(drive_item=drive_item),
211+
additional_metadata=self.get_properties_sync(drive_item=drive_item),
180212
)
181213

182-
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
183-
client = self.connection_config.get_client()
184-
root = self.get_root(client=client)
185-
drive_items = self.list_objects(folder=root, recursive=self.index_config.recursive)
214+
async def drive_item_to_file_data(self, drive_item: "DriveItem") -> FileData:
215+
# Offload the file data creation if it's not guaranteed async
216+
return await asyncio.to_thread(self.drive_item_to_file_data_sync, drive_item)
217+
218+
async def _run_async(self, **kwargs: Any) -> AsyncIterator[FileData]:
219+
token_resp = await asyncio.to_thread(self.connection_config.get_token)
220+
if "error" in token_resp:
221+
raise SourceConnectionError(
222+
f"[{CONNECTOR_TYPE}]: {token_resp['error']} ({token_resp.get('error_description')})"
223+
)
224+
225+
client = await asyncio.to_thread(self.connection_config.get_client)
226+
root = await self.get_root(client=client)
227+
drive_items = await self.list_objects(folder=root, recursive=self.index_config.recursive)
228+
186229
for drive_item in drive_items:
187-
file_data = self.drive_item_to_file_data(drive_item=drive_item)
230+
file_data = await self.drive_item_to_file_data(drive_item=drive_item)
188231
yield file_data
189232

233+
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
    """Synchronously yield the ``FileData`` records produced by ``_run_async``.

    Args:
        **kwargs: Passed through unchanged to ``_run_async``.

    Yields:
        Each ``FileData`` item as soon as the async generator produces it, so
        the full listing is never collected into memory by this method.
    """
    # ``yield from`` streams items one at a time and also forwards
    # close()/throw() to the bridging generator, so its cleanup runs as soon
    # as the consumer stops iterating.
    yield from async_iterable_to_sync_iterable(self._run_async(**kwargs))
238+
190239

191240
class OnedriveDownloaderConfig(DownloaderConfig):
192241
pass
@@ -220,19 +269,24 @@ def get_download_path(self, file_data: FileData) -> Optional[Path]:
220269

221270
@SourceConnectionError.wrap
222271
def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
223-
file = self._fetch_file(file_data=file_data)
224-
fsize = file.get_property("size", 0)
225-
download_path = self.get_download_path(file_data=file_data)
226-
download_path.parent.mkdir(parents=True, exist_ok=True)
227-
logger.info(f"downloading {file_data.source_identifiers.fullpath} to {download_path}")
228-
if fsize > MAX_MB_SIZE:
229-
logger.info(f"downloading file with size: {fsize} bytes in chunks")
230-
with download_path.open(mode="wb") as f:
231-
file.download_session(f, chunk_size=1024 * 1024 * 100).execute_query()
232-
else:
233-
with download_path.open(mode="wb") as f:
234-
file.download(f).execute_query()
235-
return self.generate_download_response(file_data=file_data, download_path=download_path)
272+
try:
273+
file = self._fetch_file(file_data=file_data)
274+
fsize = file.get_property("size", 0)
275+
download_path = self.get_download_path(file_data=file_data)
276+
download_path.parent.mkdir(parents=True, exist_ok=True)
277+
logger.info(f"downloading {file_data.source_identifiers.fullpath} to {download_path}")
278+
if fsize > MAX_MB_SIZE:
279+
logger.info(f"downloading file with size: {fsize} bytes in chunks")
280+
with download_path.open(mode="wb") as f:
281+
file.download_session(f, chunk_size=1024 * 1024 * 100).execute_query()
282+
else:
283+
with download_path.open(mode="wb") as f:
284+
file.download(f).execute_query()
285+
return self.generate_download_response(file_data=file_data, download_path=download_path)
286+
except Exception as e:
287+
logger.error(f"[{CONNECTOR_TYPE}] Exception during downloading: {e}", exc_info=True)
288+
# Re-raise to see full stack trace locally
289+
raise
236290

237291

238292
class OnedriveUploaderConfig(UploaderConfig):

0 commit comments

Comments
 (0)