
Commit 87d0d12

fix: hide internal process functions for process_chunked_resource (#3083)
* fix: remove unnecessary arguments
* add: process chunked resource documentation and examples
1 parent 618b670 commit 87d0d12

File tree

2 files changed: +86 −8 lines changed


warehouse/oso_dagster/dlt_sources/github_repos/__init__.py

Lines changed: 0 additions & 4 deletions

```diff
@@ -411,10 +411,6 @@ def oss_directory_github_sbom_resource(
     rate_limit_max_retry: int = 5,
     server_error_max_rety: int = 3,
     http_cache: t.Optional[str] = None,
-    _chunk_resource_update: t.Optional[t.Callable[..., None]] = None,
-    _chunk_retrieve_failed: t.Optional[
-        t.Callable[[], t.Generator[t.Any, None, None]]
-    ] = None,
 ):
     """Retrieve SBOM information for GitHub repositories"""
 
```

warehouse/oso_dagster/utils/dlt.py

Lines changed: 86 additions & 4 deletions

```diff
@@ -82,6 +82,14 @@ async def _wrapper(
 
         log = context.log if context else logger
 
+        chunk_update_fn = None
+        if "_chunk_resource_update" in kwargs:
+            chunk_update_fn = kwargs.pop("_chunk_resource_update")
+
+        retrieve_failed_fn = None
+        if "_chunk_retrieve_failed" in kwargs:
+            retrieve_failed_fn = kwargs.pop("_chunk_retrieve_failed")
+
         tasks: List[Coroutine[Any, Any, R]] = [
             task() for task in fn(*args, **kwargs)
         ]
@@ -109,15 +117,11 @@ async def _wrapper(
                 f"DLTParallelize: Waiting for {config.wait_interval} seconds ..."
             )
 
-            chunk_update_fn = kwargs.get("_chunk_resource_update")
             if chunk_update_fn and isinstance(chunk_update_fn, Callable):
                 chunk_update_fn(results)
-            else:
-                log.warning("DLTParallelize: chunk_update_fn function not found")
 
             await asyncio.sleep(config.wait_interval)
 
-        retrieve_failed_fn = kwargs.get("_chunk_retrieve_failed")
         if retrieve_failed_fn and isinstance(retrieve_failed_fn, Callable):
             for retrieved in retrieve_failed_fn():
                 yield retrieved
```
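
The substance of the fix is visible here: the internal callbacks are popped out of `kwargs` before the wrapped generator is invoked, so resource functions no longer have to declare (or accidentally leak) these private parameters in their signatures. A minimal, self-contained sketch of the same pattern, with hypothetical names (`with_internal_hook`, `process_items`, `_internal_hook`), not the repo's actual wrapper:

```python
import functools
import typing as t


def with_internal_hook(fn: t.Callable[..., t.Iterator]) -> t.Callable[..., t.Iterator]:
    """Decorator that consumes a private callback before forwarding kwargs."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs) -> t.Iterator:
        # Pop the internal callback *before* calling fn, so the wrapped
        # function never receives it and need not declare it.
        hook = kwargs.pop("_internal_hook", None)

        for item in fn(*args, **kwargs):
            yield item
            if callable(hook):
                hook(item)

    return wrapper


@with_internal_hook
def process_items(count: int):
    yield from range(count)


# Callers can still supply the hook; `process_items` itself stays clean.
assert list(process_items(3, _internal_hook=print)) == [0, 1, 2]
```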
```diff
@@ -177,6 +181,65 @@ def process_chunked_resource(
     *args,
     **kwargs,
 ) -> Generator[DltResource, None, None]:
+    """
+    Configures a DLT resource to keep state checkpoints in Google Cloud Storage.
+    It processes the data in chunks and yields the resource object. It also
+    exposes two functions via the kwargs: `_chunk_resource_update` and
+    `_chunk_retrieve_failed`.
+
+    The `_chunk_resource_update` function updates the state manifest and uploads
+    the chunked data to GCS. The `_chunk_retrieve_failed` function retrieves all
+    the chunks stored in GCS that failed in previous runs.
+
+    The decorated function must use these helpers to keep its state current: it
+    must call `_chunk_resource_update` with the elements to be uploaded to GCS
+    after a successful yield, and it must call `_chunk_retrieve_failed` at the
+    end of the function to retrieve all the chunks stored in GCS that failed in
+    previous runs and clean them up.
+
+    Example:
+        ```
+        @dlt.resource(name="example", ...)
+        def resource(max: int, *args, **kwargs):
+            for i in range(max):
+                data: List = get_data(i)
+                yield data
+
+                # Update the state manifest and upload the data
+                kwargs["_chunk_resource_update"]([data])
+
+            # Retrieve all the stored chunks in GCS from previous runs
+            # and clean them up
+            yield from kwargs["_chunk_retrieve_failed"]()
+
+        @dlt_factory(...)
+        def example(
+            context: AssetExecutionContext,
+            global_config: ResourceParam[DagsterConfig],
+        ):
+            ...
+
+            return process_chunked_resource(
+                ChunkedResourceConfig(
+                    fetch_data_fn=fetch_fn,
+                    resource=resource,
+                    to_string_fn=str,
+                    gcs_bucket_name=global_config.gcs_bucket,
+                    context=context,
+                ),
+                ...,  # these will be forwarded to `resource`
+            )
+        ```
+
+    Args:
+        config (ChunkedResourceConfig): Configuration object.
+        *args: Positional arguments forwarded to the resource constructor.
+        **kwargs: Keyword arguments forwarded to the resource constructor.
+
+    Yields:
+        DltResource: The bound resource object.
+    """
+
     client = storage.Client()
     bucket = client.get_bucket(config.gcs_bucket_name)
     state_blob = bucket.blob(f"{config.gcs_prefix}/{config.resource.name}/state.json")
```
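
To make the docstring example easier to follow without the repo open, here is a rough sketch of the shape `ChunkedResourceConfig` appears to have, inferred only from the fields used in the example and from `config.gcs_bucket_name` / `config.gcs_prefix` in the surrounding code; the actual dataclass in `warehouse/oso_dagster/utils/dlt.py` may differ:

```python
import typing as t
from dataclasses import dataclass


@dataclass
class ChunkedResourceConfig:
    """Inferred sketch; see warehouse/oso_dagster/utils/dlt.py for the real one."""

    fetch_data_fn: t.Callable[..., t.Any]    # produces the raw data to be chunked
    resource: t.Any                          # the dlt resource being wrapped
    to_string_fn: t.Callable[[t.Any], str]   # serializes one element for GCS
    gcs_bucket_name: str                     # bucket holding state.json and chunks
    context: t.Any = None                    # Dagster AssetExecutionContext, if any
    gcs_prefix: str = "state"                # assumed default; used in blob paths
```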
```diff
@@ -186,6 +249,13 @@ def process_chunked_resource(
     log.info(f"ChunkedResource: Checking state in {state_blob.name}")
 
     def resource_update(elements: List[List]):
+        """
+        Updates the state manifest and uploads the chunked data to GCS.
+
+        Args:
+            elements (List[List]): Elements to be stored in GCS.
+        """
+
         count = len(elements)
         current_manifest = state_blob.download_as_string()
         current_data = json.loads(current_manifest)
```
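
The body of `resource_update` (largely untouched by this commit) follows a read-modify-write pattern against the manifest blob. A standalone sketch of that pattern with the `google-cloud-storage` client; the manifest fields (`chunk_count`, `updated_at`) are illustrative, not the repo's actual schema:

```python
import json
from datetime import datetime, timezone

from google.cloud import storage


def update_manifest(bucket: storage.Bucket, prefix: str, elements: list) -> None:
    state_blob = bucket.blob(f"{prefix}/state.json")

    # Read the current manifest (download_as_string returns bytes).
    manifest = json.loads(state_blob.download_as_string())

    # Upload each element as its own object under the `chunk.` prefix.
    start = manifest.get("chunk_count", 0)
    for i, element in enumerate(elements, start=start):
        chunk_blob = bucket.blob(f"{prefix}/chunk.{i}.json")
        chunk_blob.upload_from_string(json.dumps(element))

    # Persist the updated manifest so a later run can resume or clean up.
    manifest["chunk_count"] = start + len(elements)
    manifest["updated_at"] = datetime.now(timezone.utc).isoformat()
    state_blob.upload_from_string(json.dumps(manifest))
```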
```diff
@@ -216,6 +286,18 @@ def resource_update(elements: List[List]):
         log.info(f"ChunkedResource: Uploaded {count} elements to {new_blob.name}")
 
     def retrieve_failed(yield_elems: bool):
+        """
+        Retrieves all the chunks stored in GCS that failed in previous runs. If
+        `yield_elems` is True, it yields the elements as they are retrieved.
+        Otherwise, it works as a cleanup function, deleting all the stored chunks.
+
+        Args:
+            yield_elems (bool): If True, yields the elements as they are retrieved.
+
+        Yields:
+            Dict: The retrieved chunked data.
+        """
+
         blobs = bucket.list_blobs(
             prefix=f"{config.gcs_prefix}/{config.resource.name}/chunk."
         )
```
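
And a matching sketch of the retrieval/cleanup side: list everything under the `chunk.` prefix (the naming shown in the diff), optionally yield the payloads, then delete the blobs. JSON payloads and delete-after-yield are assumptions here, not confirmed by the diff:

```python
import json
import typing as t

from google.cloud import storage


def retrieve_failed_chunks(
    bucket: storage.Bucket, prefix: str, yield_elems: bool
) -> t.Generator[t.Any, None, None]:
    # Every object under the chunk prefix is a leftover from a failed run.
    for blob in bucket.list_blobs(prefix=f"{prefix}/chunk."):
        if yield_elems:
            yield json.loads(blob.download_as_string())
        # Delete regardless, so the chunk is not replayed on the next run.
        blob.delete()
```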
