
Commit 616a24d

Merge pull request #92 from digital-land/1780-new-pipeline
Changing over to new Pipeline Class Management
2 parents e18e4f0 + 429e991 commit 616a24d

File tree

5 files changed (+266, −172 lines)


.vscode/DEBUG_SETUP.md

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
# Debug Setup Guide for the Request Processor URL Check

### Prerequisites

You need these services running:

- PostgreSQL (or the Docker version with port 54320 exposed)
- Redis (the Celery broker), or the Docker version with port 6379 exposed
- LocalStack (for S3/SQS), or the Docker version with port 4566 exposed
- **Do not** have the request-processor running.

Keep your Docker Compose stack running with:

```bash
docker compose up -d localstack request-db redis
```

- Make sure `.venv` is installed locally for request-processor, so that `launch.json` points to the right interpreter.

### Start Debugging CheckURL

Click **"Run"** in the VS Code debug panel.

The script will:

1. Invoke the `check_dataurl` task directly (no Celery broker needed)
2. Pass your POST request body payload (set the `request_body` variable in `debug_trigger.py` to the data you want to test)
3. Pause at any breakpoints you've set

## Environment Variables

The launch configuration sets these automatically:

```
DATABASE_URL=postgresql://postgres:password@localhost:54320/request_database
CELERY_BROKER_URL=redis://localhost:6379/0
AWS_ENDPOINT_URL=http://localhost:4566
AWS_DEFAULT_REGION=eu-west-2
SENTRY_ENABLED=false
```

If your local setup differs, edit `.vscode/launch.json` and update the `env` section of the request-processor debug configuration.

---

## Notes

- The `debug_trigger.py` script calls `check_dataurl()` **synchronously**, without Celery. This is intentional; it makes debugging simpler.
- The request database transaction is created when the task runs, so you can inspect the DB afterward.
- After the task completes, the `docker_volume` folder should contain the downloaded resources under `/opt/collection/resource/<request-id>/`.

.vscode/launch.json

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
{
  "version": "0.2.0",
  "python": "${workspaceFolder}/request-processor/.venv/bin/python",
  "configurations": [
    {
      "name": "Debug Request Processor - Manual Task Trigger",
      "type": "python",
      "request": "launch",
      "python": "${workspaceFolder}/request-processor/.venv/bin/python",
      "program": "${workspaceFolder}/scripts/debug_trigger.py",
      "console": "integratedTerminal",
      "justMyCode": false,
      "env": {
        "PYTHONPATH": "${workspaceFolder}:${workspaceFolder}/request-processor:${workspaceFolder}/request-processor/.venv/src/digital-land",
        "DATABASE_URL": "postgresql://postgres:password@localhost:54320/request_database",
        "CELERY_BROKER_URL": "redis://localhost:6379/0",
        "AWS_ENDPOINT_URL": "http://localhost:4566",
        "AWS_DEFAULT_REGION": "eu-west-2",
        "AWS_ACCESS_KEY_ID": "example",
        "AWS_SECRET_ACCESS_KEY": "example",
        "REQUEST_FILES_BUCKET_NAME": "dluhc-data-platform-request-files-local",
        "SENTRY_ENABLED": "false"
      },
      "args": [],
      "cwd": "${workspaceFolder}/request-processor"
    },
    {
      "name": "Debug Request Processor - Celery Worker",
      "type": "python",
      "request": "launch",
      "python": "${workspaceFolder}/request-processor/.venv/bin/python",
      "program": "-m",
      "args": [
        "celery",
        "-A",
        "src.tasks",
        "worker",
        "--loglevel=debug",
        "--concurrency=1"
      ],
      "console": "integratedTerminal",
      "justMyCode": false,
      "env": {
        "PYTHONPATH": "${workspaceFolder}:${workspaceFolder}/request-processor:${workspaceFolder}/request-processor/.venv/src/digital-land",
        "DATABASE_URL": "postgresql://postgres:password@localhost:54320/request_database",
        "CELERY_BROKER_URL": "redis://localhost:6379/0",
        "AWS_ENDPOINT_URL": "http://localhost:4566",
        "AWS_DEFAULT_REGION": "eu-west-2",
        "AWS_ACCESS_KEY_ID": "example",
        "AWS_SECRET_ACCESS_KEY": "example",
        "REQUEST_FILES_BUCKET_NAME": "dluhc-data-platform-request-files-local",
        "SENTRY_ENABLED": "false"
      },
      "cwd": "${workspaceFolder}/request-processor"
    }
  ]
}

README.md

Lines changed: 4 additions & 0 deletions
@@ -103,3 +103,7 @@ To include new dependencies, update the requirements.in file with the desired pa
 ```
 
 This ensures that your project accurately reflects its dependencies, including any transitive dependencies required by the newly added packages.
+
+## Debug
+
+Because this is a monorepo, debugging can be challenging. The primary approach is to review the logs for each sub-repository through Docker. However, if you need breakpoint debugging, a debug configuration is available under the `.vscode/` folder; it is currently set up only for the request-processor service.

request-processor/src/application/core/pipeline.py

Lines changed: 22 additions & 172 deletions
@@ -2,35 +2,10 @@
 import csv
 from application.logging.logger import get_logger
 from digital_land.specification import Specification
-from digital_land.log import DatasetResourceLog, IssueLog, ColumnFieldLog
 from digital_land.organisation import Organisation
-from digital_land.phase.combine import FactCombinePhase
-from digital_land.phase.concat import ConcatFieldPhase
-from digital_land.phase.convert import ConvertPhase
-from digital_land.phase.default import DefaultPhase
-from digital_land.phase.factor import FactorPhase
-from digital_land.phase.filter import FilterPhase
-from digital_land.phase.harmonise import HarmonisePhase
-from digital_land.phase.lookup import (
-    EntityLookupPhase,
-    FactLookupPhase,
-)
-
-from digital_land.phase.map import MapPhase
-from digital_land.phase.migrate import MigratePhase
-from digital_land.phase.normalise import NormalisePhase
-from digital_land.phase.organisation import OrganisationPhase
-from digital_land.phase.parse import ParsePhase
-from digital_land.phase.patch import PatchPhase
-from digital_land.phase.pivot import PivotPhase
-from digital_land.phase.prefix import EntityPrefixPhase
-from digital_land.phase.prune import FieldPrunePhase, FactPrunePhase
-from digital_land.phase.reference import EntityReferencePhase, FactReferencePhase
-from digital_land.phase.save import SavePhase
-from digital_land.phase.priority import PriorityPhase
-from digital_land.pipeline import run_pipeline, Pipeline, Lookups
+
+from digital_land.pipeline import Pipeline, Lookups
 from digital_land.commands import get_resource_unidentified_lookups
-from digital_land.check import duplicate_reference_check
 from digital_land.api import API
 from application.core.utils import append_endpoint, append_source
 from datetime import datetime
@@ -55,9 +30,10 @@ def fetch_response_data(
     additional_col_mappings,
     additional_concats,
 ):
-    # define variables for Pipeline and specification
-    pipeline = Pipeline(pipeline_dir, dataset)
+    # define variables for Pipeline Execution
     specification = Specification(specification_dir)
+    pipeline = Pipeline(pipeline_dir, dataset, specification=specification)
+    api = API(specification=specification)
 
     input_path = os.path.join(collection_dir, "resource", request_id)
     # List all files in the "resource" directory
@@ -99,155 +75,29 @@ def fetch_response_data(
         os.path.join(dataset_resource_dir, dataset, request_id), exist_ok=True
     )
     try:
-        pipeline_run(
-            dataset=dataset,
-            pipeline=pipeline,
-            request_id=request_id,
-            specification_dir=specification_dir,
+        resource = resource_from_path(file_path)
+        issue_log = pipeline.transform(
             input_path=file_path,
             output_path=os.path.join(
-                transformed_dir, dataset, request_id, f"{file_name}.csv"
+                transformed_dir, dataset, request_id, f"{resource}.csv"
             ),
-            issue_dir=os.path.join(issue_dir, dataset, request_id),
-            column_field_dir=os.path.join(column_field_dir, dataset, request_id),
-            dataset_resource_dir=os.path.join(
-                dataset_resource_dir, dataset, request_id
+            organisation=Organisation(os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path)),
+            resource=resource,
+            valid_category_values=api.get_valid_category_values(dataset, pipeline),
+            converted_path=os.path.join(converted_dir, request_id, f"{resource}.csv"),
+            disable_lookups=True,
+        )
+        # Issue log needs severity column added, so manually added and saved here
+        issue_log.add_severity_column(os.path.join(specification_dir, "issue-type.csv"))
+        issue_log.save(os.path.join(issue_dir, dataset, request_id, resource + ".csv"))
+        pipeline.save_logs(
+            column_field_path=os.path.join(column_field_dir, dataset, request_id, resource + ".csv"),
+            dataset_resource_path=os.path.join(
+                dataset_resource_dir, dataset, request_id, resource + ".csv"
             ),
-            organisation_path=os.path.join(cache_dir, "organisation.csv"),
-            save_harmonised=False,
-            organisations=[organisation],
-            converted_dir=converted_dir,
         )
     except Exception as err:
-        logger.error("An exception occured during pipeline_run: ", str(err))
-
-
-def pipeline_run(
-    dataset,
-    pipeline,
-    request_id,
-    specification_dir,
-    input_path,
-    output_path,
-    organisations,
-    converted_dir,
-    null_path=None,  # TBD: remove this
-    issue_dir=None,
-    organisation_path=None,
-    save_harmonised=False,
-    column_field_dir=None,
-    dataset_resource_dir=None,
-    endpoints=[],
-    entry_date="",
-):
-    resource = resource_from_path(input_path)
-
-    specification = Specification(specification_dir)
-    schema = specification.pipeline[pipeline.name]["schema"]
-    intermediate_fieldnames = specification.intermediate_fieldnames(pipeline)
-    issue_log = IssueLog(dataset=dataset, resource=resource)
-    column_field_log = ColumnFieldLog(dataset=dataset, resource=resource)
-    dataset_resource_log = DatasetResourceLog(dataset=dataset, resource=resource)
-
-    api = API(specification=specification)
-    # load pipeline configuration
-    skip_patterns = pipeline.skip_patterns(resource)
-    columns = pipeline.columns(resource, endpoints=endpoints)
-    concats = pipeline.concatenations(resource, endpoints=endpoints)
-    patches = pipeline.patches(resource=resource)
-    lookups = pipeline.lookups(resource=resource)
-    default_fields = pipeline.default_fields(resource=resource)
-    default_values = pipeline.default_values(endpoints=endpoints)
-    combine_fields = pipeline.combine_fields(endpoints=endpoints)
-
-    # load organisations
-    organisation = Organisation(organisation_path, Path(pipeline.path))
-
-    severity_csv_path = os.path.join(specification_dir, "issue-type.csv")
-
-    # Load valid category values
-    valid_category_values = api.get_valid_category_values(dataset, pipeline)
-    # resource specific default values
-    if len(organisations) == 1:
-        default_values["organisation"] = organisations[0]
-
-    run_pipeline(
-        ConvertPhase(
-            path=input_path,
-            dataset_resource_log=dataset_resource_log,
-            output_path=os.path.join(converted_dir, request_id, f"{resource}.csv"),
-        ),
-        NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
-        ParsePhase(),
-        ConcatFieldPhase(concats=concats, log=column_field_log),
-        MapPhase(
-            fieldnames=intermediate_fieldnames,
-            columns=columns,
-            log=column_field_log,
-        ),
-        FilterPhase(filters=pipeline.filters(resource)),
-        PatchPhase(
-            issues=issue_log,
-            patches=patches,
-        ),
-        HarmonisePhase(
-            field_datatype_map=specification.get_field_datatype_map(),
-            issues=issue_log,
-            dataset=dataset,
-            valid_category_values=valid_category_values,
-        ),
-        DefaultPhase(
-            default_fields=default_fields,
-            default_values=default_values,
-            issues=issue_log,
-        ),
-        # TBD: move migrating columns to fields to be immediately after map
-        # this will simplify harmonisation and remove intermediate_fieldnames
-        # but effects brownfield-land and other pipelines which operate on columns
-        MigratePhase(
-            fields=specification.schema_field[schema],
-            migrations=pipeline.migrations(),
-        ),
-        OrganisationPhase(organisation=organisation, issues=issue_log),
-        FieldPrunePhase(fields=specification.current_fieldnames(schema)),
-        EntityReferencePhase(
-            dataset=dataset,
-            prefix=specification.dataset_prefix(dataset),
-        ),
-        EntityPrefixPhase(dataset=dataset),
-        EntityLookupPhase(lookups),
-        SavePhase(
-            default_output_path("harmonised", input_path),
-            fieldnames=intermediate_fieldnames,
-            enabled=save_harmonised,
-        ),
-        PriorityPhase(config=None),
-        PivotPhase(),
-        FactCombinePhase(issue_log=issue_log, fields=combine_fields),
-        FactorPhase(),
-        FactReferencePhase(
-            field_typology_map=specification.get_field_typology_map(),
-            field_prefix_map=specification.get_field_prefix_map(),
-        ),
-        FactLookupPhase(
-            lookups=lookups,
-            odp_collections=specification.get_odp_collections(),
-        ),
-        FactPrunePhase(),
-        SavePhase(
-            output_path,
-            fieldnames=specification.factor_fieldnames(),
-        ),
-    )
-
-    issue_log = duplicate_reference_check(issues=issue_log, csv_path=output_path)
-
-    # Add the 'severity' and 'description' column based on the mapping
-    issue_log.add_severity_column(severity_csv_path)
-
-    issue_log.save(os.path.join(issue_dir, resource + ".csv"))
-    column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
-    dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
+        logger.error("An exception occurred during Pipeline Transform: %s", str(err))
 
 
 def resource_from_path(path):
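The net effect of this hunk is architectural: the phase orchestration that lived in a local `pipeline_run` function now sits behind a single `Pipeline.transform` call that returns the issue log. A toy illustration of that pattern follows; all classes below are illustrative stand-ins, not the digital-land API.

```python
# Toy illustration of the refactor: orchestration moves from a free function
# into a class method that hands back only the issue log. Stand-in classes.
class IssueLog:
    def __init__(self):
        self.rows = []

class Pipeline:
    def __init__(self, dataset):
        self.dataset = dataset
        self.transformed = []

    def transform(self, input_rows):
        """Run the (stand-in) phases internally; return the issue log."""
        issue_log = IssueLog()
        for row in input_rows:
            # stand-in for the normalise/map/harmonise phases
            cleaned = {k.strip().lower(): v.strip() for k, v in row.items()}
            if not cleaned.get("reference"):
                issue_log.rows.append({"issue": "missing reference"})
            self.transformed.append(cleaned)
        return issue_log

# Callers no longer wire up phases; one call runs them and yields the log.
pipeline = Pipeline(dataset="example-dataset")
issues = pipeline.transform([{" Reference ": " ref-1 "}, {"reference": ""}])
print(len(issues.rows))  # → 1
```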
