Skip to content

Commit 22bca70

Browse files
Added logic to check and correct for duplicate records.
Adjusted existing compartment tests to use this logic
1 parent afb0e23 commit 22bca70

File tree

4 files changed

+218
-30
lines changed

4 files changed

+218
-30
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Utility Guide: Data Validation
2+
3+
The Data Validation Utility can be used to check if there are any duplicate values returned from an SQL query.
4+
5+
## Table of Contents
6+
7+
- [Utility Guide: Data Validation](#utility-guide-data-validation)
8+
- [Table of Contents](#table-of-contents)
9+
- [How This Works](#how-this-works)
10+
- [Using the Data Validation Utility](#using-the-data-validation-utility)
11+
- [Example usage](#example-usage)
12+
13+
## How This Works
14+
15+
This utility first runs the SQL query and then uses pandas to check whether the result contains duplicate records.<br>
16+
If duplicate records are detected, then it will remove them and re-run the query to replace the dropped records.
17+
18+
## Using the Data Validation Utility
19+
20+
To use this utility import the `DataValidation` class and then call the method `check_for_duplicate_records()`.<br>
21+
Here you will need to provide the SQL query as a multi-line string, ensuring that the final line is: **fetch first :subjects_to_retrieve rows only**.<br>
22+
This is necessary as this line is later replaced with an offset if duplicates are found.<br>
23+
You will also need to provide any parameters used in the query as a dictionary, which must include the `subjects_to_retrieve` key.
24+
25+
## Example usage
26+
27+
from utils.data_validation import DataValidation
28+
29+
def get_kit_id_from_db(
30+
tk_type_id: int, hub_id: int, no_of_kits_to_retrieve: int
31+
) -> pd.DataFrame:
32+
33+
query = """select tk.kitid, tk.screening_subject_id, sst.subject_nhs_number
34+
from tk_items_t tk
35+
inner join ep_subject_episode_t se on se.screening_subject_id = tk.screening_subject_id
36+
inner join screening_subject_t sst on (sst.screening_subject_id = tk.screening_subject_id)
37+
inner join sd_contact_t sdc on (sdc.nhs_number = sst.subject_nhs_number)
38+
where tk.tk_type_id = :tk_type_id
39+
and tk.logged_in_flag = 'N'
40+
and sdc.hub_id = :hub_id
41+
and device_id is null
42+
and tk.invalidated_date is null
43+
and se.latest_event_status_id in (:s10_event_status, :s19_event_status)
44+
order by tk.kitid DESC
45+
fetch first :subjects_to_retrieve rows only"""
46+
47+
params = {
48+
"s10_event_status": SqlQueryValues.S10_EVENT_STATUS,
49+
"s19_event_status": SqlQueryValues.S19_EVENT_STATUS,
50+
"tk_type_id": tk_type_id,
51+
"hub_id": hub_id,
52+
"subjects_to_retrieve": no_of_kits_to_retrieve,
53+
}
54+
55+
kit_id_df = DataValidation().check_for_duplicate_records(query, params)
56+
57+
return kit_id_df

utils/data_validation.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import pandas as pd
2+
import logging
3+
from oracle.oracle import OracleDB
4+
5+
6+
class DataValidation:
7+
"""
8+
This class will be used to validate that there are no duplicate records when obtaining test data.
9+
"""
10+
11+
def __init__(self):
12+
self.max_attempts = 5
13+
14+
def check_for_duplicate_records(self, query: str, params: dict) -> pd.DataFrame:
15+
"""
16+
This method is used to firstly obtain the test data, and then to check if there are any duplicate records.
17+
18+
Args:
19+
query (str): The SQL query you want to run
20+
params (dict): A dictionary of any parameters in the sql query
21+
22+
Returns:
23+
dataframe (pd.DataFrame): A dataframe containing 0 duplicate records
24+
"""
25+
wanted_subject_count = int(params["subjects_to_retrieve"])
26+
27+
dataframe = OracleDB().execute_query(query, params)
28+
29+
attempts = 0
30+
while attempts < self.max_attempts:
31+
logging.info(f"Checking for duplicates. On attempt: {attempts+1}")
32+
duplicate_rows_count = int(dataframe.duplicated().sum())
33+
34+
if duplicate_rows_count == 0:
35+
logging.info("No duplicate records found")
36+
return dataframe
37+
38+
logging.warning(
39+
f"{duplicate_rows_count} duplicate records found. Dropping duplicates and retrying query."
40+
)
41+
dataframe = dataframe.drop_duplicates()
42+
attempts += 1
43+
dataframe = self.run_query_for_dropped_records(
44+
dataframe,
45+
query,
46+
params,
47+
duplicate_rows_count,
48+
wanted_subject_count,
49+
attempts,
50+
)
51+
52+
logging.error(
53+
f"Maximum attempt limit of {self.max_attempts} reached. Returning dataframe with duplicates dropped and not replaced."
54+
)
55+
dataframe = dataframe.drop_duplicates()
56+
actual_subject_count = len(dataframe)
57+
if wanted_subject_count != actual_subject_count:
58+
logging.error(
59+
f"Actual subject count differs to wanted count. {wanted_subject_count} subjects wanted but only {actual_subject_count} subjects were retrieved"
60+
)
61+
return dataframe
62+
63+
def run_query_for_dropped_records(
64+
self,
65+
dataframe: pd.DataFrame,
66+
query: str,
67+
params: dict,
68+
duplicate_count: int,
69+
wanted_subject_count: int,
70+
attempts: int,
71+
) -> pd.DataFrame:
72+
"""
73+
This is used to make up for any dropped duplicate records. It runs the same query again but only returns the amount of dropped records.
74+
75+
Args:
76+
dataframe (pd.DataFrame): The dataframe with duplicates dropped
77+
query (str): The SQL query you want to run
78+
params (dict): A dictionary of any parameters in the sql query
79+
duplicate_count (int): The number of duplicate records in the original dataframe
80+
wanted_subject_count (int): The number of subjects to retrieve in the original query
81+
attempts (int): The number of attempts so far
82+
83+
Returns:
84+
dataframe_without_duplicates (pd.DataFrame): A dataframe matching the original record count
85+
"""
86+
params["offset_value"] = wanted_subject_count + attempts
87+
params["subjects_to_retrieve"] = duplicate_count
88+
89+
query = query.strip().split("\n")
90+
query[-1] = (
91+
"OFFSET :offset_value ROWS FETCH FIRST :subjects_to_retrieve rows only"
92+
)
93+
query = "\n".join(query)
94+
95+
dataframe_with_new_subjects = OracleDB().execute_query(query, params)
96+
97+
combined_dataframe = pd.concat(
98+
[dataframe, dataframe_with_new_subjects], ignore_index=True
99+
)
100+
return combined_dataframe

utils/oracle/oracle.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,15 @@ def delete_all_users_from_approved_users_table(
147147
self.disconnect_from_db(conn)
148148

149149
def execute_query(
150-
self, query: str, parameters: list | None = None
150+
self, query: str, parameters: dict | None = None
151151
) -> pd.DataFrame: # To use when "select xxxx" (stored procedures)
152152
"""
153153
This is used to execute any sql queries.
154154
A query is provided and then the result is returned as a pandas dataframe
155155
156156
Args:
157157
query (str): The SQL query you wish to run
158-
parameters (list | None): Optional - Any parameters you want to pass on in a list
158+
parameters (dict | None): Optional - Any parameters you want to pass on in a dictionary
159159
160160
Returns:
161161
df (pd.DataFrame): A pandas dataframe of the result of the query

utils/oracle/oracle_specific_functions.py

Lines changed: 59 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pandas as pd
44
from datetime import datetime
55
from enum import IntEnum
6+
from utils.data_validation import DataValidation
67

78

89
class SqlQueryValues(IntEnum):
@@ -29,21 +30,30 @@ def get_kit_id_from_db(
2930
kit_id_df (pd.DataFrame): A pandas DataFrame containing the result of the query
3031
"""
3132
logging.info("Retrieving useable test kit ids")
32-
kit_id_df = OracleDB().execute_query(
33-
f"""select tk.kitid, tk.screening_subject_id, sst.subject_nhs_number
33+
query = """select tk.kitid, tk.screening_subject_id, sst.subject_nhs_number
3434
from tk_items_t tk
3535
inner join ep_subject_episode_t se on se.screening_subject_id = tk.screening_subject_id
3636
inner join screening_subject_t sst on (sst.screening_subject_id = tk.screening_subject_id)
3737
inner join sd_contact_t sdc on (sdc.nhs_number = sst.subject_nhs_number)
38-
where tk.tk_type_id = {tk_type_id}
38+
where tk.tk_type_id = :tk_type_id
3939
and tk.logged_in_flag = 'N'
40-
and sdc.hub_id = {hub_id}
40+
and sdc.hub_id = :hub_id
4141
and device_id is null
4242
and tk.invalidated_date is null
43-
and se.latest_event_status_id in ({SqlQueryValues.S10_EVENT_STATUS}, {SqlQueryValues.S19_EVENT_STATUS})
43+
and se.latest_event_status_id in (:s10_event_status, :s19_event_status)
4444
order by tk.kitid DESC
45-
fetch first {no_of_kits_to_retrieve} rows only"""
46-
)
45+
fetch first :subjects_to_retrieve rows only"""
46+
47+
params = {
48+
"s10_event_status": SqlQueryValues.S10_EVENT_STATUS,
49+
"s19_event_status": SqlQueryValues.S19_EVENT_STATUS,
50+
"tk_type_id": tk_type_id,
51+
"hub_id": hub_id,
52+
"subjects_to_retrieve": no_of_kits_to_retrieve,
53+
}
54+
55+
kit_id_df = DataValidation().check_for_duplicate_records(query, params)
56+
4757
return kit_id_df
4858

4959

@@ -83,22 +93,29 @@ def get_kit_id_logged_from_db(smokescreen_properties: dict) -> pd.DataFrame:
8393
Returns:
8494
return kit_id_df (pd.DataFrame): A pandas DataFrame containing the result of the query
8595
"""
86-
kit_id_df = OracleDB().execute_query(
87-
f"""SELECT tk.kitid,tk.device_id,tk.screening_subject_id
96+
query = """SELECT tk.kitid,tk.device_id,tk.screening_subject_id
8897
FROM tk_items_t tk
8998
INNER JOIN kit_queue kq ON kq.device_id = tk.device_id
9099
INNER JOIN ep_subject_episode_t se ON se.screening_subject_id = tk.screening_subject_id
91100
WHERE tk.logged_in_flag = 'Y'
92101
AND kq.test_kit_status IN ('LOGGED', 'POSTED')
93-
AND se.episode_status_id = {SqlQueryValues.OPEN_EPISODE_STATUS_ID}
102+
AND se.episode_status_id = :open_episode_status_id
94103
AND tk.tk_type_id = 2
95-
AND se.latest_event_status_id = {SqlQueryValues.S43_EVENT_STATUS}
96-
AND tk.logged_in_at = {smokescreen_properties["c3_fit_kit_results_test_org_id"]}
104+
AND se.latest_event_status_id = :s43_event_status
105+
AND tk.logged_in_at = :logged_in_at
97106
AND tk.reading_flag = 'N'
98107
AND tk.test_results IS NULL
99-
fetch first {smokescreen_properties["c3_total_fit_kits_to_retrieve"]} rows only
108+
fetch first :subjects_to_retrieve rows only
100109
"""
101-
)
110+
111+
params = {
112+
"s43_event_status": SqlQueryValues.S43_EVENT_STATUS,
113+
"logged_in_at": smokescreen_properties["c3_fit_kit_results_test_org_id"],
114+
"open_episode_status_id": SqlQueryValues.OPEN_EPISODE_STATUS_ID,
115+
"subjects_to_retrieve": smokescreen_properties["c3_total_fit_kits_to_retrieve"],
116+
}
117+
118+
kit_id_df = DataValidation().check_for_duplicate_records(query, params)
102119

103120
return kit_id_df
104121

@@ -263,25 +280,31 @@ def get_subjects_for_appointments(subjects_to_retrieve: int) -> pd.DataFrame:
263280
Returns:
264281
subjects_df (pd.DataFrame): A pandas DataFrame containing the result of the query
265282
"""
266-
subjects_df = OracleDB().execute_query(
267-
f"""
268-
select tk.kitid, ss.subject_nhs_number, se.screening_subject_id
283+
284+
query = """select tk.kitid, ss.subject_nhs_number, se.screening_subject_id
269285
from tk_items_t tk
270286
inner join ep_subject_episode_t se on se.screening_subject_id = tk.screening_subject_id
271287
inner join screening_subject_t ss on ss.screening_subject_id = se.screening_subject_id
272288
inner join sd_contact_t c on c.nhs_number = ss.subject_nhs_number
273-
where se.latest_event_status_id = {SqlQueryValues.A8_EVENT_STATUS}
289+
where se.latest_event_status_id = :a8_event_status
274290
and tk.logged_in_flag = 'Y'
275-
and se.episode_status_id = {SqlQueryValues.OPEN_EPISODE_STATUS_ID}
291+
and se.episode_status_id = :open_episode_status_id
276292
and ss.screening_status_id != 4008
277293
and tk.logged_in_at = 23159
278294
and c.hub_id = 23159
279295
and tk.tk_type_id = 2
280296
and tk.datestamp > add_months(sysdate,-24)
281297
order by ss.subject_nhs_number desc
282-
fetch first {subjects_to_retrieve} rows only
298+
fetch first :subjects_to_retrieve rows only
283299
"""
284-
)
300+
params = {
301+
"a8_event_status": SqlQueryValues.A8_EVENT_STATUS,
302+
"open_episode_status_id": SqlQueryValues.OPEN_EPISODE_STATUS_ID,
303+
"subjects_to_retrieve": subjects_to_retrieve,
304+
}
305+
306+
subjects_df = DataValidation().check_for_duplicate_records(query, params)
307+
285308
return subjects_df
286309

287310

@@ -297,9 +320,8 @@ def get_subjects_with_booked_appointments(subjects_to_retrieve: int) -> pd.DataF
297320
Returns:
298321
subjects_df (pd.DataFrame): A pandas DataFrame containing the result of the query
299322
"""
300-
subjects_df = OracleDB().execute_query(
301-
f"""
302-
select a.appointment_date, s.subject_nhs_number, c.person_family_name, c.person_given_name
323+
324+
query = """select a.appointment_date, s.subject_nhs_number, c.person_family_name, c.person_given_name
303325
from
304326
(select count(*), ds.screening_subject_id
305327
from
@@ -318,10 +340,11 @@ def get_subjects_with_booked_appointments(subjects_to_retrieve: int) -> pd.DataF
318340
inner join screening_subject_t s on s.screening_subject_id = se.screening_subject_id
319341
inner join sd_contact_t c on c.nhs_number = s.subject_nhs_number
320342
inner join appointment_t a on se.subject_epis_id = a.subject_epis_id
321-
where se.latest_event_status_id = {SqlQueryValues.POSITIVE_APPOINTMENT_BOOKED}
343+
where se.latest_event_status_id = :positive_appointment_booked
322344
and tk.logged_in_flag = 'Y'
323-
and se.episode_status_id = {SqlQueryValues.OPEN_EPISODE_STATUS_ID}
345+
and se.episode_status_id = :open_episode_status_id
324346
and tk.logged_in_at = 23159
347+
and tk.algorithm_sc_id = 23162
325348
--and a.appointment_date > sysdate-27
326349
and a.cancel_date is null
327350
and a.attend_info_id is null and a.attend_date is null
@@ -330,7 +353,15 @@ def get_subjects_with_booked_appointments(subjects_to_retrieve: int) -> pd.DataF
330353
and tk.tk_type_id = 2
331354
--and tk.datestamp > add_months(sysdate,-24)
332355
order by a.appointment_date desc
333-
fetch first {subjects_to_retrieve} rows only
356+
fetch first :subjects_to_retrieve rows only
334357
"""
335-
)
358+
359+
params = {
360+
"positive_appointment_booked": SqlQueryValues.POSITIVE_APPOINTMENT_BOOKED,
361+
"open_episode_status_id": SqlQueryValues.OPEN_EPISODE_STATUS_ID,
362+
"subjects_to_retrieve": subjects_to_retrieve,
363+
}
364+
365+
subjects_df = DataValidation().check_for_duplicate_records(query, params)
366+
336367
return subjects_df

0 commit comments

Comments
 (0)