Skip to content

Commit df0aee3

Browse files
authored
Merge pull request #683 from praekeltfoundation/update-migration-script
Update migration script
2 parents 094c2f3 + a86c3ac commit df0aee3

File tree

2 files changed

+136
-47
lines changed

2 files changed

+136
-47
lines changed

scripts/migrate_to_turn/fetch_rapidpro_contacts.py

Lines changed: 120 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,25 @@
2020
to_lowercase,
2121
)
2222

23-
RAPIDPRO_URL = "https://rapidpro.qa.momconnect.co.za"
24-
25-
START_DATE = "2026-02-24 01:13:06"
26-
END_DATE = "2026-02-26 19:13:06"
27-
LIMIT = 1000
23+
env = "qa" # qa or prd
24+
RAPIDPRO_URL = f"https://rapidpro.{env}.momconnect.co.za"
25+
26+
START_DATE = "2022-02-10 00:0:00"
27+
# END_DATE = "2026-02-26 19:13:06"
28+
# START_DATE = "2026-03-04 00:00:00"
29+
END_DATE = "2026-03-05 00:00:00"
30+
LIMIT = 1000000
2831
INCLUDE_OPTED_OUT = False
2932
# We want to import beta testing users as opted out and give them the chance to opt in.
3033
IMPORT_AS_OPTED_OUT = False
3134
# This is to identify invited users and schedule the invite message.
32-
MIGRATION_KEY = "beta_testing_batch_1"
35+
MIGRATION_KEY = "batch_1"
3336

34-
MSISDN_FILTER = (
35-
os.environ.get("MSISDN_FILTER", "").split(",")
36-
if os.environ.get("MSISDN_FILTER")
37-
else []
38-
)
37+
MSISDN_FILTER = [
38+
msisdn.strip()
39+
for msisdn in os.environ.get("MSISDN_FILTER", "").split(",")
40+
if msisdn.strip()
41+
]
3942

