@@ -233,7 +233,12 @@ def add_geom_mapping(dataset, pipeline_dir, geom_type, resource, pipeline_csv):
233233
234234
235235def add_extra_column_mappings (
236- column_path , column_mapping , dataset , resource , specification_dir
236+ column_path ,
237+ column_mapping ,
238+ dataset ,
239+ resource ,
240+ specification_dir ,
241+ endpoint_hash = None ,
237242):
238243 filtered_rows = extract_dataset_field_rows (specification_dir , dataset )
239244 fieldnames = []
@@ -242,7 +247,10 @@ def add_extra_column_mappings(
242247 dictreader = csv .DictReader (f )
243248 fieldnames = dictreader .fieldnames
244249
245- mappings = {"dataset" : dataset , "resource" : resource }
250+ if endpoint_hash :
251+ mappings = {"dataset" : dataset , "endpoint" : endpoint_hash , "resource" : "" }
252+ else :
253+ mappings = {"dataset" : dataset , "resource" : resource }
246254 column_mapping_dump = json .dumps (column_mapping )
247255 column_mapping_json = json .loads (column_mapping_dump )
248256 for key , value in column_mapping_json .items ():
@@ -425,6 +433,9 @@ def add_data_workflow(
425433 licence = None ,
426434 start_date = None ,
427435 plugin = None ,
436+ geom_type = None ,
437+ column_mapping = None ,
438+ github_branch = None ,
428439):
429440 """
430441 Setup directories and download required CSVs to manage add-data pipeline
@@ -441,6 +452,9 @@ def add_data_workflow(
441452 url (str): Endpoint URL to fetch data from
442453 documentation_url (str): Documentation URL for the dataset
443454 directories (Directories): Directories object with required paths
455+ geom_type (str): Optional geometry type for column mapping
456+ column_mapping (dict): Optional caller-supplied column mappings to append to column.csv
 457+        github_branch (str): Optional branch name; when provided, config and pipeline CSVs are fetched from this branch (falling back to main if it does not exist)
444458 """
445459 response_data = {}
446460
@@ -452,20 +466,33 @@ def add_data_workflow(
452466 if not os .path .exists (output_path ):
453467 os .makedirs (os .path .dirname (output_path ), exist_ok = True )
454468
469+ resource = resource_from_path (os .path .join (input_dir , file_name ))
470+ endpoint_hash = hashlib .sha256 (url .encode ("utf-8" )).hexdigest ()
471+
455472 # Loads csvs for Pipeline and Config
456- if not fetch_add_data_pipeline_csvs (collection , pipeline_dir ):
473+ if not fetch_add_data_pipeline_csvs (
474+ collection ,
475+ pipeline_dir ,
476+ column_mapping = column_mapping ,
477+ geom_type = geom_type ,
478+ resource = resource ,
479+ dataset = dataset ,
480+ specification_dir = directories .SPECIFICATION_DIR ,
481+ endpoint_hash = endpoint_hash ,
482+ github_branch = github_branch ,
483+ ):
457484 response_data [
458485 "message"
459486 ] = f"Unable to find lookups for collection '{ collection } ', dataset '{ dataset } '"
460487 return response_data
461- if not fetch_add_data_collection_csvs (collection , collection_dir ):
488+ if not fetch_add_data_collection_csvs (
489+ collection , collection_dir , github_branch = github_branch
490+ ):
462491 response_data [
463492 "message"
464493 ] = f"Unable to find lookups for collection '{ collection } ', dataset '{ dataset } '"
465494 return response_data
466495
467- endpoint_hash = hashlib .sha256 (url .encode ("utf-8" )).hexdigest ()
468-
469496    # All processes around transforming the data and generating pipeline summary
470497 pipeline_summary = fetch_add_data_response (
471498 dataset = dataset ,
@@ -525,8 +552,22 @@ def add_data_workflow(
525552 return response_data
526553
527554
528- def fetch_add_data_pipeline_csvs (collection , pipeline_dir ):
529- """Download pipeline CSVs into pipeline_dir. Returns False if any errors occur."""
555+ def fetch_add_data_pipeline_csvs (
556+ collection ,
557+ pipeline_dir ,
558+ column_mapping = None ,
559+ geom_type = None ,
560+ resource = None ,
561+ dataset = None ,
562+ specification_dir = None ,
563+ endpoint_hash = None ,
564+ github_branch = None ,
565+ ):
566+ """Download pipeline CSVs into pipeline_dir. Returns False if any errors occur.
567+ If column_mapping is provided, appends extra mappings to column.csv after download.
568+ When endpoint_hash is provided, mappings are keyed by endpoint hash rather than resource hash.
 569+    When github_branch is provided, the pipeline CSVs are downloaded from that branch if it exists; otherwise the download falls back to the main branch.
570+ """
530571 os .makedirs (pipeline_dir , exist_ok = True )
531572 pipeline_csvs = [
532573 "column.csv" ,
@@ -544,6 +585,37 @@ def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
544585 "skip.csv" ,
545586 "transform.csv" ,
546587 ]
588+ if github_branch :
589+ try :
590+ for csv_name in pipeline_csvs :
591+ csv_path = os .path .join (pipeline_dir , csv_name )
592+ branch_url = f"{ source_url } config/refs/heads/{ github_branch } /pipeline/{ collection } /{ csv_name } "
593+ urllib .request .urlretrieve (branch_url , csv_path )
594+ logger .info (
595+ f"Downloaded { csv_name } from branch '{ github_branch } ': { branch_url } "
596+ )
597+ except HTTPError :
598+ logger .warning (f"Branch '{ github_branch } ' not found, falling back to main" )
599+ else :
600+ column_csv_path = os .path .join (pipeline_dir , "column.csv" )
601+ try :
602+ if column_mapping and resource and dataset and specification_dir :
603+ add_extra_column_mappings (
604+ column_csv_path ,
605+ column_mapping ,
606+ dataset ,
607+ resource ,
608+ specification_dir ,
609+ endpoint_hash = endpoint_hash ,
610+ )
611+ elif geom_type and resource and dataset :
612+ add_geom_mapping (
613+ dataset , pipeline_dir , geom_type , resource , "column.csv"
614+ )
615+ except Exception as e :
616+ logger .error (f"Error saving column mappings to column.csv: { e } " )
617+ return True
618+
547619 for csv_name in pipeline_csvs :
548620 csv_path = os .path .join (pipeline_dir , csv_name )
549621 url = f"{ CONFIG_URL } pipeline/{ collection } /{ csv_name } "
@@ -553,13 +625,46 @@ def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
553625 except HTTPError as e :
554626 logger .warning (f"Failed to retrieve { csv_name } : { e } " )
555627 return False
628+
629+ if csv_name == "column.csv" :
630+ try :
631+ if column_mapping and resource and dataset and specification_dir :
632+ add_extra_column_mappings (
633+ csv_path ,
634+ column_mapping ,
635+ dataset ,
636+ resource ,
637+ specification_dir ,
638+ endpoint_hash = endpoint_hash ,
639+ )
640+ elif geom_type and resource and dataset :
641+ add_geom_mapping (
642+ dataset , pipeline_dir , geom_type , resource , csv_name
643+ )
644+ except Exception as e :
645+ logger .error (f"Error saving column mappings to column.csv: { e } " )
646+
556647 return True
557648
558649
559- def fetch_add_data_collection_csvs (collection , config_dir ):
650+ def fetch_add_data_collection_csvs (collection , config_dir , github_branch = None ):
560651 """Download config CSVs (endpoint.csv, source.csv) into config_dir. Returns False if any errors occur."""
561652 os .makedirs (config_dir , exist_ok = True )
562653 config_csvs = ["endpoint.csv" , "source.csv" ]
654+
655+ if github_branch :
656+ try :
657+ for csv_name in config_csvs :
658+ csv_path = os .path .join (config_dir , csv_name )
659+ branch_url = f"{ source_url } config/refs/heads/{ github_branch } /collection/{ collection } /{ csv_name } "
660+ urllib .request .urlretrieve (branch_url , csv_path )
661+ logger .info (
662+ f"Downloaded { csv_name } from branch '{ github_branch } ': { branch_url } "
663+ )
664+ return True
665+ except HTTPError :
666+ logger .warning (f"Branch '{ github_branch } ' not found, falling back to main" )
667+
563668 for csv_name in config_csvs :
564669 csv_path = os .path .join (config_dir , csv_name )
565670 url = f"{ CONFIG_URL } collection/{ collection } /{ csv_name } "
0 commit comments