1+ import csv
12from datetime import datetime , timedelta , timezone
23from itertools import cycle
34from math import gcd
45from random import shuffle
5- from typing import Any
6+ from typing import Any , Iterator
67
78import boto3
89import fire
910
11+ # import json
12+ import numpy as np
13+
1014from nrlf .consumer .fhir .r4 .model import DocumentReference
1115from nrlf .core .constants import (
1216 CATEGORY_ATTRIBUTES ,
@@ -145,7 +149,7 @@ def _populate_seed_table(
145149 px_with_pointers : int ,
146150 pointers_per_px : float = 1.0 ,
147151 type_dists : dict [str , int ] = DEFAULT_TYPE_DISTRIBUTIONS ,
148- custodian_dists : dict [str , int ] = DEFAULT_CUSTODIAN_DISTRIBUTIONS ,
152+ custodian_dists : dict [str , dict [ str , int ] ] = DEFAULT_CUSTODIAN_DISTRIBUTIONS ,
149153):
150154 """
151155 Seeds a table with example data for non-functional testing.
@@ -155,25 +159,41 @@ def _populate_seed_table(
155159 # set up iterations
156160 type_iter = _set_up_cyclical_iterator (type_dists )
157161 custodian_iters = _set_up_custodian_iterators (custodian_dists )
158- count_iter = _set_up_cyclical_iterator (DEFAULT_COUNT_DISTRIBUTIONS )
162+ # count_iter = _set_up_cyclical_iterator(DEFAULT_COUNT_DISTRIBUTIONS)
163+ count_iter = _get_pointer_count_poisson_distributions (
164+ px_with_pointers , pointers_per_px
165+ )
166+ # count_iter = _get_pointer_count_negbinom_distributions(px_with_pointers, pointers_per_px)
159167 testnum_cls = TestNhsNumbersIterator ()
160168 testnum_iter = iter (testnum_cls )
161169
162170 px_counter = 0
163171 doc_ref_target = int (pointers_per_px * px_with_pointers )
164172 print (
165- f"Will upsert { doc_ref_target } test pointers for { px_with_pointers } patients."
173+ f"Will upsert ~ { doc_ref_target } test pointers for { px_with_pointers } patients."
166174 )
167175 doc_ref_counter = 0
168176 batch_counter = 0
177+ unprocessed_count = 0
178+
179+ pointer_data : list [list [str ]] = []
169180
170181 start_time = datetime .now (tz = timezone .utc )
171182
172- batch_upsert_items = []
173- while px_counter <= px_with_pointers :
183+ batch_upsert_items : list [ dict [ str , Any ]] = []
184+ while px_counter < px_with_pointers :
174185 pointers_for_px = int (next (count_iter ))
186+
175187 if batch_counter + pointers_for_px > 25 or px_counter == px_with_pointers :
176- resource .batch_write_item (RequestItems = {table_name : batch_upsert_items })
188+ response = resource .batch_write_item (
189+ RequestItems = {table_name : batch_upsert_items }
190+ )
191+
192+ if response .get ("UnprocessedItems" ):
193+ unprocessed_count += len (
194+ response .get ("UnprocessedItems" ).get (table_name , [])
195+ )
196+
177197 batch_upsert_items = []
178198 batch_counter = 0
179199
@@ -189,55 +209,68 @@ def _populate_seed_table(
189209 )
190210 put_req = {"PutRequest" : {"Item" : pointer .model_dump ()}}
191211 batch_upsert_items .append (put_req )
212+ pointer_data .append (
213+ [
214+ pointer .id ,
215+ pointer .type ,
216+ pointer .custodian ,
217+ pointer .nhs_number ,
218+ ]
219+ )
192220 px_counter += 1
193221
222+ if px_counter % 1000 == 0 :
223+ print ("." , end = "" , flush = True )
224+ if px_counter % 100000 == 0 :
225+ print (f" { px_counter } patients processed ({ doc_ref_counter } pointers)." )
226+
227+ print (" Done." )
228+
194229 end_time = datetime .now (tz = timezone .utc )
195230 print (
196- f"Created { doc_ref_counter } pointers in { timedelta .total_seconds (end_time - start_time )} seconds."
231+ f"Created { doc_ref_counter } pointers in { timedelta .total_seconds (end_time - start_time )} seconds (unprocessed: { unprocessed_count } ) ."
197232 )
198233
234+ with open ("./dist/seed-nft-pointers.csv" , "w" ) as f :
235+ writer = csv .writer (f )
236+ writer .writerow (["pointer_id" , "pointer_type" , "custodian" , "nhs_number" ])
237+ writer .writerows (pointer_data )
238+ print (f"Pointer data saved to ./dist/seed-nft-pointers.csv" ) # noqa
239+
199240
def _set_up_cyclical_iterator(dists: dict[str, int]) -> Iterator[str]:
    """
    Given a dict of values and their relative frequencies,
    returns an iterator that cycles endlessly through the reduced and shuffled set of values.
    This should result in more live-like data than e.g. creating a bulk amount of each pointer type/custodian in series.
    It also means each batch will contain a representative sample of the distribution.

    The frequencies are divided by their greatest common divisor first, so
    {"a": 20, "b": 40} cycles over one "a" and two "b" in a random order.
    If dists is empty, or every frequency is zero, the returned iterator is empty.
    """
    # gcd() of no arguments, or of all-zero frequencies, is 0; fall back to 1 so
    # the reduction below cannot divide by zero.
    divisor = gcd(*dists.values()) or 1
    value_list: list[str] = []
    for entry, frequency in dists.items():
        value_list.extend([entry] * (frequency // divisor))
    # Shuffle once; cycle() then repeats that same random order indefinitely.
    shuffle(value_list)
    return cycle(value_list)
213254
214255
def _get_pointer_count_poisson_distributions(
    num_of_patients: int, pointers_per_px: float
) -> Iterator[int]:
    """
    Given a number of patients and a target average number of pointers per patient,
    returns an iterator that cycles through one randomly drawn pointer count per patient.

    Counts are drawn from a Poisson distribution shifted up by one, so no patient
    can have zero pointers, and are capped at 4. Because of the cap, the realised
    average falls below pointers_per_px as the target grows.

    Raises ValueError if pointers_per_px is less than 1.
    """
    if pointers_per_px < 1:
        raise ValueError(
            f"pointers_per_px must be at least 1 (got {pointers_per_px}): "
            "every patient is given at least one pointer."
        )
    # lam is the mean of the *extra* pointers on top of the guaranteed first one.
    counts = np.random.poisson(lam=pointers_per_px - 1, size=num_of_patients) + 1
    counts = np.clip(counts, 1, 4)
    # tolist() converts numpy integers to plain Python ints, matching the return annotation.
    return cycle(counts.tolist())
262+
263+
def _set_up_custodian_iterators(
    custodian_dists: dict[str, dict[str, int]]
) -> dict[str, Iterator[str]]:
    """
    Given a dict mapping each pointer type to its custodian frequency distribution,
    returns a dict mapping each pointer type to a cyclical iterator over those custodians.
    """
    return {
        pointer_type: _set_up_cyclical_iterator(dists)
        for pointer_type, dists in custodian_dists.items()
    }
224273
225274
def _set_up_count_iterator(pointers_per_px: float) -> Iterator[str]:
    """
    Given a target average number of pointers per patient,
    generates a distribution of counts per individual patient.

    The distribution is expressed per hundred patients as the number of patients
    with "1", "2" and "3" pointers, and is handed to _set_up_cyclical_iterator,
    so the returned iterator yields the counts as strings.
    """
    # round() rather than int(): the float product can land just below the
    # intended whole number (e.g. for 1.15) and int() would then drop one.
    extra_per_hundred = round(
        (pointers_per_px - 1.0) * 100
    )  # no patients can have zero pointers
    counts: dict[str, int] = {}
    counts["3"] = extra_per_hundred // 10
    # Each patient with three pointers already accounts for two of the extras.
    counts["2"] = extra_per_hundred - 2 * counts["3"]
    counts["1"] = 100 - counts["2"] - counts["3"]
    return _set_up_cyclical_iterator(counts)
240-
241-
# Script entry point: hand _populate_seed_table to python-fire to run from the command line.
if __name__ == "__main__":
    fire.Fire(_populate_seed_table)
0 commit comments