4043
FIELD_MAPPING = {
4144
"edd": {
@@ -108,6 +111,15 @@
108111
}
109112

110113

114+
def get_readable_timestamp(dt=None):
115+
dt = dt or datetime.now().astimezone()
116+
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
117+
118+
119+
def log(message):
120+
print(f"[{get_readable_timestamp()}] {message}")
121+
122+
111123
def get_field_data(contact):
112124
data = {}
113125
for rapidpro_field, turn_details in FIELD_MAPPING.items():
@@ -118,6 +130,11 @@ def get_field_data(contact):
118130
else:
119131
data[turn_field] = contact.fields.get(rapidpro_field)
120132

133+
if rapidpro_field == "language":
134+
language_value = data[turn_field]
135+
if language_value is None or str(language_value).strip() == "":
136+
data[turn_field] = "eng"
137+
121138
if "process" in turn_details:
122139
data[turn_field] = turn_details["process"](data[turn_field])
123140

@@ -138,53 +155,108 @@ def is_opted_out(contact):
138155
return opted_out.upper() == "TRUE"
139156

140157

141-
def get_rapidpro_contacts(client, start_date=None, end_date=None):
142-
print(f"> Getting rapidpro contacts from {start_date} to {end_date}")
143-
contacts = []
144-
oldest_date = end_date.replace(tzinfo=pytz.utc)
145-
for contact_batch in client.get_contacts(
146-
before=end_date, after=start_date
147-
).iterfetches(retry_on_rate_exceed=True):
148-
for contact in contact_batch:
149-
wa_id = get_wa_id(contact)
158+
def get_contact_urn(msisdn):
159+
return msisdn if msisdn.startswith("whatsapp:") else f"whatsapp:{msisdn}"
160+
161+
162+
def process_contact(
163+
contact, contacts, oldest_date, seen_wa_ids=None, apply_msisdn_filter=False
164+
):
165+
wa_id = get_wa_id(contact)
166+
167+
if is_opted_out(contact) and not INCLUDE_OPTED_OUT:
168+
return oldest_date, False
169+
170+
if not wa_id:
171+
return oldest_date, False
172+
173+
if apply_msisdn_filter and MSISDN_FILTER and wa_id not in MSISDN_FILTER:
174+
return oldest_date, False
150175

151-
if is_opted_out(contact) and not INCLUDE_OPTED_OUT:
152-
continue
176+
if seen_wa_ids is not None and wa_id in seen_wa_ids:
177+
return oldest_date, False
153178

154-
# TODO: filtering for testing only, remove later()
155-
if wa_id and MSISDN_FILTER and wa_id not in MSISDN_FILTER:
156-
continue
179+
data = get_field_data(contact)
180+
data["urn"] = wa_id
181+
contacts.append(data)
157182

158-
if wa_id:
159-
data = get_field_data(contact)
160-
data["urn"] = wa_id
161-
contacts.append(data)
183+
if seen_wa_ids is not None:
184+
seen_wa_ids.add(wa_id)
162185

163-
modified_on = contact.modified_on.astimezone(pytz.utc)
164-
if modified_on < oldest_date:
165-
oldest_date = modified_on
186+
modified_on = contact.modified_on.astimezone(pytz.utc)
187+
if oldest_date is None or modified_on < oldest_date:
188+
oldest_date = modified_on
166189

167-
if len(contacts) >= LIMIT:
190+
return oldest_date, True
191+
192+
193+
def get_rapidpro_contacts_by_msisdn(client):
194+
log(f"> Getting rapidpro contacts by MSISDN filter ({len(MSISDN_FILTER)} values)")
195+
contacts = []
196+
oldest_date = None
197+
seen_wa_ids = set()
198+
batch_number = 0
199+
200+
for msisdn in MSISDN_FILTER:
201+
urn = get_contact_urn(msisdn)
202+
for contact_batch in client.get_contacts(urn=urn).iterfetches(
203+
retry_on_rate_exceed=True
204+
):
205+
batch_number += 1
206+
log(f"Processing contact_batch #{batch_number} for {urn}")
207+
for contact in contact_batch:
208+
oldest_date, added = process_contact(
209+
contact, contacts, oldest_date, seen_wa_ids=seen_wa_ids
210+
)
211+
212+
if added and len(contacts) >= LIMIT:
168213
return contacts, oldest_date
169214

170215
return contacts, oldest_date
171216

172217

173-
def fetch_rapidpro_contacts(client):
174-
start_date = datetime.strptime(START_DATE, "%Y-%m-%d %H:%M:%S")
175-
end_date = datetime.strptime(END_DATE, "%Y-%m-%d %H:%M:%S")
218+
def get_rapidpro_contacts(client, start_date=None, end_date=None):
219+
log(f"> Getting rapidpro contacts from {start_date} to {end_date}")
220+
contacts = []
221+
oldest_date = end_date.replace(tzinfo=pytz.utc)
222+
for batch_number, contact_batch in enumerate(
223+
client.get_contacts(before=end_date, after=start_date).iterfetches(
224+
retry_on_rate_exceed=True
225+
),
226+
start=1,
227+
):
228+
log(f"Processing contact_batch #{batch_number}")
229+
for contact in contact_batch:
230+
oldest_date, added = process_contact(
231+
contact, contacts, oldest_date, apply_msisdn_filter=True
232+
)
176233

177-
contacts, oldest_date = get_rapidpro_contacts(client, start_date, end_date)
234+
if added and len(contacts) >= LIMIT:
235+
return contacts, oldest_date
178236

179-
print(f"Found: {len(contacts)}")
180-
print(f"Oldest modified on date: {oldest_date}")
237+
return contacts, oldest_date
181238

182-
if contacts:
239+
240+
def fetch_rapidpro_contacts(client):
241+
run_start = datetime.now().astimezone()
242+
log("Starting RapidPro contact fetch")
243+
244+
if MSISDN_FILTER:
245+
contacts, oldest_date = get_rapidpro_contacts_by_msisdn(client)
246+
filename = f"contacts-{env}-msisdn-filter.csv"
247+
else:
248+
start_date = datetime.strptime(START_DATE, "%Y-%m-%d %H:%M:%S")
249+
end_date = datetime.strptime(END_DATE, "%Y-%m-%d %H:%M:%S")
250+
contacts, oldest_date = get_rapidpro_contacts(client, start_date, end_date)
183251
start = START_DATE.split(" ")[0]
184252
end = END_DATE.split(" ")[0]
185-
filename = f"contacts-{start}-{end}.csv"
253+
filename = f"contacts-{env}-{start}-{end}.csv"
186254

187-
print(f"File: {filename}")
255+
log(f"Found: {len(contacts)}")
256+
log(f"Oldest modified on date: {oldest_date}")
257+
258+
if contacts:
259+
log(f"File: {filename}")
188260

189261
keys = contacts[0].keys()
190262

@@ -193,6 +265,12 @@ def fetch_rapidpro_contacts(client):
193265
dict_writer.writeheader()
194266
dict_writer.writerows(contacts)
195267

268+
run_end = datetime.now().astimezone()
269+
log("Finished RapidPro contact fetch")
270+
log(f"Run started: {get_readable_timestamp(run_start)}")
271+
log(f"Run ended: {get_readable_timestamp(run_end)}")
272+
log(f"Total duration: {run_end - run_start}")
273+
196274

197275
if __name__ == "__main__":
198276
client = TembaClient(RAPIDPRO_URL, os.environ["RAPIDPRO_TOKEN"])

scripts/migrate_to_turn/tests/test_fetch_rapidpro_contacts.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,19 @@ class FakeClient:
3434
def __init__(self, batches):
3535
self._batches = batches
3636

37-
def get_contacts(self, before=None, after=None):
38-
return FakeContacts(self._batches)
37+
def get_contacts(self, before=None, after=None, urn=None):
38+
if urn is None:
39+
return FakeContacts(self._batches)
40+
41+
filtered_batches = []
42+
for batch in self._batches:
43+
filtered_batch = [
44+
contact for contact in batch if urn in getattr(contact, "urns", [])
45+
]
46+
if filtered_batch:
47+
filtered_batches.append(filtered_batch)
48+
49+
return FakeContacts(filtered_batches)
3950

4051

4152
class FetchRapidproContactsTests(TestCase):
@@ -174,7 +185,7 @@ def test_fetch_rapidpro_contacts_writes_csv(self):
174185
):
175186
fetch_rapidpro_contacts.fetch_rapidpro_contacts(client)
176187

