Skip to content

Commit 09c2bb8

Browse files
authored
Merge pull request #98 from digital-land/feat/change-github-branch
change github branch, column mapping done in add data task
2 parents 687c8e9 + 79cbfd1 commit 09c2bb8

File tree

4 files changed

+122
-13
lines changed

4 files changed

+122
-13
lines changed

request-processor/src/application/core/workflow.py

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,12 @@ def add_geom_mapping(dataset, pipeline_dir, geom_type, resource, pipeline_csv):
233233

234234

235235
def add_extra_column_mappings(
236-
column_path, column_mapping, dataset, resource, specification_dir
236+
column_path,
237+
column_mapping,
238+
dataset,
239+
resource,
240+
specification_dir,
241+
endpoint_hash=None,
237242
):
238243
filtered_rows = extract_dataset_field_rows(specification_dir, dataset)
239244
fieldnames = []
@@ -242,7 +247,10 @@ def add_extra_column_mappings(
242247
dictreader = csv.DictReader(f)
243248
fieldnames = dictreader.fieldnames
244249

245-
mappings = {"dataset": dataset, "resource": resource}
250+
if endpoint_hash:
251+
mappings = {"dataset": dataset, "endpoint": endpoint_hash, "resource": ""}
252+
else:
253+
mappings = {"dataset": dataset, "resource": resource}
246254
column_mapping_dump = json.dumps(column_mapping)
247255
column_mapping_json = json.loads(column_mapping_dump)
248256
for key, value in column_mapping_json.items():
@@ -425,6 +433,9 @@ def add_data_workflow(
425433
licence=None,
426434
start_date=None,
427435
plugin=None,
436+
geom_type=None,
437+
column_mapping=None,
438+
github_branch=None,
428439
):
429440
"""
430441
Setup directories and download required CSVs to manage add-data pipeline
@@ -441,6 +452,9 @@ def add_data_workflow(
441452
url (str): Endpoint URL to fetch data from
442453
documentation_url (str): Documentation URL for the dataset
443454
directories (Directories): Directories object with required paths
455+
geom_type (str): Optional geometry type for column mapping
456+
column_mapping (dict): Optional caller-supplied column mappings to append to column.csv
457+
github_branch (str): Optional branch name to indicate if the data should be appended to a specific branch
444458
"""
445459
response_data = {}
446460

@@ -452,20 +466,33 @@ def add_data_workflow(
452466
if not os.path.exists(output_path):
453467
os.makedirs(os.path.dirname(output_path), exist_ok=True)
454468

469+
resource = resource_from_path(os.path.join(input_dir, file_name))
470+
endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()
471+
455472
# Loads csvs for Pipeline and Config
456-
if not fetch_add_data_pipeline_csvs(collection, pipeline_dir):
473+
if not fetch_add_data_pipeline_csvs(
474+
collection,
475+
pipeline_dir,
476+
column_mapping=column_mapping,
477+
geom_type=geom_type,
478+
resource=resource,
479+
dataset=dataset,
480+
specification_dir=directories.SPECIFICATION_DIR,
481+
endpoint_hash=endpoint_hash,
482+
github_branch=github_branch,
483+
):
457484
response_data[
458485
"message"
459486
] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'"
460487
return response_data
461-
if not fetch_add_data_collection_csvs(collection, collection_dir):
488+
if not fetch_add_data_collection_csvs(
489+
collection, collection_dir, github_branch=github_branch
490+
):
462491
response_data[
463492
"message"
464493
] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'"
465494
return response_data
466495

467-
endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()
468-
469496
# All processes around transforming the data and generating pipeline summary
470497
pipeline_summary = fetch_add_data_response(
471498
dataset=dataset,
@@ -525,8 +552,22 @@ def add_data_workflow(
525552
return response_data
526553

527554

528-
def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
529-
"""Download pipeline CSVs into pipeline_dir. Returns False if any errors occur."""
555+
def fetch_add_data_pipeline_csvs(
556+
collection,
557+
pipeline_dir,
558+
column_mapping=None,
559+
geom_type=None,
560+
resource=None,
561+
dataset=None,
562+
specification_dir=None,
563+
endpoint_hash=None,
564+
github_branch=None,
565+
):
566+
"""Download pipeline CSVs into pipeline_dir. Returns False if any errors occur.
567+
If column_mapping is provided, appends extra mappings to column.csv after download.
568+
When endpoint_hash is provided, mappings are keyed by endpoint hash rather than resource hash.
569+
When github_branch is provided, the pipeline CSVs are downloaded from that specific branch (if it exists; if not, falls back to the main branch).
570+
"""
530571
os.makedirs(pipeline_dir, exist_ok=True)
531572
pipeline_csvs = [
532573
"column.csv",
@@ -544,6 +585,37 @@ def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
544585
"skip.csv",
545586
"transform.csv",
546587
]
588+
if github_branch:
589+
try:
590+
for csv_name in pipeline_csvs:
591+
csv_path = os.path.join(pipeline_dir, csv_name)
592+
branch_url = f"{source_url}config/refs/heads/{github_branch}/pipeline/{collection}/{csv_name}"
593+
urllib.request.urlretrieve(branch_url, csv_path)
594+
logger.info(
595+
f"Downloaded {csv_name} from branch '{github_branch}': {branch_url}"
596+
)
597+
except HTTPError:
598+
logger.warning(f"Branch '{github_branch}' not found, falling back to main")
599+
else:
600+
column_csv_path = os.path.join(pipeline_dir, "column.csv")
601+
try:
602+
if column_mapping and resource and dataset and specification_dir:
603+
add_extra_column_mappings(
604+
column_csv_path,
605+
column_mapping,
606+
dataset,
607+
resource,
608+
specification_dir,
609+
endpoint_hash=endpoint_hash,
610+
)
611+
elif geom_type and resource and dataset:
612+
add_geom_mapping(
613+
dataset, pipeline_dir, geom_type, resource, "column.csv"
614+
)
615+
except Exception as e:
616+
logger.error(f"Error saving column mappings to column.csv: {e}")
617+
return True
618+
547619
for csv_name in pipeline_csvs:
548620
csv_path = os.path.join(pipeline_dir, csv_name)
549621
url = f"{CONFIG_URL}pipeline/{collection}/{csv_name}"
@@ -553,13 +625,46 @@ def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
553625
except HTTPError as e:
554626
logger.warning(f"Failed to retrieve {csv_name}: {e}")
555627
return False
628+
629+
if csv_name == "column.csv":
630+
try:
631+
if column_mapping and resource and dataset and specification_dir:
632+
add_extra_column_mappings(
633+
csv_path,
634+
column_mapping,
635+
dataset,
636+
resource,
637+
specification_dir,
638+
endpoint_hash=endpoint_hash,
639+
)
640+
elif geom_type and resource and dataset:
641+
add_geom_mapping(
642+
dataset, pipeline_dir, geom_type, resource, csv_name
643+
)
644+
except Exception as e:
645+
logger.error(f"Error saving column mappings to column.csv: {e}")
646+
556647
return True
557648

558649

559-
def fetch_add_data_collection_csvs(collection, config_dir):
650+
def fetch_add_data_collection_csvs(collection, config_dir, github_branch=None):
560651
"""Download config CSVs (endpoint.csv, source.csv) into config_dir. Returns False if any errors occur."""
561652
os.makedirs(config_dir, exist_ok=True)
562653
config_csvs = ["endpoint.csv", "source.csv"]
654+
655+
if github_branch:
656+
try:
657+
for csv_name in config_csvs:
658+
csv_path = os.path.join(config_dir, csv_name)
659+
branch_url = f"{source_url}config/refs/heads/{github_branch}/collection/{collection}/{csv_name}"
660+
urllib.request.urlretrieve(branch_url, csv_path)
661+
logger.info(
662+
f"Downloaded {csv_name} from branch '{github_branch}': {branch_url}"
663+
)
664+
return True
665+
except HTTPError:
666+
logger.warning(f"Branch '{github_branch}' not found, falling back to main")
667+
563668
for csv_name in config_csvs:
564669
csv_path = os.path.join(config_dir, csv_name)
565670
url = f"{CONFIG_URL}collection/{collection}/{csv_name}"

request-processor/src/tasks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ def add_data_task(request: Dict, directories=None):
281281
request_data.licence,
282282
request_data.start_date,
283283
request_data.plugin,
284+
geom_type=getattr(request_data, "geom_type", ""),
285+
column_mapping=getattr(request_data, "column_mapping", {}),
286+
github_branch=request_data.github_branch,
284287
)
285288
if "plugin" in log:
286289
response["plugin"] = log["plugin"]

request-processor/tests/unit/src/application/core/test_workflow.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,11 +339,11 @@ class DummyDirectories:
339339
)
340340
monkeypatch.setattr(
341341
"src.application.core.workflow.fetch_add_data_pipeline_csvs",
342-
lambda col, pdir: True,
342+
lambda col, pdir, **kwargs: True,
343343
)
344344
monkeypatch.setattr(
345345
"src.application.core.workflow.fetch_add_data_collection_csvs",
346-
lambda col, cdir: True,
346+
lambda col, cdir, **kwargs: True,
347347
)
348348
monkeypatch.setattr(
349349
"src.application.core.workflow.fetch_add_data_response",
@@ -392,11 +392,11 @@ class DummyDirectories:
392392

393393
called = {}
394394

395-
def fake_fetch_add_data_pipeline_csvs(col, pdir):
395+
def fake_fetch_add_data_pipeline_csvs(col, pdir, **_):
396396
called["fetch_add_data_pipeline_csvs"] = (col, pdir)
397397
return True
398398

399-
def fake_fetch_add_data_collection_csvs(col, cdir):
399+
def fake_fetch_add_data_collection_csvs(col, cdir, **_):
400400
called["fetch_add_data_collection_csvs"] = (col, cdir)
401401
return True
402402

request_model/schemas.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class AddDataParams(Params):
4343
start_date: Optional[str] = None
4444
organisation: Optional[str] = None
4545
plugin: Optional[PluginTypeEnum] = None
46+
github_branch: Optional[str] = None
4647

4748

4849
class RequestBase(BaseModel):

0 commit comments

Comments
 (0)