
Commit 931d045

fix: enable per-asset chunked resource ttl options (#5582)
1 parent: cf244bc

3 files changed: 36 additions & 22 deletions

warehouse/oso_dagster/assets/default/ossd.py

Lines changed: 2 additions & 0 deletions
@@ -328,6 +328,7 @@ def fetch_fn() -> t.List[str]:
             bq_schema=bq_schema,
             gcs_bucket_name=global_config.gcs_bucket,
             write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+            max_age_hours=18,
             context=context,
         ),
         gh_token,
@@ -384,6 +385,7 @@ def sbom(
             bq_schema=bq_schema,
             gcs_bucket_name=global_config.gcs_bucket,
             write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+            max_age_hours=48,
             context=context,
         ),
         gh_token,
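For context, a minimal sketch of the call site after this change — only max_age_hours and the keyword arguments visible in this diff are real; fetch_fn, bq_schema, global_config, and context stand in for the asset's actual values, and the two assets in this file now choose 18 and 48 hours respectively:

# Sketch only: per-asset TTL at the ChunkedResourceConfig call site.
from google.cloud import bigquery

config = ChunkedResourceConfig(
    fetch_data_fn=fetch_fn,          # stand-in for the asset's fetch function
    bq_schema=bq_schema,             # stand-in for the asset's BigQuery schema
    gcs_bucket_name=global_config.gcs_bucket,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    max_age_hours=18,                # the per-asset knob added by this commit
    context=context,
)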

warehouse/oso_dagster/definitions/default.py

Lines changed: 0 additions & 1 deletion
@@ -48,7 +48,6 @@ def default_definitions(
 
     chunked_state_cleanup_sensor = setup_chunked_state_cleanup_sensor(
         global_config.gcs_bucket,
-        max_age_hours=24 * 2,
         enable=global_config.is_production,
     )
 
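Note that the removed literal 24 * 2 equals 48, the same value as the DEFAULT_MAX_AGE_HOURS fallback introduced in cleanup_chunks below, so resources whose manifests carry no max_age_hours keep the previous behavior. A sketch of the updated wiring (global_config is assumed to provide gcs_bucket and is_production, as in the diff above):

# The sensor no longer takes a global TTL; it reads each resource's manifest.
chunked_state_cleanup_sensor = setup_chunked_state_cleanup_sensor(
    global_config.gcs_bucket,
    enable=global_config.is_production,
)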

warehouse/oso_dagster/utils/dlt.py

Lines changed: 34 additions & 21 deletions
@@ -158,10 +158,10 @@ class ChunkedResourceConfig(Generic[T]):
             Determines how to handle existing data in the table.
         gcs_prefix (str): Google Cloud Storage prefix for chunked data. Defaults
             to "dlt_chunked_state".
-        max_manifest_age (int): Maximum age of the manifest file in seconds. If the
-            manifest file is older than this value, the manifest will be reset. This
-            means that the `fetch_data_fn` will be called again, re-fetching all the
-            data and starting from scratch. Defaults to 3 days.
+        max_age_hours (int): Maximum age in hours for both manifest files and chunk files.
+            If the manifest is older than this value, it will be reset and data will be
+            re-fetched. Chunk files older than this value will be cleaned up by the sensor.
+            This value is stored in the manifest. Defaults to 48 hours.
         context (AssetExecutionContext): Dagster context object.
     """

@@ -175,7 +175,7 @@ def __init__(
         gcs_bucket_name: str,
         write_disposition: str,
         gcs_prefix: str = "dlt_chunked_state",
-        max_manifest_age: int = 60 * 60 * 24 * 3,
+        max_age_hours: int = 48,
         context: AssetExecutionContext | None = None,
     ):
         self.fetch_data_fn = fetch_data_fn

@@ -186,7 +186,7 @@ def __init__(
         self.gcs_bucket_name = gcs_bucket_name
         self.write_disposition = write_disposition
         self.gcs_prefix = gcs_prefix
-        self.max_manifest_age = max_manifest_age
+        self.max_age_hours = max_age_hours
         self.context = context

@@ -393,17 +393,15 @@ def check_and_handle_load_job(job_id: str) -> bool:
 
     log.info("ChunkedResource: Found existing state manifest")
 
+    max_age_seconds = config.max_age_hours * 60 * 60
     updated_age = (
         datetime.now() - datetime.fromisoformat(manifest_data["updated_at"])
     ).total_seconds()
     created_age = (
         datetime.now() - datetime.fromisoformat(manifest_data["created_at"])
     ).total_seconds()
 
-    if (
-        updated_age > config.max_manifest_age
-        or created_age > config.max_manifest_age
-    ):
+    if updated_age > max_age_seconds or created_age > max_age_seconds:
         log.info("ChunkedResource: State manifest is too old, resetting")
         manifest_data = None
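A standalone worked example of the new staleness check, with assumed values (a manifest last updated 20 hours ago under an 18-hour TTL):

from datetime import datetime, timedelta

max_age_hours = 18                          # assumed per-asset TTL
max_age_seconds = max_age_hours * 60 * 60   # 64,800 seconds
updated_at = datetime.now() - timedelta(hours=20)
updated_age = (datetime.now() - updated_at).total_seconds()
print(updated_age > max_age_seconds)        # True -> manifest is reset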

@@ -434,6 +432,7 @@ def check_and_handle_load_job(job_id: str) -> bool:
         "updated_at": datetime.now().isoformat(),
         "created_at": datetime.now().isoformat(),
         "pending_data": config.fetch_data_fn(),
+        "max_age_hours": config.max_age_hours,
     }
 
     log.info("ChunkedResource: Uploading initial manifest")
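For reference, roughly what a freshly written state.json manifest looks like after this change. Only the keys visible in this diff are shown, the values are illustrative, and the real manifest may carry additional fields:

manifest_data = {
    "updated_at": "2025-01-15T08:00:00",
    "created_at": "2025-01-15T08:00:00",
    "pending_data": ["item-0001", "item-0002"],  # whatever fetch_data_fn returned
    "max_age_hours": 18,  # persisted so the cleanup sensor can honor it
}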
@@ -571,21 +570,23 @@ async def cleanup_chunks(
     context: OpExecutionContext,
     bucket_name: str,
     prefix: str = "dlt_chunked_state",
-    max_age_hours: int = 24,
     client: Optional[storage.Client] = None,
 ):
     """
     Cleanup function that deletes chunk files older than the specified age.
     Also cleans up state files if they have no pending data.
+    The max_age_hours is read from each resource's manifest, falling back to
+    48 hours if not specified.
 
     Args:
         context (OpExecutionContext): The Dagster operation context
         bucket_name (str): The GCS bucket name
         prefix (str): The GCS prefix
-        max_age_hours (int): The maximum age of chunk files to clean up
         client (storage.Client): Optional storage client
     """
 
+    DEFAULT_MAX_AGE_HOURS = 48
+
     if client is None:
         client = storage.Client()

@@ -622,9 +623,21 @@ async def cleanup_chunks(
 
     state_blobs = [blob for blob in blobs if blob.name.endswith("/state.json")]
 
+    resource_max_age = {}
     for state_blob in state_blobs:
         try:
             state_data = json.loads(state_blob.download_as_string())
+
+            parts = state_blob.name.split("/")
+            if len(parts) >= 2:
+                resource_name = parts[-2]
+                resource_max_age[resource_name] = state_data.get(
+                    "max_age_hours", DEFAULT_MAX_AGE_HOURS
+                )
+                context.log.info(
+                    f"Resource '{resource_name}' max_age_hours: {resource_max_age[resource_name]}"
+                )
+
             if not state_data.get("pending_data"):
                 context.log.info(
                     f"State file {state_blob.name} has no pending data, marking for deletion"
@@ -637,8 +650,11 @@ async def cleanup_chunks(
             context.log.error(f"Error processing state file {state_blob.name}: {e}")
 
     for resource_name, blobs in resource_chunks.items():
+        max_age_hours = resource_max_age.get(resource_name, DEFAULT_MAX_AGE_HOURS)
+
         context.log.info(
-            f"Processing resource: {resource_name} with {len(blobs)} chunk files"
+            f"Processing resource: {resource_name} with {len(blobs)} chunk files "
+            f"(max_age_hours: {max_age_hours})"
         )
 
         blobs_to_delete = []
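A self-contained sketch of the lookup the cleanup now performs. The "<prefix>/<resource>/state.json" path layout is inferred from the parts[-2] indexing above, and the resource names and TTLs here are made up:

DEFAULT_MAX_AGE_HOURS = 48

def resource_name_from_state_path(blob_name: str) -> str | None:
    # "<prefix>/<resource>/state.json" -> "<resource>"
    parts = blob_name.split("/")
    return parts[-2] if len(parts) >= 2 else None

resource_max_age = {"sbom": 48, "repositories": 18}  # as read from manifests
name = resource_name_from_state_path("dlt_chunked_state/sbom/state.json")
print(resource_max_age.get(name, DEFAULT_MAX_AGE_HOURS))  # 48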
@@ -680,16 +696,15 @@
 def setup_chunked_state_cleanup_sensor(
     gcs_bucket_name: str,
     gcs_prefix: str = "dlt_chunked_state",
-    max_age_hours: int = 48,
     enable: bool = False,
 ):
     """
-    Sets up a sensor and job to clean up chunk files that are older than the specified age.
+    Sets up a sensor and job to clean up chunk files based on per-asset max_age_hours
+    configuration stored in each resource's manifest.
 
     Args:
         gcs_bucket_name (str): GCS bucket name
         gcs_prefix (str): GCS prefix for chunked data
-        max_age_hours (int): Maximum age in hours for chunk files
         enable (bool): Whether to enable the sensor
 
     Returns:
@@ -703,14 +718,12 @@ async def cleanup_op(context: OpExecutionContext) -> None:
             f"Starting chunked state cleanup job with settings:"
             f"\n - Bucket: {gcs_bucket_name}"
             f"\n - Prefix: {gcs_prefix}"
-            f"\n - Max age for chunk files: {max_age_hours} hours"
         )
 
         await cleanup_chunks(
             context=context,
             bucket_name=gcs_bucket_name,
             prefix=gcs_prefix,
-            max_age_hours=max_age_hours,
         )
 
         context.log.info("Chunked state cleanup job completed successfully")
@@ -724,13 +737,13 @@ def cleanup_job():
     @sensor(
         name="chunked_state_cleanup_sensor",
         job=cleanup_job,
-        minimum_interval_seconds=60 * 60 * 12,
+        minimum_interval_seconds=60 * 60 * 2,
         default_status=status,
     )
     def cleanup_sensor(context: SensorEvaluationContext):
         """
-        Sensor that periodically triggers the job to clean up chunk files
-        that are older than the specified age.
+        Sensor that periodically triggers the job to clean up chunk files.
+        The max age for each resource is read from its manifest.
         """
         context.log.info("Evaluating chunked state cleanup sensor")
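The tighter evaluation floor in arithmetic: the sensor can now fire up to every 2 hours instead of every 12, which matters once an asset opts into a TTL as short as 18 hours:

print(60 * 60 * 12)  # 43200 s: old minimum interval (12 h)
print(60 * 60 * 2)   # 7200 s: new minimum interval (2 h)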
