Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ def dataset_create(
path=output_path,
specification_dir=None, # TBD: package should use this specification object
)
# don'tt use create as we don't want to create the indexes
# don't use create as we don't want to create the indexes
package.create_database()
package.disconnect()
for path in input_paths:
Expand Down Expand Up @@ -474,6 +474,72 @@ def dataset_create(
package.add_counts()


#
# update dataset from processed new resources
#
def dataset_update(
    input_paths,
    output_path,
    organisation_path,
    pipeline,
    dataset,
    specification,
    bucket_name=None,  # bucket name from bash script, need to put into cli.
    object_key=None,  # object-key, latter part of 'bucket'
    issue_dir="issue",
    column_field_dir="var/column-field",
    dataset_resource_dir="var/dataset-resource",
):
    """Refresh an existing dataset sqlite file held in S3 with new resources.

    The current sqlite file is fetched from ``bucket_name``/``object_key``,
    each newly transformed resource in ``input_paths`` is loaded on top of
    it, and the entity, old-entity, issue and count tables are rebuilt.
    Exits with status 2 when the output path or the S3 location is missing.
    """
    # Both the local output location and the S3 source are mandatory.
    if not output_path:
        print("missing output path", file=sys.stderr)
        sys.exit(2)

    if not bucket_name or not object_key:
        print("Missing bucket name or object_key to get sqlite files", file=sys.stderr)
        sys.exit(2)

    # Build the supporting objects used throughout the update.
    column_field_dir = Path(column_field_dir)
    dataset_resource_dir = Path(dataset_resource_dir)
    organisation = Organisation(
        organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
    )
    package = DatasetPackage(
        dataset,
        organisation=organisation,
        path=output_path,
        specification_dir=None,  # TBD: package should use this specification object
    )

    # Copy the current sqlite file from S3 and load it into tables.
    table_name = dataset
    package.load_from_s3(
        bucket_name=bucket_name, object_key=object_key, table_name=table_name
    )

    # Layer each newly processed resource on top of the existing state.
    for input_path in input_paths:
        resource_name = Path(input_path).name
        package.load_transformed(input_path)
        package.load_column_fields(column_field_dir / dataset / resource_name)
        package.load_dataset_resource(dataset_resource_dir / dataset / resource_name)
    package.load_entities()

    old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
    if os.path.exists(old_entity_path):
        package.load_old_entities(old_entity_path)

    dataset_issue_dir = os.path.join(issue_dir, dataset)
    if not os.path.exists(dataset_issue_dir):
        logging.warning("No directory for this dataset in the provided issue_directory")
    else:
        for issue_file in os.listdir(dataset_issue_dir):
            package.load_issues(os.path.join(dataset_issue_dir, issue_file))

    package.add_counts()


def dataset_dump(input_path, output_path):
cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}"
logging.info(cmd)
Expand Down
37 changes: 35 additions & 2 deletions digital_land/package/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import csv
import sqlite3
import logging
import boto3
import botocore.exceptions
from .package import Package
from decimal import Decimal

Expand Down Expand Up @@ -298,15 +300,46 @@ def create_database(self):
if os.path.exists(self.path):
os.remove(self.path)

self.set_up_connection()

self.create_tables()

    def set_up_connection(self):
        """Open the sqlite connection, initialising spatial metadata when enabled."""
        self.connect()

        # Spatialite databases need their geometry metadata tables created
        # before any spatial columns can be used.
        if self._spatialite:
            self.connection.execute("select InitSpatialMetadata(1)")

self.create_tables()

    def create(self):
        """Build the package from scratch: fresh database, data load, indexes.

        Contrast with load_from_s3, which starts from an existing sqlite
        file and deliberately skips index creation.
        """
        self.create_database()
        self.load()
        self.create_indexes()
        self.disconnect()

def load_from_s3(self, bucket_name, object_key, table_name):
# Ensure parameters are valid
if not isinstance(bucket_name, str) or not isinstance(object_key, str):
raise ValueError("Bucket name and object key must be strings.")

local_path = os.path.dirname(self.path)
s3 = boto3.client("s3")

file_key = f"{table_name}.sqlite3"
local_file_path = os.path.join(local_path, file_key)

