|
| 1 | +from datetime import datetime, timedelta, timezone |
| 2 | +from itertools import cycle |
| 3 | +from math import gcd |
| 4 | +from random import shuffle |
| 5 | +from typing import Any |
| 6 | + |
| 7 | +import boto3 |
| 8 | +import fire |
| 9 | + |
| 10 | +from nrlf.consumer.fhir.r4.model import DocumentReference |
| 11 | +from nrlf.core.constants import ( |
| 12 | + CATEGORY_ATTRIBUTES, |
| 13 | + SNOMED_SYSTEM_URL, |
| 14 | + TYPE_ATTRIBUTES, |
| 15 | + TYPE_CATEGORIES, |
| 16 | + Categories, |
| 17 | + PointerTypes, |
| 18 | +) |
| 19 | +from nrlf.core.dynamodb.model import DocumentPointer |
| 20 | +from nrlf.core.logger import logger |
| 21 | +from nrlf.tests.data import load_document_reference |
| 22 | + |
# DynamoDB handles created once at import time; batch writes below go through `resource`.
dynamodb = boto3.client("dynamodb")
resource = boto3.resource("dynamodb")

# Restrict the shared project logger to errors for the duration of the script.
logger.setLevel("ERROR")

# Template document reference loaded once at import; _make_seed_pointer fills it in per pointer.
DOC_REF_TEMPLATE = load_document_reference("NFT-template")
| 29 | + |
# Weights 10 down to 2, paired with the first nine digits when computing the check digit.
CHECKSUM_WEIGHTS = list(range(10, 1, -1))
| 31 | + |
# Relative frequency of each pointer type, keyed by SNOMED type code (the values sum to 100).
# These are based on the Nov 7th 2025 pointer stats report
DEFAULT_TYPE_DISTRIBUTIONS = {
    "736253002": 65,  # mental health crisis plan
    "1382601000000107": 5,  # respect form
    "887701000000100": 15,  # emergency healthcare plan
    "861421000000109": 5,  # eol care coordination summary
    "735324008": 5,  # treatment escalation plan
    "824321000000109": 5,  # summary record
}
| 41 | + |
# Relative frequency of each custodian code, grouped by the pointer type code it supplies.
# The outer keys match those of DEFAULT_TYPE_DISTRIBUTIONS.
DEFAULT_CUSTODIAN_DISTRIBUTIONS = {
    # mental health crisis plan
    "736253002": {
        "TRPG": 9,
        "TRHA": 1,
        "TRRE": 20,
        "TRAT": 10,
        "TWR4": 4,
        "TRKL": 9,
        "TRW1": 5,
        "TRH5": 1,
        "TRP7": 13,
        "TRWK": 8,
        "TRQY": 3,
        "TRV5": 3,
        "TRJ8": 2,
        "TRXA": 4,
        "T11X": 1,
        "TG6V": 2,
    },
    # respect form
    "1382601000000107": {"T8GX8": 3, "TQUY": 2},
    # emergency healthcare plan
    "887701000000100": {
        "TV1": 1,
        "TV2": 2,
        "TV3": 1,
        "TV4": 1,
        "TV5": 3,
        "TV6": 1,
    },
    # eol care coordination summary
    "861421000000109": {
        "TV1": 2,
        "TV2": 2,
        "TV3": 1,
        "TV4": 1,
        "TV5": 3,
        "TV6": 1,
    },
    # treatment escalation plan
    "735324008": {
        "TV1": 1,
        "TV2": 1,
        "TV3": 1,
        "TV4": 2,
        "TV5": 2,
        "TV6": 1,
    },
    # summary record currently has only one supplier
    "824321000000109": {
        "TRXT": 1,
    },
}
| 90 | + |
# Relative frequency of patients holding 1, 2 or 3 pointers.
# Keys are the per-patient pointer count as a string (converted with int() where it is consumed).
DEFAULT_COUNT_DISTRIBUTIONS = {"1": 91, "2": 8, "3": 1}
| 92 | + |
| 93 | + |
class TestNhsNumbersIterator:
    """
    Iterator over ten-digit NHS-number strings whose first nine digits count up from 900000001.

    Each value is a nine-digit prefix followed by its modulus-11 check digit.
    Prefixes whose check digit would be 10 have no valid number and are skipped.
    Iteration stops once the prefix 999999999 has been consumed.
    """

    # The class name starts with "Test"; opt out of pytest collection explicitly.
    __test__ = False

    _FIRST_PREFIX = 900000000  # incremented before use, so the first prefix emitted is 900000001
    _LAST_PREFIX = 999999999  # largest nine-digit prefix

    def __init__(self):
        # Initialise here as well so next() works without a prior iter() call.
        self.first9 = self._FIRST_PREFIX

    def __iter__(self):
        # Calling iter() restarts the sequence from the beginning.
        self.first9 = self._FIRST_PREFIX
        return self

    def __next__(self):
        checksum = 10
        while checksum == 10:
            # Check the bound before every increment so the prefix never grows to ten digits.
            if self.first9 >= self._LAST_PREFIX:
                raise StopIteration
            self.first9 += 1
            digits = map(int, str(self.first9))
            weighted_sum = sum(
                weight * digit for weight, digit in zip(CHECKSUM_WEIGHTS, digits)
            )
            # Equivalent to 11 - (sum % 11) with 11 mapped to 0; a result of 10 is invalid.
            checksum = -weighted_sum % 11
        return f"{self.first9}{checksum}"
