57 changes: 57 additions & 0 deletions docs/utility-guides/DataValidation.md
@@ -0,0 +1,57 @@
# Utility Guide: Data Validation

The Data Validation Utility can be used to check whether an SQL query has returned any duplicate records.

## Table of Contents

- [Utility Guide: Data Validation](#utility-guide-data-validation)
- [Table of Contents](#table-of-contents)
- [How This Works](#how-this-works)
- [Using the Data Validation Utility](#using-the-data-validation-utility)
- [Example usage](#example-usage)

## How This Works

This utility first runs the SQL query and then uses pandas to check whether the result contains any duplicate records.<br>
If duplicate records are detected, it removes them and re-runs the query to replace the dropped records.
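
The duplicate check itself relies on standard pandas behaviour. As a rough standalone sketch (illustrative only, with made-up values; this is not the utility's own code):

    import pandas as pd

    df = pd.DataFrame({"kitid": [101, 101, 102], "subject_nhs_number": ["A", "A", "B"]})
    duplicate_rows_count = int(df.duplicated().sum())  # 1 fully duplicated row detected
    df = df.drop_duplicates()  # the duplicate row is removed, leaving 2 unique rows

The utility performs this check on the dataframe returned by the query and, if any duplicates are dropped, fetches replacement rows as described below.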

## Using the Data Validation Utility

To use this utility, import the `DataValidation` class and call the `check_for_duplicate_records()` method.<br>
Provide the SQL query as a multi-line string, ensuring that the final line is: **fetch first :subjects_to_retrieve rows only**.<br>
This is necessary because this line is later replaced with an offset clause if duplicates are found.<br>
You will also need to provide any parameters used in the query as a dictionary, as shown in the example below.

## Example usage

    import pandas as pd

    from utils.data_validation import DataValidation
    # SqlQueryValues (used below) is the IntEnum defined in utils/oracle/oracle_specific_functions.py

    def get_kit_id_from_db(
        tk_type_id: int, hub_id: int, no_of_kits_to_retrieve: int
    ) -> pd.DataFrame:

        query = """select tk.kitid, tk.screening_subject_id, sst.subject_nhs_number
            from tk_items_t tk
            inner join ep_subject_episode_t se on se.screening_subject_id = tk.screening_subject_id
            inner join screening_subject_t sst on (sst.screening_subject_id = tk.screening_subject_id)
            inner join sd_contact_t sdc on (sdc.nhs_number = sst.subject_nhs_number)
            where tk.tk_type_id = :tk_type_id
            and tk.logged_in_flag = 'N'
            and sdc.hub_id = :hub_id
            and device_id is null
            and tk.invalidated_date is null
            and se.latest_event_status_id in (:s10_event_status, :s19_event_status)
            order by tk.kitid DESC
            fetch first :subjects_to_retrieve rows only"""

        params = {
            "s10_event_status": SqlQueryValues.S10_EVENT_STATUS,
            "s19_event_status": SqlQueryValues.S19_EVENT_STATUS,
            "tk_type_id": tk_type_id,
            "hub_id": hub_id,
            "subjects_to_retrieve": no_of_kits_to_retrieve,
        }

        kit_id_df = DataValidation().check_for_duplicate_records(query, params)

        return kit_id_df
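
If duplicate records are found, the utility drops them and re-runs the same query with its final line rewritten so that the re-run skips the rows already retrieved. A simplified sketch of the rewrite performed internally by `run_query_for_dropped_records()`:

    # params["offset_value"] skips past the rows already retrieved and
    # params["subjects_to_retrieve"] is reduced to the number of dropped duplicates
    lines = query.strip().split("\n")
    lines[-1] = "OFFSET :offset_value ROWS FETCH FIRST :subjects_to_retrieve rows only"
    rerun_query = "\n".join(lines)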
100 changes: 100 additions & 0 deletions utils/data_validation.py
@@ -0,0 +1,100 @@
import pandas as pd
import logging
from oracle.oracle import OracleDB


class DataValidation:
    """
    This class is used to validate that there are no duplicate records when obtaining test data.
    """

    def __init__(self):
        self.max_attempts = 5

    def check_for_duplicate_records(self, query: str, params: dict) -> pd.DataFrame:
        """
        This method first obtains the test data and then checks whether there are any duplicate records.

        Args:
            query (str): The SQL query you want to run
            params (dict): A dictionary of any parameters in the SQL query

        Returns:
            dataframe (pd.DataFrame): A dataframe containing 0 duplicate records
        """
        wanted_subject_count = int(params["subjects_to_retrieve"])

        dataframe = OracleDB().execute_query(query, params)

        attempts = 0
        while attempts < self.max_attempts:
            logging.info(f"Checking for duplicates. On attempt: {attempts + 1}")
            duplicate_rows_count = int(dataframe.duplicated().sum())

            if duplicate_rows_count == 0:
                logging.info("No duplicate records found")
                return dataframe

            logging.warning(
                f"{duplicate_rows_count} duplicate records found. Dropping duplicates and retrying query."
            )
            dataframe = dataframe.drop_duplicates()
            attempts += 1
            dataframe = self.run_query_for_dropped_records(
                dataframe,
                query,
                params,
                duplicate_rows_count,
                wanted_subject_count,
                attempts,
            )

        logging.error(
            f"Maximum attempt limit of {self.max_attempts} reached. Returning dataframe with duplicates dropped and not replaced."
        )
        dataframe = dataframe.drop_duplicates()
        actual_subject_count = len(dataframe)
        if wanted_subject_count != actual_subject_count:
            logging.error(
                f"Actual subject count differs from wanted count. {wanted_subject_count} subjects wanted but only {actual_subject_count} subjects were retrieved"
            )
        return dataframe

    def run_query_for_dropped_records(
        self,
        dataframe: pd.DataFrame,
        query: str,
        params: dict,
        duplicate_count: int,
        wanted_subject_count: int,
        attempts: int,
    ) -> pd.DataFrame:
        """
        This is used to make up for any dropped duplicate records. It runs the same query again but returns only as many records as were dropped.

        Args:
            dataframe (pd.DataFrame): The dataframe with duplicates dropped
            query (str): The SQL query you want to run
            params (dict): A dictionary of any parameters in the SQL query
            duplicate_count (int): The number of duplicate records in the original dataframe
            wanted_subject_count (int): The number of subjects to retrieve in the original query
            attempts (int): The number of attempts so far

        Returns:
            combined_dataframe (pd.DataFrame): A dataframe matching the original record count
        """
params["offset_value"] = wanted_subject_count + attempts
params["subjects_to_retrieve"] = duplicate_count

query = query.strip().split("\n")
query[-1] = (
"OFFSET :offset_value ROWS FETCH FIRST :subjects_to_retrieve rows only"
)
query = "\n".join(query)

dataframe_with_new_subjects = OracleDB().execute_query(query, params)

combined_dataframe = pd.concat(
[dataframe, dataframe_with_new_subjects], ignore_index=True
)
return combined_dataframe
6 changes: 3 additions & 3 deletions utils/oracle/oracle.py
@@ -29,7 +29,7 @@ def connect_to_db(self) -> oracledb.Connection:
return conn
except Exception as queryExecutionError:
logging.error(
f"Failed to to extract subject ID with error: {queryExecutionError}"
f"Failed to extract subject ID with error: {queryExecutionError}"
)

def disconnect_from_db(self, conn: oracledb.Connection) -> None:
@@ -147,15 +147,15 @@ def delete_all_users_from_approved_users_table(
self.disconnect_from_db(conn)

def execute_query(
self, query: str, parameters: list | None = None
self, query: str, parameters: dict | None = None
) -> pd.DataFrame: # To use when "select xxxx" (stored procedures)
"""
This is used to execute any sql queries.
A query is provided and then the result is returned as a pandas dataframe

Args:
query (str): The SQL query you wish to run
parameters (list | None): Optional - Any parameters you want to pass on in a list
parameters (dict | None): Optional - Any parameters you want to pass on in a dictionary

Returns:
df (pd.DataFrame): A pandas dataframe of the result of the query
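With this change, callers pass named bind parameters to `execute_query` as a dictionary rather than a list. A minimal sketch of the new calling convention (illustrative only; it assumes an `OracleDB` instance and borrows a table from the queries above):

    from oracle.oracle import OracleDB

    query = """select tk.kitid
        from tk_items_t tk
        where tk.tk_type_id = :tk_type_id
        fetch first :subjects_to_retrieve rows only"""
    params = {"tk_type_id": 2, "subjects_to_retrieve": 10}

    kit_df = OracleDB().execute_query(query, params)  # returns a pandas DataFrame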
87 changes: 59 additions & 28 deletions utils/oracle/oracle_specific_functions.py
@@ -3,6 +3,7 @@
import pandas as pd
from datetime import datetime
from enum import IntEnum
from utils.data_validation import DataValidation


class SqlQueryValues(IntEnum):
@@ -29,21 +30,30 @@ def get_kit_id_from_db(
kit_id_df (pd.DataFrame): A pandas DataFrame containing the result of the query
"""
logging.info("Retrieving useable test kit ids")
kit_id_df = OracleDB().execute_query(
f"""select tk.kitid, tk.screening_subject_id, sst.subject_nhs_number
query = """select tk.kitid, tk.screening_subject_id, sst.subject_nhs_number
from tk_items_t tk
inner join ep_subject_episode_t se on se.screening_subject_id = tk.screening_subject_id
inner join screening_subject_t sst on (sst.screening_subject_id = tk.screening_subject_id)
inner join sd_contact_t sdc on (sdc.nhs_number = sst.subject_nhs_number)
where tk.tk_type_id = {tk_type_id}
where tk.tk_type_id = :tk_type_id
and tk.logged_in_flag = 'N'
and sdc.hub_id = {hub_id}
and sdc.hub_id = :hub_id
and device_id is null
and tk.invalidated_date is null
and se.latest_event_status_id in ({SqlQueryValues.S10_EVENT_STATUS}, {SqlQueryValues.S19_EVENT_STATUS})
and se.latest_event_status_id in (:s10_event_status, :s19_event_status)
order by tk.kitid DESC
fetch first {no_of_kits_to_retrieve} rows only"""
)
fetch first :subjects_to_retrieve rows only"""

params = {
"s10_event_status": SqlQueryValues.S10_EVENT_STATUS,
"s19_event_status": SqlQueryValues.S19_EVENT_STATUS,
"tk_type_id": tk_type_id,
"hub_id": hub_id,
"subjects_to_retrieve": no_of_kits_to_retrieve,
}

kit_id_df = DataValidation().check_for_duplicate_records(query, params)

return kit_id_df


@@ -83,22 +93,29 @@ def get_kit_id_logged_from_db(smokescreen_properties: dict) -> pd.DataFrame:
Returns:
return kit_id_df (pd.DataFrame): A pandas DataFrame containing the result of the query
"""
kit_id_df = OracleDB().execute_query(
f"""SELECT tk.kitid,tk.device_id,tk.screening_subject_id
query = """SELECT tk.kitid,tk.device_id,tk.screening_subject_id
FROM tk_items_t tk
INNER JOIN kit_queue kq ON kq.device_id = tk.device_id
INNER JOIN ep_subject_episode_t se ON se.screening_subject_id = tk.screening_subject_id
WHERE tk.logged_in_flag = 'Y'
AND kq.test_kit_status IN ('LOGGED', 'POSTED')
AND se.episode_status_id = {SqlQueryValues.OPEN_EPISODE_STATUS_ID}
AND se.episode_status_id = :open_episode_status_id
AND tk.tk_type_id = 2
AND se.latest_event_status_id = {SqlQueryValues.S43_EVENT_STATUS}
AND tk.logged_in_at = {smokescreen_properties["c3_fit_kit_results_test_org_id"]}
AND se.latest_event_status_id = :s43_event_status
AND tk.logged_in_at = :logged_in_at
AND tk.reading_flag = 'N'
AND tk.test_results IS NULL
fetch first {smokescreen_properties["c3_total_fit_kits_to_retrieve"]} rows only
fetch first :subjects_to_retrieve rows only
"""
)

params = {
"s43_event_status": SqlQueryValues.S43_EVENT_STATUS,
"logged_in_at": smokescreen_properties["c3_fit_kit_results_test_org_id"],
"open_episode_status_id": SqlQueryValues.OPEN_EPISODE_STATUS_ID,
"subjects_to_retrieve": smokescreen_properties["c3_total_fit_kits_to_retrieve"],
}

kit_id_df = DataValidation().check_for_duplicate_records(query, params)

return kit_id_df

@@ -263,25 +280,31 @@ def get_subjects_for_appointments(subjects_to_retrieve: int) -> pd.DataFrame:
Returns:
subjects_df (pd.DataFrame): A pandas DataFrame containing the result of the query
"""
subjects_df = OracleDB().execute_query(
f"""
select tk.kitid, ss.subject_nhs_number, se.screening_subject_id

query = """select tk.kitid, ss.subject_nhs_number, se.screening_subject_id
from tk_items_t tk
inner join ep_subject_episode_t se on se.screening_subject_id = tk.screening_subject_id
inner join screening_subject_t ss on ss.screening_subject_id = se.screening_subject_id
inner join sd_contact_t c on c.nhs_number = ss.subject_nhs_number
where se.latest_event_status_id = {SqlQueryValues.A8_EVENT_STATUS}
where se.latest_event_status_id = :a8_event_status
and tk.logged_in_flag = 'Y'
and se.episode_status_id = {SqlQueryValues.OPEN_EPISODE_STATUS_ID}
and se.episode_status_id = :open_episode_status_id
and ss.screening_status_id != 4008
and tk.logged_in_at = 23159
and c.hub_id = 23159
and tk.tk_type_id = 2
and tk.datestamp > add_months(sysdate,-24)
order by ss.subject_nhs_number desc
fetch first {subjects_to_retrieve} rows only
fetch first :subjects_to_retrieve rows only
"""
)
params = {
"a8_event_status": SqlQueryValues.A8_EVENT_STATUS,
"open_episode_status_id": SqlQueryValues.OPEN_EPISODE_STATUS_ID,
"subjects_to_retrieve": subjects_to_retrieve,
}

subjects_df = DataValidation().check_for_duplicate_records(query, params)

return subjects_df


@@ -297,9 +320,8 @@ def get_subjects_with_booked_appointments(subjects_to_retrieve: int) -> pd.DataF
Returns:
subjects_df (pd.DataFrame): A pandas DataFrame containing the result of the query
"""
subjects_df = OracleDB().execute_query(
f"""
select a.appointment_date, s.subject_nhs_number, c.person_family_name, c.person_given_name

query = """select a.appointment_date, s.subject_nhs_number, c.person_family_name, c.person_given_name
from
(select count(*), ds.screening_subject_id
from
@@ -318,10 +340,11 @@ def get_subjects_with_booked_appointments(subjects_to_retrieve: int) -> pd.DataF
inner join screening_subject_t s on s.screening_subject_id = se.screening_subject_id
inner join sd_contact_t c on c.nhs_number = s.subject_nhs_number
inner join appointment_t a on se.subject_epis_id = a.subject_epis_id
where se.latest_event_status_id = {SqlQueryValues.POSITIVE_APPOINTMENT_BOOKED}
where se.latest_event_status_id = :positive_appointment_booked
and tk.logged_in_flag = 'Y'
and se.episode_status_id = {SqlQueryValues.OPEN_EPISODE_STATUS_ID}
and se.episode_status_id = :open_episode_status_id
and tk.logged_in_at = 23159
and tk.algorithm_sc_id = 23162
--and a.appointment_date > sysdate-27
and a.cancel_date is null
and a.attend_info_id is null and a.attend_date is null
Expand All @@ -330,7 +353,15 @@ def get_subjects_with_booked_appointments(subjects_to_retrieve: int) -> pd.DataF
and tk.tk_type_id = 2
--and tk.datestamp > add_months(sysdate,-24)
order by a.appointment_date desc
fetch first {subjects_to_retrieve} rows only
fetch first :subjects_to_retrieve rows only
"""
)

params = {
"positive_appointment_booked": SqlQueryValues.POSITIVE_APPOINTMENT_BOOKED,
"open_episode_status_id": SqlQueryValues.OPEN_EPISODE_STATUS_ID,
"subjects_to_retrieve": subjects_to_retrieve,
}

subjects_df = DataValidation().check_for_duplicate_records(query, params)

return subjects_df