try:
os.makedirs(local_path, exist_ok=True) # Ensure local directory exists
s3.download_file(bucket_name, object_key + "/" + file_key, local_file_path)
except botocore.exceptions.NoCredentialsError:
logger.error(
"❌ AWS credentials not found. Run `aws configure` to set them."
)
except botocore.exceptions.ParamValidationError as e:
logger.error(f"❌ Parameter validation error: {e}")
except botocore.exceptions.ClientError as e:
logger.error(f"❌ AWS S3 error: {e}")

self.set_up_connection()
self.load()
# self.create_indexes()# Do we need this?
self.disconnect()
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def get_long_description():
"dask[dataframe]",
"pyarrow",
"pygit2",
"boto3",
"moto",
],
entry_points={"console_scripts": ["digital-land=digital_land.cli:cli"]},
setup_requires=["pytest-runner"],
Expand Down
112 changes: 112 additions & 0 deletions tests/integration/package/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ def test_entry_date_upsert_uploads_blank_fields(
# run upload to fact table not fact resource for testing the upsert
package.connect()
package.create_cursor()

fact_fields = package.specification.schema["fact"]["fields"]
fact_conflict_fields = ["fact"]
fact_update_fields = [
Expand Down Expand Up @@ -406,3 +407,114 @@ def test_entry_date_upsert_uploads_blank_fields(
]

assert actual_result == expected_result, "actual result does not match query"


def test_insert_newest_date(
    specification_dir,
    organisation_csv,
    blank_patch_csv,
    transformed_fact_resources,
    transformed_fact_resources_with_blank,
    tmp_path,
):
    """Upserting fact-resource rows keeps the data with the newest entry-date.

    Two rounds run against a freshly created package each time: first the
    plain transformed resources, then the variant containing a blank field;
    the resulting fact_resource rows are compared against expected records.
    """
    dataset = "conservation-area"
    sqlite3_path = os.path.join(tmp_path, f"{dataset}.sqlite3")

    organisation = Organisation(
        organisation_path=organisation_csv,
        pipeline_dir=Path(os.path.dirname(blank_patch_csv)),
    )
    package = DatasetPackage(
        dataset,  # was a duplicated literal; use the variable for consistency
        organisation=organisation,
        path=sqlite3_path,
        specification_dir=specification_dir,  # TBD: package should use this specification object
    )

    def _upsert_rows(rows):
        # Load rows into fact_resource directly (not via fact) to exercise upsert.
        package.connect()
        package.create_cursor()
        fact_resource_fields = package.specification.schema["fact-resource"]["fields"]
        for row in rows:
            package.insert("fact-resource", fact_resource_fields, row, upsert=True)
        package.commit()
        package.disconnect()

    def _fetch_fact_resources():
        # Read back every fact_resource row as a list of dicts.
        package.connect()
        package.create_cursor()
        package.cursor.execute("SELECT * FROM fact_resource;")
        cols = [column[0] for column in package.cursor.description]
        return pd.DataFrame.from_records(
            package.cursor.fetchall(), columns=cols
        ).to_dict(orient="records")

    # Round 1: plain transformed resources against a fresh package.
    package.create()
    _upsert_rows(transformed_fact_resources)

    expected_result = [
        {
            "end_date": "",
            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
            "entry_date": "2021-09-06",
            "entry_number": None,
            "priority": None,
            "resource": "",
            "start_date": "",
        },
        {
            "end_date": "",
            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
            "entry_date": "2022-11-02",
            "entry_number": None,
            "priority": None,
            "resource": "",
            "start_date": "",
        },
    ]
    actual_result = _fetch_fact_resources()
    assert actual_result == expected_result, "actual result does not match query"

    # Round 2: resources including a blank field, again on a fresh package.
    package.create()
    _upsert_rows(transformed_fact_resources_with_blank)

    expected_result = [
        {
            "end_date": "2021-12-31",
            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
            "entry_date": "2021-09-06",
            "entry_number": None,
            "priority": None,
            "resource": "",
            "start_date": "",
        },
        {
            "end_date": "",
            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
            "entry_date": "2022-11-02",
            "entry_number": None,
            "priority": None,
            "resource": "",
            "start_date": "",
        },
    ]
    actual_result = _fetch_fact_resources()
    assert actual_result == expected_result, "actual result does not match query"
Loading