Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion request-api/makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
test: lint test-coverage

lint:: black-check flake8
lint::
make black ./src
python -m flake8 ./src

black-check:
black --check .
Expand Down
48 changes: 44 additions & 4 deletions request-processor/src/application/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def fetch_add_data_response(
output_path,
specification_dir,
cache_dir,
url,
endpoint,
):
try:
specification = Specification(specification_dir)
Expand All @@ -210,6 +210,7 @@ def fetch_add_data_response(

existing_entities = []
new_entities = []
entity_org_mapping = []
issues_log = None

for idx, resource_file in enumerate(files_in_resource):
Expand All @@ -227,7 +228,7 @@ def fetch_add_data_response(
resource=resource_from_path(resource_file_path),
valid_category_values=valid_category_values,
disable_lookups=False,
endpoints=[url],
endpoints=[endpoint],
)

existing_entities.extend(
Expand All @@ -253,13 +254,19 @@ def fetch_add_data_response(
pipeline_dir=pipeline_dir,
specification=specification,
cache_dir=cache_dir,
endpoints=[url] if url else None,
endpoints=[endpoint] if endpoint else None,
)
logger.info(
f"Found {len(new_lookups)} unidentified lookups in {resource_file}"
)
new_entities.extend(new_lookups)

# By default, create an entity-organisation mapping; the front end can use the 'authoritative' flag
entity_org_mapping = create_entity_organisation(
new_lookups, dataset, organisation_provider
)
# TODO, save to pipeline as well for rerun?

# Reload pipeline to pick up newly saved lookups
pipeline = Pipeline(
pipeline_dir, dataset, specification=specification
Expand All @@ -274,7 +281,7 @@ def fetch_add_data_response(
resource=resource_from_path(resource_file_path),
valid_category_values=valid_category_values,
disable_lookups=False,
endpoints=[url],
endpoints=[endpoint],
)
else:
logger.info(f"No unidentified lookups found in {resource_file}")
Expand All @@ -293,6 +300,7 @@ def fetch_add_data_response(
"existing-in-resource": len(existing_entities),
"new-entities": new_entities_breakdown,
"existing-entities": existing_entities_breakdown,
"entity-organisation": entity_org_mapping,
"pipeline-issues": [dict(issue) for issue in issues_log.rows]
if issues_log
else [],
Expand Down Expand Up @@ -354,6 +362,38 @@ def _get_existing_entities_breakdown(existing_entities):
return breakdown


def create_entity_organisation(new_entities, dataset, organisation):
    """
    Build the entity-organisation mapping for a batch of new entities.

    Args:
        new_entities: List of entity dicts, each expected to carry an 'entity' key
        dataset: Dataset name
        organisation: Organisation identifier (presumably an organisation
            code/provider — confirm against the caller)

    Returns:
        A single-element list holding the dataset, the minimum and maximum
        entity values observed, and the organisation; an empty list when no
        usable entity values are present.
    """
    # Collect truthy entity values; note a falsy value (e.g. 0) is skipped,
    # matching the existing behaviour.
    values = []
    for entry in new_entities or []:
        value = entry.get("entity")
        if value:
            values.append(value)

    if not values:
        return []

    mapping = {
        "dataset": dataset,
        "entity-minimum": min(values),
        "entity-maximum": max(values),
        "organisation": organisation,
    }
    return [mapping]


def _map_transformed_entities(transformed_csv_path, pipeline_dir): # noqa: C901
"""Extract unique entities from transformed CSV and lookup their details in lookup.csv."""

Expand Down
4 changes: 4 additions & 0 deletions request-processor/src/application/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,11 @@ def create_user_friendly_error_log(exception_detail):
status_code = exception_detail.get("errCode")
exception_type = exception_detail.get("exceptionType")
content_type = exception_detail.get("contentType")
plugin = exception_detail.get("plugin")

user_message = "An error occurred, please try again later."

# The ordering here has been considered to show the most relevant message to users in the front end
if exception_type in ["SSLError", "SSLCertVerificationError"]:
user_message = "SSL certificate verification failed"
elif content_type and "text/html" in content_type:
Expand All @@ -264,6 +266,8 @@ def create_user_friendly_error_log(exception_detail):
user_message = "The URL must be accessible"
elif status_code == "404":
user_message = "The URL does not exist. Check the URL you've entered is correct (HTTP 404 error)"
elif plugin == "arcgis" and status_code == "200":
user_message = "URL must be the data layer"

result = dict(exception_detail)
result["message"] = user_message
Expand Down
5 changes: 4 additions & 1 deletion request-processor/src/application/core/workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import hashlib
import os
import csv
from pathlib import Path
Expand Down Expand Up @@ -463,6 +464,8 @@ def add_data_workflow(
] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'"
return response_data

endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()

# All processes around transforming the data and generating pipeline summary
pipeline_summary = fetch_add_data_response(
dataset=dataset,
Expand All @@ -472,7 +475,7 @@ def add_data_workflow(
output_path=output_path,
specification_dir=directories.SPECIFICATION_DIR,
cache_dir=directories.CACHE_DIR,
url=url,
endpoint=endpoint_hash,
)

# Create endpoint and source summaries in workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ def __init__(self, log={}):
# Content type of the response useful for text/html checks (when not using arcgis/wfs plugin)
self.content_type = log.get("content-type")
self.message_detail = log.get("user_message_detail")
self.plugin = log.get("plugin")

self.load()
super().__init__(self.detail)

def load(self):
# This is not the best way to do this but keeps backward compatibility for now
self.detail = {
"errCode": str(self.status) if self.status is not None else None,
"errType": "User Error",
Expand All @@ -60,4 +62,5 @@ def load(self):
"fetchStatus": self.fetch_status,
"exceptionType": self.exception_type,
"contentType": self.content_type,
"plugin": self.plugin,
}
1 change: 1 addition & 0 deletions request-processor/src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ def _fetch_resource(resource_dir, url):
}
)
elif log.get("exception") or log.get("status", "").startswith("4"):
log["plugin"] = plugin # Save plugin used for arcgis error context
break

# All fetch attempts failed - include content-type if available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path):
input_path = tmp_path / "resource"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

input_path.mkdir(parents=True)
pipeline_dir.mkdir(parents=True)
Expand Down Expand Up @@ -68,7 +68,7 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)

assert "new-in-resource" in result
Expand All @@ -83,7 +83,7 @@ def test_fetch_add_data_response_no_files(monkeypatch, tmp_path):
input_path = tmp_path / "resource"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

input_path.mkdir(parents=True)
pipeline_dir.mkdir(parents=True)
Expand All @@ -103,7 +103,7 @@ def test_fetch_add_data_response_no_files(monkeypatch, tmp_path):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)

assert "new-in-resource" in result
Expand All @@ -118,7 +118,7 @@ def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path):
input_path = tmp_path / "nonexistent"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

