Skip to content

Commit be46a4f

Browse files
committed
lint
1 parent b122140 commit be46a4f

File tree

3 files changed

+55
-52
lines changed

3 files changed

+55
-52
lines changed

request-processor/src/application/core/pipeline.py

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@
44
from digital_land.specification import Specification
55
from digital_land.organisation import Organisation
66
from digital_land.api import API
7-
from collections import defaultdict
87

98
from digital_land.pipeline import Pipeline, Lookups
109
from digital_land.commands import get_resource_unidentified_lookups
11-
from digital_land.api import API
1210
from application.core.utils import append_endpoint, append_source
1311
from datetime import datetime
1412
from pathlib import Path
@@ -83,17 +81,27 @@ def fetch_response_data(
8381
output_path=os.path.join(
8482
transformed_dir, dataset, request_id, f"{resource}.csv"
8583
),
86-
organisation=Organisation(os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path)),
84+
organisation=Organisation(
85+
os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path)
86+
),
8787
resource=resource,
88-
valid_category_values = api.get_valid_category_values(dataset, pipeline),
89-
converted_path=os.path.join(converted_dir, request_id, f"{resource}.csv"),
88+
valid_category_values=api.get_valid_category_values(dataset, pipeline),
89+
converted_path=os.path.join(
90+
converted_dir, request_id, f"{resource}.csv"
91+
),
9092
disable_lookups=True,
9193
)
9294
# Issue log needs severity column added, so manually added and saved here
93-
issue_log.add_severity_column(os.path.join(specification_dir, "issue-type.csv"))
94-
issue_log.save(os.path.join(issue_dir, dataset, request_id, resource + ".csv"))
95+
issue_log.add_severity_column(
96+
os.path.join(specification_dir, "issue-type.csv")
97+
)
98+
issue_log.save(
99+
os.path.join(issue_dir, dataset, request_id, resource + ".csv")
100+
)
95101
pipeline.save_logs(
96-
column_field_path=os.path.join(column_field_dir, dataset, request_id, resource + ".csv"),
102+
column_field_path=os.path.join(
103+
column_field_dir, dataset, request_id, resource + ".csv"
104+
),
97105
dataset_resource_path=os.path.join(
98106
dataset_resource_dir, dataset, request_id, resource + ".csv"
99107
),
@@ -112,7 +120,13 @@ def default_output_path(command, input_path):
112120

113121

114122
def assign_entries(
115-
resource_path, dataset, organisation, pipeline_dir, specification, cache_dir, endpoints=None
123+
resource_path,
124+
dataset,
125+
organisation,
126+
pipeline_dir,
127+
specification,
128+
cache_dir,
129+
endpoints=None,
116130
):
117131
pipeline = Pipeline(pipeline_dir, dataset)
118132
resource_lookups = get_resource_unidentified_lookups(
@@ -138,7 +152,7 @@ def assign_entries(
138152
)
139153

140154
lookups.load_csv()
141-
155+
142156
# Track which entries are new by checking before adding
143157
new_entries_added = []
144158
for new_lookup in unassigned_entries:
@@ -157,19 +171,20 @@ def assign_entries(
157171
)
158172

159173
newly_assigned = lookups.save_csv()
160-
174+
161175
# Filter to return only the entries we just added
162176
if newly_assigned:
163177
new_lookups = [
164-
lookup for lookup in newly_assigned
178+
lookup
179+
for lookup in newly_assigned
165180
if any(
166-
lookup.get("reference") == entry.get("reference")
181+
lookup.get("reference") == entry.get("reference")
167182
and lookup.get("organisation") == entry.get("organisation")
168183
for entry in new_entries_added
169184
)
170185
]
171186
return new_lookups
172-
187+
173188
return []
174189

175190

@@ -185,15 +200,17 @@ def fetch_add_data_response(
185200
url,
186201
documentation_url,
187202
):
188-
try:
203+
try:
189204
specification = Specification(specification_dir)
190205
pipeline = Pipeline(pipeline_dir, dataset, specification=specification)
191-
organisation = Organisation(os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path))
206+
organisation = Organisation(
207+
os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path)
208+
)
192209
api = API(specification=specification)
193210

194211
# TODO: Need to load config class for correct transform?
195212
# TODO: Handling of column mapping?
196-
valid_category_values = api.get_valid_category_values(dataset, pipeline)
213+
valid_category_values = api.get_valid_category_values(dataset, pipeline)
197214

198215
files_in_resource = os.listdir(input_dir)
199216

@@ -213,18 +230,21 @@ def fetch_add_data_response(
213230
organisation=organisation,
214231
organisations=[organisation_provider],
215232
resource=resource_from_path(resource_file_path),
216-
valid_category_values = valid_category_values,
233+
valid_category_values=valid_category_values,
217234
disable_lookups=False,
218235
)
219236

220237
existing_entities.extend(
221-
_map_existing_entities_from_transformed_csv(output_path, pipeline_dir)
238+
_map_transformed_entities(output_path, pipeline_dir)
222239
)
223240