177-
output_path = Path(tmp_dir) / "contacts-2025-01-01-2025-01-31.csv"
188+
output_path = Path(tmp_dir) / "contacts-qa-2025-01-01-2025-01-31.csv"
178189
with output_path.open(newline="") as csv_file:
179190
reader = csv.DictReader(csv_file)
180191
rows = list(reader)
@@ -268,7 +279,7 @@ def test_fetch_rapidpro_contacts_skips_opted_out(self):
268279
):
269280
fetch_rapidpro_contacts.fetch_rapidpro_contacts(client)
270281

271-
output_path = Path(tmp_dir) / "contacts-2025-01-01-2025-01-31.csv"
282+
output_path = Path(tmp_dir) / "contacts-qa-2025-01-01-2025-01-31.csv"
272283
self.assertFalse(output_path.exists())
273284

274285
def test_fetch_rapidpro_contacts_msisdn_filter(self):
@@ -310,7 +321,7 @@ def test_fetch_rapidpro_contacts_msisdn_filter(self):
310321
):
311322
fetch_rapidpro_contacts.fetch_rapidpro_contacts(client)
312323

313-
output_path = Path(tmp_dir) / "contacts-2025-01-01-2025-01-31.csv"
324+
output_path = Path(tmp_dir) / "contacts-qa-msisdn-filter.csv"
314325
with output_path.open(newline="") as csv_file:
315326
reader = csv.DictReader(csv_file)
316327
rows = list(reader)

0 commit comments

Comments
 (0)