| 116 | + |
| 117 | + |
def _make_seed_pointer(
    type_code: str, custodian: str, nhs_number: str, counter: int
) -> DocumentPointer:
    """
    Populates the example pointer template with test data to create a valid NRL 3.0 pointer.

    Args:
        type_code: SNOMED code of the pointer type; combined with SNOMED_SYSTEM_URL
            to look up the type display and its category.
        custodian: custodian code, also used as the prefix of the pointer id.
        nhs_number: value written to the subject identifier.
        counter: running pointer number, zero-padded to 12 digits in the pointer id.

    Returns:
        A DocumentPointer built from the populated template with source "NFT-SEED".

    Raises:
        ValueError: if the type code or its category has no entry in the constants tables.

    Note:
        DOC_REF_TEMPLATE is updated in place, not copied. Every field set here is
        overwritten on each call.
    """
    type_url = f"{SNOMED_SYSTEM_URL}|{type_code}"
    type_attributes = TYPE_ATTRIBUTES.get(type_url)
    category = TYPE_CATEGORIES.get(type_url)
    # Fail with a clear message instead of an AttributeError on None further down.
    if type_attributes is None or category is None:
        raise ValueError(f"Unknown pointer type code: {type_code}")
    category_attributes = CATEGORY_ATTRIBUTES.get(category)
    if category_attributes is None:
        raise ValueError(f"No category attributes defined for: {category}")

    doc_ref = DOC_REF_TEMPLATE
    doc_ref.id = f"{custodian}-{str(counter).zfill(12)}"  # deterministic to aid perftest script retrieval
    doc_ref.subject.identifier.value = nhs_number
    doc_ref.custodian.identifier.value = custodian
    doc_ref.author[0].identifier.value = "X26NFT"
    doc_ref.type.coding[0].code = type_code
    doc_ref.type.coding[0].display = type_attributes.get("display")
    # The category key has the form "<system>|<code>"; only the code goes into the coding.
    doc_ref.category[0].coding[0].code = category.split("|")[-1]
    doc_ref.category[0].coding[0].display = category_attributes.get("display")
    return DocumentPointer.from_document_reference(doc_ref, source="NFT-SEED")
| 141 | + |
| 142 | + |
def _write_seed_batch(table_name: str, put_requests: list[dict[str, Any]]) -> None:
    """
    Writes one batch of put requests, re-submitting anything reported as unprocessed.

    Raises:
        RuntimeError: if items are still unprocessed after the final attempt.
    """
    from time import sleep  # function-scope import: `time` is not among the module imports

    max_attempts = 8
    pending = {table_name: put_requests}
    for attempt in range(max_attempts):
        if attempt:
            # Exponential backoff between retries, capped at five seconds.
            sleep(min(0.1 * 2 ** (attempt - 1), 5.0))
        response = resource.batch_write_item(RequestItems=pending)
        # A batch write can succeed overall while leaving some items unwritten.
        pending = response.get("UnprocessedItems") or {}
        if not pending:
            return
    raise RuntimeError(
        f"Items for {table_name} still unprocessed after {max_attempts} attempts"
    )