pipeline_dir.mkdir(parents=True)

Expand All @@ -138,7 +138,7 @@ def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)


Expand All @@ -150,7 +150,7 @@ def test_fetch_add_data_response_handles_processing_error(monkeypatch, tmp_path)
input_path = tmp_path / "resource"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

input_path.mkdir(parents=True)
pipeline_dir.mkdir(parents=True)
Expand Down Expand Up @@ -178,7 +178,7 @@ def raise_exception(*args, **kwargs):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)

assert "new-in-resource" in result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
fetch_add_data_collection_csvs,
)
import csv
import hashlib
import os
from pathlib import Path
import urllib
Expand Down Expand Up @@ -409,7 +410,7 @@ def fake_fetch_add_data_response(
output_path,
specification_dir,
cache_dir,
url,
endpoint,
):
called["fetch_add_data_response"] = {
"dataset": dataset,
Expand All @@ -419,7 +420,7 @@ def fake_fetch_add_data_response(
"output_path": output_path,
"specification_dir": specification_dir,
"cache_dir": cache_dir,
"url": url,
"endpoint": endpoint,
}
return {"result": "ok"}

Expand Down Expand Up @@ -470,7 +471,8 @@ def fake_fetch_add_data_response(
== directories.SPECIFICATION_DIR
)
assert called["fetch_add_data_response"]["cache_dir"] == directories.CACHE_DIR
assert called["fetch_add_data_response"]["url"] == url
expected_endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()
assert called["fetch_add_data_response"]["endpoint"] == expected_endpoint_hash


def test_fetch_add_data_pipeline_csvs_from_url(monkeypatch, tmp_path):
Expand Down
1 change: 1 addition & 0 deletions request_model/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AddDataParams(Params):
type: Literal[RequestTypeEnum.add_data] = RequestTypeEnum.add_data
url: Optional[str] = None
documentation_url: Optional[str] = None
authoritative: Optional[bool] = None
licence: Optional[str] = None
start_date: Optional[str] = None
organisation: Optional[str] = None
Expand Down
2 changes: 1 addition & 1 deletion scripts/debug_trigger_add_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def ensure_request_exists(request_payload: dict) -> None:
"collection": "article-4-direction",
"column_mapping": None,
"geom_type": None,
"url": "https://smbc-opendata.s3.eu-west-1.amazonaws.com/Article4/Article4_Dataset_Stockport.csv",
"url": "https://raw.githubusercontent.com/digital-land/PublishExamples/refs/heads/main/Article4Direction/Files/Article4DirectionArea/articel4directionareas-(wrongColName-NewRefs).csv",
"documentation_url": "https://example.com/article-4-direction/documentation",
"licence": "ogl3",
"start_date": "2020-01-01",
Expand Down
Loading