|
| 1 | +import csv |
| 2 | +from datetime import datetime, timedelta, timezone |
| 3 | +from itertools import cycle |
| 4 | +from math import gcd |
| 5 | +from random import shuffle |
| 6 | +from typing import Any, Iterator |
| 7 | + |
1 | 8 | import boto3 |
| 9 | +import fire |
| 10 | + |
| 11 | +# import json |
| 12 | +import numpy as np |
| 13 | + |
| 14 | +from nrlf.core.constants import ( |
| 15 | + CATEGORY_ATTRIBUTES, |
| 16 | + SNOMED_SYSTEM_URL, |
| 17 | + TYPE_ATTRIBUTES, |
| 18 | + TYPE_CATEGORIES, |
| 19 | +) |
| 20 | +from nrlf.core.dynamodb.model import DocumentPointer |
| 21 | +from nrlf.core.logger import logger |
| 22 | +from nrlf.tests.data import load_document_reference |
2 | 23 |
|
3 | 24 | dynamodb = boto3.client("dynamodb") |
4 | 25 | resource = boto3.resource("dynamodb") |
5 | 26 |
|
| 27 | +logger.setLevel("ERROR") |
6 | 28 |
|
7 | | -# DOC_REF_TEMPLATE = load_document_reference("NFT-template") |
| 29 | +DOC_REF_TEMPLATE = load_document_reference("NFT-template") |
8 | 30 |
|
9 | 31 | CHECKSUM_WEIGHTS = [i for i in range(10, 1, -1)] |
10 | 32 |
|
|
66 | 88 | "TRXT": 1, |
67 | 89 | }, # summary record currently has only one supplier |
68 | 90 | } |
| 91 | + |
| 92 | +DEFAULT_COUNT_DISTRIBUTIONS = {"1": 91, "2": 8, "3": 1} |
| 93 | + |
| 94 | + |
| 95 | +class TestNhsNumbersIterator: |
| 96 | + def __iter__(self): |
| 97 | + self.first9 = 900000000 |
| 98 | + return self |
| 99 | + |
| 100 | + def __next__(self): |
| 101 | + if self.first9 > 999999999: |
| 102 | + raise StopIteration |
| 103 | + checksum = 10 |
| 104 | + while checksum == 10: |
| 105 | + self.first9 += 1 |
| 106 | + nhs_no_digits = list(map(int, str(self.first9))) |
| 107 | + checksum = ( |
| 108 | + sum( |
| 109 | + weight * digit |
| 110 | + for weight, digit in zip(CHECKSUM_WEIGHTS, nhs_no_digits) |
| 111 | + ) |
| 112 | + * -1 |
| 113 | + % 11 |
| 114 | + ) |
| 115 | + nhs_no = str(self.first9) + str(checksum) |
| 116 | + return nhs_no |
| 117 | + |
| 118 | + |
| 119 | +def _make_seed_pointer( |
| 120 | + type_code: str, custodian: str, nhs_number: str, counter: int |
| 121 | +) -> DocumentPointer: |
| 122 | + """ |
| 123 | + Populates the example pointer template with test data to create a valid NRL 3.0 pointer |
| 124 | + """ |
| 125 | + doc_ref = DOC_REF_TEMPLATE |
| 126 | + doc_ref.id = f"{custodian}-{str(counter).zfill(12)}" # deterministic to aid perftest script retrieval |
| 127 | + doc_ref.subject.identifier.value = nhs_number |
| 128 | + doc_ref.custodian.identifier.value = custodian |
| 129 | + doc_ref.author[0].identifier.value = "X26NFT" |
| 130 | + doc_ref.type.coding[0].code = type_code |
| 131 | + doc_ref.type.coding[0].display = TYPE_ATTRIBUTES.get( |
| 132 | + f"{SNOMED_SYSTEM_URL}|{type_code}" |
| 133 | + ).get("display") |
| 134 | + type_url = f"{SNOMED_SYSTEM_URL}|{type_code}" |
| 135 | + category = TYPE_CATEGORIES.get(type_url) |
| 136 | + doc_ref.category[0].coding[0].code = category.split("|")[-1] |
| 137 | + doc_ref.category[0].coding[0].display = CATEGORY_ATTRIBUTES.get(category).get( |
| 138 | + "display" |
| 139 | + ) |
| 140 | + nft_pointer = DocumentPointer.from_document_reference(doc_ref, source="NFT-SEED") |
| 141 | + return nft_pointer |
| 142 | + |
| 143 | + |
| 144 | +def _populate_seed_table( |
| 145 | + table_name: str, |
| 146 | + px_with_pointers: int, |
| 147 | + pointers_per_px: float = 1.0, |
| 148 | + type_dists: dict[str, int] = DEFAULT_TYPE_DISTRIBUTIONS, |
| 149 | + custodian_dists: dict[str, dict[str, int]] = DEFAULT_CUSTODIAN_DISTRIBUTIONS, |
| 150 | +): |
| 151 | + """ |
| 152 | + Seeds a table with example data for non-functional testing. |
| 153 | + """ |
| 154 | + if pointers_per_px < 1.0: |
| 155 | + raise ValueError("Cannot populate table with patients with zero pointers") |
| 156 | + # set up iterations |
| 157 | + type_iter = _set_up_cyclical_iterator(type_dists) |
| 158 | + custodian_iters = _set_up_custodian_iterators(custodian_dists) |
| 159 | + # count_iter = _set_up_cyclical_iterator(DEFAULT_COUNT_DISTRIBUTIONS) |
| 160 | + count_iter = _get_pointer_count_poisson_distributions( |
| 161 | + px_with_pointers, pointers_per_px |
| 162 | + ) |
| 163 | + # count_iter = _get_pointer_count_negbinom_distributions(px_with_pointers, pointers_per_px) |
| 164 | + testnum_cls = TestNhsNumbersIterator() |
| 165 | + testnum_iter = iter(testnum_cls) |
| 166 | + |
| 167 | + px_counter = 0 |
| 168 | + doc_ref_target = int(pointers_per_px * px_with_pointers) |
| 169 | + logger.log( |
| 170 | + f"Will upsert ~{doc_ref_target} test pointers for {px_with_pointers} patients." |
| 171 | + ) |
| 172 | + doc_ref_counter = 0 |
| 173 | + batch_counter = 0 |
| 174 | + unprocessed_count = 0 |
| 175 | + |
| 176 | + pointer_data: list[list[str]] = [] |
| 177 | + |
| 178 | + start_time = datetime.now(tz=timezone.utc) |
| 179 | + |
| 180 | + batch_upsert_items: list[dict[str, Any]] = [] |
| 181 | + while px_counter < px_with_pointers: |
| 182 | + pointers_for_px = int(next(count_iter)) |
| 183 | + |
| 184 | + if batch_counter + pointers_for_px > 25 or px_counter == px_with_pointers: |
| 185 | + response = resource.batch_write_item( |
| 186 | + RequestItems={table_name: batch_upsert_items} |
| 187 | + ) |
| 188 | + |
| 189 | + if response.get("UnprocessedItems"): |
| 190 | + unprocessed_count += len( |
| 191 | + response.get("UnprocessedItems").get(table_name, []) |
| 192 | + ) |
| 193 | + |
| 194 | + batch_upsert_items = [] |
| 195 | + batch_counter = 0 |
| 196 | + |
| 197 | + new_px = next(testnum_iter) |
| 198 | + for _ in range(pointers_for_px): |
| 199 | + new_type = next(type_iter) |
| 200 | + new_custodian = next(custodian_iters[new_type]) |
| 201 | + doc_ref_counter += 1 |
| 202 | + batch_counter += 1 |
| 203 | + |
| 204 | + pointer = _make_seed_pointer( |
| 205 | + new_type, new_custodian, new_px, doc_ref_counter |
| 206 | + ) |
| 207 | + put_req = {"PutRequest": {"Item": pointer.model_dump()}} |
| 208 | + batch_upsert_items.append(put_req) |
| 209 | + pointer_data.append( |
| 210 | + [ |
| 211 | + pointer.id, |
| 212 | + pointer.type, |
| 213 | + pointer.custodian, |
| 214 | + pointer.nhs_number, |
| 215 | + ] |
| 216 | + ) |
| 217 | + px_counter += 1 |
| 218 | + |
| 219 | + if px_counter % 1000 == 0: |
| 220 | + logger.log(".", end="", flush=True) |
| 221 | + if px_counter % 100000 == 0: |
| 222 | + logger.log( |
| 223 | + f" {px_counter} patients processed ({doc_ref_counter} pointers)." |
| 224 | + ) |
| 225 | + |
| 226 | + logger.log("Done.") |
| 227 | + |
| 228 | + end_time = datetime.now(tz=timezone.utc) |
| 229 | + logger.log( |
| 230 | + f"Created {doc_ref_counter} pointers in {timedelta.total_seconds(end_time - start_time)} seconds (unprocessed: {unprocessed_count})." |
| 231 | + ) |
| 232 | + |
| 233 | + with open("./dist/seed-nft-pointers.csv", "w") as f: |
| 234 | + writer = csv.writer(f) |
| 235 | + writer.writerow(["pointer_id", "pointer_type", "custodian", "nhs_number"]) |
| 236 | + writer.writerows(pointer_data) |
| 237 | + logger.log(f"Pointer data saved to ./dist/seed-nft-pointers.csv") # noqa |
| 238 | + |
| 239 | + |
| 240 | +def _set_up_cyclical_iterator(dists: dict[str, int]) -> Iterator[str]: |
| 241 | + """ |
| 242 | + Given a dict of values and their relative frequencies, |
| 243 | + returns an iterator that will cycle through a the reduced and shuffled set of values. |
| 244 | + This should result in more live-like data than e.g. creating a bulk amount of each pointer type/custodian in series. |
| 245 | + It also means each batch will contain a representative sample of the distribution. |
| 246 | + """ |
| 247 | + d = gcd(*dists.values()) |
| 248 | + value_list: list[str] = [] |
| 249 | + for entry in dists: |
| 250 | + value_list.extend([entry] * (dists[entry] // d)) |
| 251 | + shuffle(value_list) |
| 252 | + return cycle(value_list) |
| 253 | + |
| 254 | + |
| 255 | +def _get_pointer_count_poisson_distributions( |
| 256 | + num_of_patients: int, pointers_per_px: float |
| 257 | +) -> Iterator[int]: |
| 258 | + p_count_distr = np.random.poisson(lam=pointers_per_px - 1, size=num_of_patients) + 1 |
| 259 | + p_count_distr = np.clip(p_count_distr, a_min=1, a_max=4) |
| 260 | + return cycle(p_count_distr) |
| 261 | + |
| 262 | + |
| 263 | +def _set_up_custodian_iterators( |
| 264 | + custodian_dists: dict[str, dict[str, int]], |
| 265 | +) -> dict[str, Iterator[str]]: |
| 266 | + custodian_iters: dict[str, Iterator[str]] = {} |
| 267 | + for pointer_type in custodian_dists: |
| 268 | + custodian_iters[pointer_type] = _set_up_cyclical_iterator( |
| 269 | + custodian_dists[pointer_type] |
| 270 | + ) |
| 271 | + return custodian_iters |
| 272 | + |
| 273 | + |
| 274 | +if __name__ == "__main__": |
| 275 | + fire.Fire(_populate_seed_table) |
0 commit comments