def _populate_seed_table(
    table_name: str,
    px_with_pointers: int,
    pointers_per_px: float = 1.0,
    type_dists: dict[str, int] = DEFAULT_TYPE_DISTRIBUTIONS,
    custodian_dists: dict[str, dict[str, int]] = DEFAULT_CUSTODIAN_DISTRIBUTIONS,
):
    """
    Seeds a table with example data for non-functional testing.

    Args:
        table_name: name of the DynamoDB table to write to.
        px_with_pointers: number of patients to create pointers for.
        pointers_per_px: target average pointers per patient; must be at least 1.0.
        type_dists: relative frequency of each pointer type code.
        custodian_dists: per pointer type, relative frequency of each custodian code.

    Raises:
        ValueError: if pointers_per_px is below 1.0.
        RuntimeError: if a batch still has unprocessed items after all retries.

    NOTE(review): pointers_per_px only feeds the validation and the estimate printed
    up front. The per-patient counts come from DEFAULT_COUNT_DISTRIBUTIONS, so the
    final total can differ from that estimate.
    """
    if pointers_per_px < 1.0:
        raise ValueError("Cannot populate table with patients with zero pointers")
    max_batch_size = 25  # a single batch write accepts at most 25 put requests

    # set up iterations
    type_iter = _set_up_cyclical_iterator(type_dists)
    custodian_iters = _set_up_custodian_iterators(custodian_dists)
    count_iter = _set_up_cyclical_iterator(DEFAULT_COUNT_DISTRIBUTIONS)
    testnum_iter = iter(TestNhsNumbersIterator())

    doc_ref_target = int(pointers_per_px * px_with_pointers)
    print(
        f"Will upsert {doc_ref_target} test pointers for {px_with_pointers} patients."
    )
    px_counter = 0
    doc_ref_counter = 0

    start_time = datetime.now(tz=timezone.utc)

    batch_upsert_items = []
    # Strict comparison: exactly px_with_pointers patients are generated.
    while px_counter < px_with_pointers:
        pointers_for_px = int(next(count_iter))
        # Flush first if this patient's pointers would overflow the current batch,
        # so all pointers for one patient land in the same write.
        if (
            batch_upsert_items
            and len(batch_upsert_items) + pointers_for_px > max_batch_size
        ):
            _write_seed_batch(table_name, batch_upsert_items)
            batch_upsert_items = []

        new_px = next(testnum_iter)
        for _ in range(pointers_for_px):
            new_type = next(type_iter)
            new_custodian = next(custodian_iters[new_type])
            doc_ref_counter += 1

            pointer = _make_seed_pointer(
                new_type, new_custodian, new_px, doc_ref_counter
            )
            batch_upsert_items.append({"PutRequest": {"Item": pointer.model_dump()}})
        px_counter += 1

    # Write whatever is left; skipped when nothing was generated.
    if batch_upsert_items:
        _write_seed_batch(table_name, batch_upsert_items)

    end_time = datetime.now(tz=timezone.utc)
    print(
        f"Created {doc_ref_counter} pointers in {(end_time - start_time).total_seconds()} seconds."
    )
| 198 | + |
| 199 | + |
| 200 | +def _set_up_cyclical_iterator(dists: dict[str, int]) -> iter: |
| 201 | + """ |
| 202 | + Given a dict of values and their relative frequencies, |
| 203 | + returns an iterator that will cycle through a the reduced and shuffled set of values. |
| 204 | + This should result in more live-like data than e.g. creating a bulk amount of each pointer type/custodian in series. |
| 205 | + It also means each batch will contain a representative sample of the distribution. |
| 206 | + """ |
| 207 | + d = gcd(*dists.values()) |
| 208 | + value_list = [] |
| 209 | + for entry in dists: |
| 210 | + value_list.extend([entry] * (dists[entry] // d)) |
| 211 | + shuffle(value_list) |
| 212 | + return cycle(value_list) |
| 213 | + |
| 214 | + |
def _set_up_custodian_iterators(
    custodian_dists: dict[str, dict[str, int]]
) -> dict[str, cycle]:
    """
    Builds one cyclical custodian iterator per pointer type.

    Args:
        custodian_dists: maps each pointer type to a dict of custodian -> relative frequency.

    Returns:
        A dict with the same keys, each mapped to the iterator produced by
        _set_up_cyclical_iterator for that type's custodian frequencies.
    """
    return {
        pointer_type: _set_up_cyclical_iterator(dists)
        for pointer_type, dists in custodian_dists.items()
    }
| 224 | + |
| 225 | + |
def _set_up_count_iterator(pointers_per_px: float) -> cycle:
    """
    Given a target average number of pointers per patient,
    generates a distribution of counts per individual patient.

    The distribution is expressed per hundred patients, each holding 1, 2 or 3 pointers.
    A patient with 2 pointers contributes one pointer above the minimum and a patient
    with 3 contributes two, so the extras add up to (pointers_per_px - 1) * 100.

    Args:
        pointers_per_px: target average, between 1.0 and 3.0 inclusive.

    Returns:
        The cyclical iterator built by _set_up_cyclical_iterator over the keys "1", "2" and "3".

    Raises:
        ValueError: if the target average is outside the range achievable with 1-3 pointers.
    """
    # no patients can have zero pointers, and none can have more than three
    if not 1.0 <= pointers_per_px <= 3.0:
        raise ValueError("pointers_per_px must be between 1.0 and 3.0")

    # round() not int(): (1.15 - 1.0) * 100 evaluates to 14.99..., which int() truncates to 14
    extra_per_hundred = round((pointers_per_px - 1.0) * 100)
    # About a tenth of the extras go to three-pointer patients; use more when that
    # is needed to keep the number of one-pointer patients non-negative.
    threes = max(extra_per_hundred // 10, extra_per_hundred - 100)
    twos = extra_per_hundred - 2 * threes
    counts = {
        "3": threes,
        "2": twos,
        "1": 100 - twos - threes,
    }
    return _set_up_cyclical_iterator(counts)
| 240 | + |
| 241 | + |
if __name__ == "__main__":
    # Hand the seeding function to python-fire as the script's command-line entry point.
    fire.Fire(_populate_seed_table)
0 commit comments