Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 114 additions & 9 deletions request-processor/src/application/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ def add_geom_mapping(dataset, pipeline_dir, geom_type, resource, pipeline_csv):


def add_extra_column_mappings(
column_path, column_mapping, dataset, resource, specification_dir
column_path,
column_mapping,
dataset,
resource,
specification_dir,
endpoint_hash=None,
):
filtered_rows = extract_dataset_field_rows(specification_dir, dataset)
fieldnames = []
Expand All @@ -242,7 +247,10 @@ def add_extra_column_mappings(
dictreader = csv.DictReader(f)
fieldnames = dictreader.fieldnames

mappings = {"dataset": dataset, "resource": resource}
if endpoint_hash:
mappings = {"dataset": dataset, "endpoint": endpoint_hash, "resource": ""}
else:
mappings = {"dataset": dataset, "resource": resource}
column_mapping_dump = json.dumps(column_mapping)
column_mapping_json = json.loads(column_mapping_dump)
for key, value in column_mapping_json.items():
Expand Down Expand Up @@ -425,6 +433,9 @@ def add_data_workflow(
licence=None,
start_date=None,
plugin=None,
geom_type=None,
column_mapping=None,
github_branch=None,
):
"""
Setup directories and download required CSVs to manage add-data pipeline
Expand All @@ -441,6 +452,9 @@ def add_data_workflow(
url (str): Endpoint URL to fetch data from
documentation_url (str): Documentation URL for the dataset
directories (Directories): Directories object with required paths
geom_type (str): Optional geometry type for column mapping
column_mapping (dict): Optional caller-supplied column mappings to append to column.csv
github_branch (str): Optional branch name to indicate if the data should be appended to a specific branch
"""
response_data = {}

Expand All @@ -452,20 +466,33 @@ def add_data_workflow(
if not os.path.exists(output_path):
os.makedirs(os.path.dirname(output_path), exist_ok=True)

resource = resource_from_path(os.path.join(input_dir, file_name))
endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()

# Loads csvs for Pipeline and Config
if not fetch_add_data_pipeline_csvs(collection, pipeline_dir):
if not fetch_add_data_pipeline_csvs(
collection,
pipeline_dir,
column_mapping=column_mapping,
geom_type=geom_type,
resource=resource,
dataset=dataset,
specification_dir=directories.SPECIFICATION_DIR,
endpoint_hash=endpoint_hash,
github_branch=github_branch,
):
response_data[
"message"
] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'"
return response_data
if not fetch_add_data_collection_csvs(collection, collection_dir):
if not fetch_add_data_collection_csvs(
collection, collection_dir, github_branch=github_branch
):
response_data[
"message"
] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'"
return response_data

endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()

    # All processes around transforming the data and generating pipeline summary
pipeline_summary = fetch_add_data_response(
dataset=dataset,
Expand Down Expand Up @@ -525,8 +552,22 @@ def add_data_workflow(
return response_data


def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
"""Download pipeline CSVs into pipeline_dir. Returns False if any errors occur."""
def fetch_add_data_pipeline_csvs(
collection,
pipeline_dir,
column_mapping=None,
geom_type=None,
resource=None,
dataset=None,
specification_dir=None,
endpoint_hash=None,
github_branch=None,
):
"""Download pipeline CSVs into pipeline_dir. Returns False if any errors occur.
If column_mapping is provided, appends extra mappings to column.csv after download.
When endpoint_hash is provided, mappings are keyed by endpoint hash rather than resource hash.
    When github_branch is provided, the pipeline CSVs are downloaded from the specified branch (if it exists; otherwise it falls back to the main branch).
"""
os.makedirs(pipeline_dir, exist_ok=True)
pipeline_csvs = [
"column.csv",
Expand All @@ -544,6 +585,37 @@ def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
"skip.csv",
"transform.csv",
]
if github_branch:
try:
for csv_name in pipeline_csvs:
csv_path = os.path.join(pipeline_dir, csv_name)
branch_url = f"{source_url}config/refs/heads/{github_branch}/pipeline/{collection}/{csv_name}"
urllib.request.urlretrieve(branch_url, csv_path)
logger.info(
f"Downloaded {csv_name} from branch '{github_branch}': {branch_url}"
)
except HTTPError:
logger.warning(f"Branch '{github_branch}' not found, falling back to main")
else:
column_csv_path = os.path.join(pipeline_dir, "column.csv")
try:
if column_mapping and resource and dataset and specification_dir:
add_extra_column_mappings(
column_csv_path,
column_mapping,
dataset,
resource,
specification_dir,
endpoint_hash=endpoint_hash,
)
elif geom_type and resource and dataset:
add_geom_mapping(
dataset, pipeline_dir, geom_type, resource, "column.csv"
)
except Exception as e:
logger.error(f"Error saving column mappings to column.csv: {e}")
return True

for csv_name in pipeline_csvs:
csv_path = os.path.join(pipeline_dir, csv_name)
url = f"{CONFIG_URL}pipeline/{collection}/{csv_name}"
Expand All @@ -553,13 +625,46 @@ def fetch_add_data_pipeline_csvs(collection, pipeline_dir):
except HTTPError as e:
logger.warning(f"Failed to retrieve {csv_name}: {e}")
return False

if csv_name == "column.csv":
try:
if column_mapping and resource and dataset and specification_dir:
add_extra_column_mappings(
csv_path,
column_mapping,
dataset,
resource,
specification_dir,
endpoint_hash=endpoint_hash,
)
elif geom_type and resource and dataset:
add_geom_mapping(
dataset, pipeline_dir, geom_type, resource, csv_name
)
except Exception as e:
logger.error(f"Error saving column mappings to column.csv: {e}")

return True


def fetch_add_data_collection_csvs(collection, config_dir):
def fetch_add_data_collection_csvs(collection, config_dir, github_branch=None):
"""Download config CSVs (endpoint.csv, source.csv) into config_dir. Returns False if any errors occur."""
os.makedirs(config_dir, exist_ok=True)
config_csvs = ["endpoint.csv", "source.csv"]

if github_branch:
try:
for csv_name in config_csvs:
csv_path = os.path.join(config_dir, csv_name)
branch_url = f"{source_url}config/refs/heads/{github_branch}/collection/{collection}/{csv_name}"
urllib.request.urlretrieve(branch_url, csv_path)
logger.info(
f"Downloaded {csv_name} from branch '{github_branch}': {branch_url}"
)
return True
except HTTPError:
logger.warning(f"Branch '{github_branch}' not found, falling back to main")

for csv_name in config_csvs:
csv_path = os.path.join(config_dir, csv_name)
url = f"{CONFIG_URL}collection/{collection}/{csv_name}"
Expand Down
3 changes: 3 additions & 0 deletions request-processor/src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ def add_data_task(request: Dict, directories=None):
request_data.licence,
request_data.start_date,
request_data.plugin,
geom_type=getattr(request_data, "geom_type", ""),
column_mapping=getattr(request_data, "column_mapping", {}),
github_branch=request_data.github_branch,
)
if "plugin" in log:
response["plugin"] = log["plugin"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,11 @@ class DummyDirectories:
)
monkeypatch.setattr(
"src.application.core.workflow.fetch_add_data_pipeline_csvs",
lambda col, pdir: True,
lambda col, pdir, **kwargs: True,
)
monkeypatch.setattr(
"src.application.core.workflow.fetch_add_data_collection_csvs",
lambda col, cdir: True,
lambda col, cdir, **kwargs: True,
)
monkeypatch.setattr(
"src.application.core.workflow.fetch_add_data_response",
Expand Down Expand Up @@ -392,11 +392,11 @@ class DummyDirectories:

called = {}

def fake_fetch_add_data_pipeline_csvs(col, pdir):
def fake_fetch_add_data_pipeline_csvs(col, pdir, **_):
called["fetch_add_data_pipeline_csvs"] = (col, pdir)
return True

def fake_fetch_add_data_collection_csvs(col, cdir):
def fake_fetch_add_data_collection_csvs(col, cdir, **_):
called["fetch_add_data_collection_csvs"] = (col, cdir)
return True

Expand Down
1 change: 1 addition & 0 deletions request_model/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class AddDataParams(Params):
start_date: Optional[str] = None
organisation: Optional[str] = None
plugin: Optional[PluginTypeEnum] = None
github_branch: Optional[str] = None


class RequestBase(BaseModel):
Expand Down
Loading