Skip to content

Commit e84887d

Browse files
authored
Partial download status (#597)
* Implement partial success
1 parent fee132a commit e84887d

File tree

6 files changed

+104
-26
lines changed

6 files changed

+104
-26
lines changed

api/models/download.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type Extent struct {
8080
// PackagerContentItem is a single item for Packager to include in output file
8181
// Note: Previously called DownloadContentItem
8282
type PackagerContentItem struct {
83+
ProductID string `json:"product_id" db:"product_id"`
8384
Bucket string `json:"bucket"`
8485
Key string `json:"key"`
8586
DssDatatype string `json:"dss_datatype" db:"dss_datatype"`
@@ -146,6 +147,7 @@ func GetDownloadPackagerRequest(db *pgxpool.Pool, downloadID *uuid.UUID) (*Packa
146147
context.Background(), db, &pr,
147148
`WITH download_contents AS (
148149
SELECT download_id,
150+
product_id,
149151
key,
150152
bucket,
151153
dss_datatype,
@@ -168,6 +170,7 @@ func GetDownloadPackagerRequest(db *pgxpool.Pool, downloadID *uuid.UUID) (*Packa
168170
)
169171
UNION
170172
SELECT download_id,
173+
product_id,
171174
key,
172175
bucket,
173176
dss_datatype,
@@ -215,6 +218,7 @@ func GetDownloadPackagerRequest(db *pgxpool.Pool, downloadID *uuid.UUID) (*Packa
215218
SELECT download_id,
216219
jsonb_agg(
217220
jsonb_build_object(
221+
'product_id', product_id,
218222
'key', key,
219223
'bucket', bucket,
220224
'dss_datatype', dss_datatype,

async_packager/packager.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535

3636
def handle_message(message):
37+
package_file = None
3738
try:
3839
logger.info("%(spacer)s new message %(spacer)s" % {"spacer": "*" * 20})
3940

@@ -69,11 +70,23 @@ def handle_message(message):
6970
# TODO: Add new package_status in database to represent EMPTY condition
7071
logger.info(f'Empty Contents: No products selected in the request for download ID "{download_id}"')
7172
else:
72-
package_file = handler.handle_message(PayloadResp, dst.name)
73+
writer_result = handler.handle_message(PayloadResp, dst.name)
74+
75+
if writer_result:
76+
package_file = writer_result["file"]
77+
product_stats = writer_result["product_stats"]
78+
79+
# Determine status from per-product stats
80+
total_expected = sum(ps["expected"] for ps in product_stats.values())
81+
total_successful = sum(ps["successful"] for ps in product_stats.values())
82+
83+
if total_successful == total_expected:
84+
status_key = "SUCCESS"
85+
else:
86+
status_key = "PARTIAL_SUCCESS"
7387

74-
if package_file:
7588
# Upload File to S3
76-
logger.debug(f'Packaging successful for download ID "{download_id}"')
89+
logger.debug(f'Packaging {status_key.lower()} for download ID "{download_id}" ({total_successful}/{total_expected} files)')
7790
t1 = Timer(logger=None)
7891
t1.start()
7992
s3_upload_worked = s3_upload_file(
@@ -86,13 +99,15 @@ def handle_message(message):
8699
)
87100
handler.update_status(
88101
download_id,
89-
handler.PACKAGE_STATUS["SUCCESS"],
102+
handler.PACKAGE_STATUS[status_key],
90103
100,
91104
PayloadResp.output_key,
92-
# Manifest JSON
105+
# Manifest JSON with per-product stats
93106
{
94107
"size_bytes": os.path.getsize(package_file),
95-
"filecount": len(PayloadResp.contents),
108+
"filecount": total_expected,
109+
"filecount_successful": total_successful,
110+
"product_stats": product_stats,
96111
},
97112
)
98113
else:
@@ -101,7 +116,7 @@ def handle_message(message):
101116
)
102117
else:
103118
logger.critical(
104-
f'Failed to package or upload "{package_file}" to S3 download ID "{download_id}"'
119+
f'Failed to package or upload to S3 download ID "{download_id}"'
105120
)
106121

107122
except Exception as ex:

async_packager/src/cumulus_packager/packager/handler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
PACKAGE_STATUS = {
2727
"FAILED": "a553101e-8c51-4ddd-ac2e-b011ed54389b",
2828
"INITIATED": "94727878-7a50-41f8-99eb-a80eb82f737a",
29+
"PARTIAL_SUCCESS": "b9e8fac1-0cd4-4645-91b2-36b83524549f",
2930
"SUCCESS": "3914f0bd-2290-42b1-bc24-41479b3a846f",
3031
}
3132

async_packager/src/cumulus_packager/writers/dss7.py

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import os
77
import sys
88
import threading
9-
from collections import namedtuple
9+
from collections import defaultdict, namedtuple
1010
from pathlib import Path
1111
from queue import Queue, Empty
1212
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -200,6 +200,9 @@ def process_single_tiff_gdal(args):
200200
(idx, tif, _bbox, cellsize, destination_srs, grid_type, grid_type_name,
201201
srs_definition, _extent_name, tz_name, tz_offset, is_interval) = args
202202

203+
# Extract product_id early for error reporting
204+
product_id = tif.get('product_id', 'UNKNOWN')
205+
203206
try:
204207
TifCfg = namedtuple("TifCfg", tif)(**tif)
205208
s3_path = f"/vsis3_streaming/{TifCfg.bucket}/{TifCfg.key}"
@@ -210,6 +213,7 @@ def process_single_tiff_gdal(args):
210213
return {
211214
'success': False,
212215
'index': idx,
216+
'product_id': product_id,
213217
'error': f"Failed to open {TifCfg.key}"
214218
}
215219

@@ -232,6 +236,7 @@ def process_single_tiff_gdal(args):
232236
return {
233237
'success': False,
234238
'index': idx,
239+
'product_id': product_id,
235240
'error': f"Failed to warp {TifCfg.key}"
236241
}
237242

@@ -309,6 +314,7 @@ def process_single_tiff_gdal(args):
309314
return {
310315
'success': True,
311316
'index': idx,
317+
'product_id': product_id,
312318
'tif_key': TifCfg.key,
313319
'gd': gd,
314320
'compressed_data': compressed_data,
@@ -322,6 +328,7 @@ def process_single_tiff_gdal(args):
322328
return {
323329
'success': False,
324330
'index': idx,
331+
'product_id': product_id,
325332
'error': str(e)
326333
}
327334

@@ -373,12 +380,19 @@ def process_tiffs_with_bounded_queue(src, _bbox, cellsize, destination_srs, dss,
373380
374381
Returns:
375382
--------
376-
int : Number of successfully processed files
383+
tuple : (processed_count, product_stats) where processed_count is total files
384+
attempted and product_stats is {product_id: {"expected": N, "successful": M}}
377385
"""
378386
# Auto-detect optimal number of workers if not provided
379387
if max_workers is None:
380388
max_workers = get_optimal_workers()
381389

390+
# Build expected counts per product
391+
expected_counts = defaultdict(int)
392+
for tif in src:
393+
expected_counts[tif.get('product_id', 'UNKNOWN')] += 1
394+
success_counts = defaultdict(int)
395+
382396
try:
383397
# Estimate bbox dimensions for queue sizing
384398
first_tif = src[0]
@@ -455,6 +469,7 @@ def producer():
455469
compressed_data = result['compressed_data']
456470
compressed_size = result['compressed_size']
457471
tif_key = result['tif_key']
472+
_product_id = result.get('product_id', 'UNKNOWN')
458473

459474
# Write precompressed data to DSS (GriddedData already created in worker)
460475
t = Timer(name="accumuluated", logger=None)
@@ -464,8 +479,10 @@ def producer():
464479

465480
if dss_result != 0:
466481
logger.warning(f'HEC-DSS-PY write record failed for "{tif_key}": {dss_result}')
467-
elif logger.isEnabledFor(logging.DEBUG):
468-
logger.debug(f'DSS writePrecompressedGrid processed "{tif_key}" in {elapsed_time:.4f}s')
482+
else:
483+
if logger.isEnabledFor(logging.DEBUG):
484+
logger.debug(f'DSS writePrecompressedGrid processed "{tif_key}" in {elapsed_time:.4f}s')
485+
success_counts[_product_id] += 1
469486

470487
processed_count += 1
471488

@@ -489,24 +506,36 @@ def producer():
489506
logger.error(f"Error writing to DSS for file {result['index']}: {e}")
490507
import traceback
491508
logger.error(traceback.format_exc())
509+
processed_count += 1
492510
continue
493511
else:
494512
logger.error(f"Error processing file {result['index']}: {result.get('error', 'Unknown error')}")
513+
processed_count += 1
495514

496515
except Empty:
497516
logger.error("Timeout waiting for results from queue")
498517
break
499518

500519
producer_thread.join()
501520

502-
logger.info(f"Parallel GDAL with compression: Successfully processed {processed_count}/{len(src)} files")
503-
return processed_count
521+
total_successful = sum(success_counts.values())
522+
logger.info(f"Parallel GDAL with compression: Successfully wrote {total_successful}/{len(src)} files")
523+
524+
product_stats = {
525+
pid: {"expected": expected_counts[pid], "successful": success_counts.get(pid, 0)}
526+
for pid in expected_counts
527+
}
528+
return processed_count, product_stats
504529

505530
except Exception as e:
506531
logger.error(f"Parallel GDAL processing failed: {e}")
507532
import traceback
508533
logger.error(traceback.format_exc())
509-
return 0
534+
product_stats = {
535+
pid: {"expected": expected_counts[pid], "successful": 0}
536+
for pid in expected_counts
537+
}
538+
return 0, product_stats
510539

511540

512541
@pyplugs.register
@@ -537,8 +566,9 @@ def writer(
537566
538567
Returns
539568
-------
540-
str
541-
FQPN to dss file
569+
dict or None
570+
{"file": FQPN to dss file, "product_stats": {product_id: {"expected": N, "successful": M}}}
571+
or None if no files were processed
542572
"""
543573

544574
try:
@@ -579,11 +609,17 @@ def writer(
579609

580610
dssfilename = Path(dst).joinpath(id).with_suffix(".dss").as_posix()
581611

612+
# Build expected counts per product for single-file path
613+
expected_counts = defaultdict(int)
614+
for tif in src:
615+
expected_counts[tif.get('product_id', 'UNKNOWN')] += 1
616+
success_counts = defaultdict(int)
617+
582618
with HecDss(dssfilename) as dss:
583619
# Use parallel GDAL processing with compression and precompressed writes for multiple files
584620
if len(src) > 1:
585621
logger.info("Using parallel GDAL processing with compression and precompressed writes")
586-
processed_count = process_tiffs_with_bounded_queue(
622+
processed_count, product_stats = process_tiffs_with_bounded_queue(
587623
src, _bbox, cellsize, destination_srs, dss,
588624
grid_type, grid_type_name, srs_definition,
589625
_extent_name, tz_name, tz_offset, is_interval,
@@ -662,14 +698,17 @@ def writer(
662698
t.start()
663699
result = dss.put(gd)
664700
elapsed_time = t.stop()
665-
if logger.isEnabledFor(logging.DEBUG):
666-
logger.debug(
667-
f'DSS put Processed "{TifCfg.key}" in {elapsed_time:.4f} seconds'
668-
)
669701
if result != 0:
670702
logger.info(
671703
f'HEC-DSS-PY write record failed for "{TifCfg.key}": {result}'
672704
)
705+
else:
706+
if logger.isEnabledFor(logging.DEBUG):
707+
logger.debug(
708+
f'DSS put Processed "{TifCfg.key}" in {elapsed_time:.4f} seconds'
709+
)
710+
_product_id = tif.get('product_id', 'UNKNOWN')
711+
success_counts[_product_id] += 1
673712

674713
_progress = int(((idx + 1) / gridcount) * 100)
675714
# Update progress at predefined interval
@@ -698,6 +737,13 @@ def writer(
698737
warp_ds = None
699738
ds = None
700739

740+
# Build product_stats for single-file path (multi-file path already has it)
741+
if len(src) == 1:
742+
product_stats = {
743+
pid: {"expected": expected_counts[pid], "successful": success_counts.get(pid, 0)}
744+
for pid in expected_counts
745+
}
746+
701747
# If no progress was made for any items in the payload (ex: all tifs could not be projected properly),
702748
# don't return a dssfilename
703749
if _progress == 0:
@@ -710,4 +756,4 @@ def writer(
710756
f'Total processing time for download ID "{id}" in {total_time:.4f} seconds'
711757
)
712758

713-
return dssfilename
759+
return {"file": dssfilename, "product_stats": product_stats}

async_packager/src/cumulus_packager/writers/tgz-cog.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""
55
import json
66
import os
7-
from collections import namedtuple
7+
from collections import defaultdict, namedtuple
88
from tarfile import TarFile
99

1010
import pyplugs
@@ -46,8 +46,9 @@ def writer(
4646
4747
Returns
4848
-------
49-
str
50-
FQPN to dss file
49+
dict or None
50+
{"file": FQPN to tar.gz file, "product_stats": {product_id: {"expected": N, "successful": M}}}
51+
or None if packaging failed
5152
"""
5253
# convert the strings back to json objects; needed for pyplugs
5354
src = json.loads(src)
@@ -107,4 +108,13 @@ def writer(
107108
ds = None
108109
tar.close()
109110

110-
return tarfilename
111+
# All-or-nothing: if we reached here, every file was processed
112+
expected_counts = defaultdict(int)
113+
for tif in src:
114+
expected_counts[tif.get('product_id', 'UNKNOWN')] += 1
115+
product_stats = {
116+
pid: {"expected": count, "successful": count}
117+
for pid, count in expected_counts.items()
118+
}
119+
120+
return {"file": tarfilename, "product_stats": product_stats}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
INSERT INTO download_status (id, name) VALUES
2+
('b9e8fac1-0cd4-4645-91b2-36b83524549f', 'PARTIAL SUCCESS');

0 commit comments

Comments
 (0)