224241
# Check if there are unknown entity issues in the log
225-
unknown_issue_types = {'unknown entity', 'unknown entity - missing reference'}
242+
unknown_issue_types = {
243+
"unknown entity",
244+
"unknown entity - missing reference",
245+
}
226246
has_unknown = any(
227-
row.get('issue-type') in unknown_issue_types
247+
row.get("issue-type") in unknown_issue_types
228248
for row in issues_log.rows
229249
if isinstance(row, dict)
230250
)
@@ -246,8 +266,7 @@ def fetch_add_data_response(
246266
else:
247267
logger.info(f"No unidentified lookups found in {resource_file}")
248268

249-
250-
# TODO: Re-run to see if no unidentified lookups remain and if so create an error summary for add data command
269+
# TODO: Re-run to see if no unidentified remain, if so new add data error summary
251270

252271
except Exception as err:
253272
logger.error(f"Error processing {resource_file}: {err}")
@@ -340,11 +359,11 @@ def _get_existing_entities_breakdown(existing_entities):
340359
return breakdown
341360

342361

343-
def _map_existing_entities_from_transformed_csv(transformed_csv_path, pipeline_dir):
362+
def _map_transformed_entities(transformed_csv_path, pipeline_dir): # noqa: C901
344363
"""Extract unique entities from transformed CSV and lookup their details in lookup.csv."""
345364

346365
mapped_entities = []
347-
366+
348367
if not os.path.exists(transformed_csv_path):
349368
logger.warning(f"Transformed CSV not found: {transformed_csv_path}")
350369
return mapped_entities

request-processor/src/application/core/workflow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ def add_data_workflow(
419419
):
420420
"""
421421
Setup directories and download required CSVs to manage add-data pipeline, then invoke fetch_add_data_response, also clean up.
422-
422+
423423
Args:
424424
file_name (str): Collection resource file name
425425
request_id (str): Unique request identifier
@@ -436,7 +436,7 @@ def add_data_workflow(
436436
output_path = os.path.join(directories.TRANSFORMED_DIR, request_id, file_name)
437437
if not os.path.exists(output_path):
438438
os.makedirs(os.path.dirname(output_path), exist_ok=True)
439-
439+
440440
# TODO: Can this use fetch_pipeline_csvs function instead?, do seem to need main config source (GitHub) for real time data
441441
fetch_add_data_csvs(collection, pipeline_dir)
442442

@@ -473,7 +473,7 @@ def add_data_workflow(
473473

474474

475475
def fetch_add_data_csvs(collection, pipeline_dir):
476-
"""Download add-data pipeline CSVs (lookup, endpoint, source) into pipeline_dir and ensure organisation """
476+
"""Download add-data pipeline CSVs (lookup, endpoint, source) into pipeline_dir and ensure organisation"""
477477
os.makedirs(pipeline_dir, exist_ok=True)
478478
add_data_csvs = ["lookup.csv", "endpoint.csv", "source.csv"]
479479
fetched_files = []

request-processor/tests/unit/src/application/core/test_pipeline.py

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,8 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path):
6161
monkeypatch.setattr(
6262
"src.application.core.pipeline.Lookups", lambda x: mock_lookups_instance
6363
)
64-
monkeypatch.setattr(
65-
"src.application.core.pipeline.Pipeline", MagicMock()
66-
)
67-
monkeypatch.setattr(
68-
"src.application.core.pipeline.Organisation", MagicMock()
69-
)
64+
monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock())
65+
monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock())
7066
monkeypatch.setattr(
7167
"src.application.core.pipeline._validate_endpoint",
7268
lambda url, dir: {"endpoint_url_in_endpoint_csv": True},
@@ -113,12 +109,8 @@ def test_fetch_add_data_response_no_files(monkeypatch, tmp_path):
113109
monkeypatch.setattr(
114110
"src.application.core.pipeline.Specification", lambda x: mock_spec
115111
)
116-
monkeypatch.setattr(
117-
"src.application.core.pipeline.Pipeline", MagicMock()
118-
)
119-
monkeypatch.setattr(
120-
"src.application.core.pipeline.Organisation", MagicMock()
121-
)
112+
monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock())
113+
monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock())
122114
monkeypatch.setattr(
123115
"src.application.core.pipeline._validate_endpoint",
124116
lambda url, dir: {"endpoint_url_in_endpoint_csv": True},
@@ -164,12 +156,8 @@ def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path):
164156
monkeypatch.setattr(
165157
"src.application.core.pipeline.Specification", lambda x: mock_spec
166158
)
167-
monkeypatch.setattr(
168-
"src.application.core.pipeline.Pipeline", MagicMock()
169-
)
170-
monkeypatch.setattr(
171-
"src.application.core.pipeline.Organisation", MagicMock()
172-
)
159+
monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock())
160+
monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock())
173161

174162
with pytest.raises(FileNotFoundError):
175163
fetch_add_data_response(
@@ -211,12 +199,8 @@ def test_fetch_add_data_response_handles_processing_error(monkeypatch, tmp_path)
211199
monkeypatch.setattr(
212200
"src.application.core.pipeline.Specification", lambda x: mock_spec
213201
)
214-
monkeypatch.setattr(
215-
"src.application.core.pipeline.Pipeline", MagicMock()
216-
)
217-
monkeypatch.setattr(
218-
"src.application.core.pipeline.Organisation", MagicMock()
219-
)
202+
monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock())
203+
monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock())
220204

221205
def raise_exception(*args, **kwargs):
222206
raise Exception("Processing error")

0 commit comments

Comments
 (0)