Skip to content

Commit 2c25228

Browse files
[DPE-2955] Cross-region async replication integration tests (#453)
* Add async replication implementation
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Add async replication integration tests
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Add test for scaling
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Backup standby pgdata folder
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Fix OS call
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Fix unit tests
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Improve comments and logs
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Fix juju3 markers
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Revert permission change
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Add optional type hint
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Add relation name to secret label and revert poetry.lock
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
* Reload Patroni configuration when member is not ready yet
  Signed-off-by: Marcelo Henrique Neppel <[email protected]>
---------
Signed-off-by: Marcelo Henrique Neppel <[email protected]>
1 parent 4310dc9 commit 2c25228

File tree

3 files changed: +713 −51 lines changed

tests/integration/ha_tests/helpers.py

Lines changed: 127 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
import psycopg2
1313
import requests
1414
import yaml
15+
from juju.model import Model
1516
from pytest_operator.plugin import OpsTest
1617
from tenacity import (
1718
RetryError,
@@ -86,32 +87,47 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
8687

8788

8889
async def are_writes_increasing(
89-
ops_test, down_unit: str = None, use_ip_from_inside: bool = False
90+
ops_test, down_unit: str = None, use_ip_from_inside: bool = False, extra_model: Model = None
9091
) -> None:
9192
"""Verify new writes are continuing by counting the number of writes."""
9293
writes, _ = await count_writes(
93-
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
94+
ops_test,
95+
down_unit=down_unit,
96+
use_ip_from_inside=use_ip_from_inside,
97+
extra_model=extra_model,
9498
)
9599
for member, count in writes.items():
96100
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
97101
with attempt:
98102
more_writes, _ = await count_writes(
99-
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
103+
ops_test,
104+
down_unit=down_unit,
105+
use_ip_from_inside=use_ip_from_inside,
106+
extra_model=extra_model,
100107
)
101-
assert more_writes[member] > count, f"{member}: writes not continuing to DB"
108+
assert (
109+
more_writes[member] > count
110+
), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})"
102111

103112

104-
async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> Optional[str]:
113+
async def app_name(
114+
ops_test: OpsTest, application_name: str = "postgresql", model: Model = None
115+
) -> Optional[str]:
105116
"""Returns the name of the cluster running PostgreSQL.
106117
107118
This is important since not all deployments of the PostgreSQL charm have the application name
108119
"postgresql".
109120
110121
Note: if multiple clusters are running PostgreSQL this will return the one first found.
111122
"""
112-
status = await ops_test.model.get_status()
113-
for app in ops_test.model.applications:
114-
if application_name in status["applications"][app]["charm"]:
123+
if model is None:
124+
model = ops_test.model
125+
status = await model.get_status()
126+
for app in model.applications:
127+
if (
128+
application_name in status["applications"][app]["charm"]
129+
and APPLICATION_NAME not in status["applications"][app]["charm"]
130+
):
115131
return app
116132

117133
return None
@@ -207,13 +223,18 @@ async def is_cluster_updated(
207223
), "secondary not up to date with the cluster after restarting."
208224

209225

210-
async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int:
226+
async def check_writes(
227+
ops_test, use_ip_from_inside: bool = False, extra_model: Model = None
228+
) -> int:
211229
"""Gets the total writes from the test charm and compares to the writes from db."""
212230
total_expected_writes = await stop_continuous_writes(ops_test)
213231
actual_writes, max_number_written = await count_writes(
214-
ops_test, use_ip_from_inside=use_ip_from_inside
232+
ops_test, use_ip_from_inside=use_ip_from_inside, extra_model=extra_model
215233
)
216234
for member, count in actual_writes.items():
235+
print(
236+
f"member: {member}, count: {count}, max_number_written: {max_number_written[member]}, total_expected_writes: {total_expected_writes}"
237+
)
217238
assert (
218239
count == max_number_written[member]
219240
), f"{member}: writes to the db were missed: count of actual writes different from the max number written."
@@ -222,30 +243,44 @@ async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int:
222243

223244

224245
async def count_writes(
225-
ops_test: OpsTest, down_unit: str = None, use_ip_from_inside: bool = False
246+
ops_test: OpsTest,
247+
down_unit: str = None,
248+
use_ip_from_inside: bool = False,
249+
extra_model: Model = None,
226250
) -> Tuple[Dict[str, int], Dict[str, int]]:
227251
"""Count the number of writes in the database."""
228252
app = await app_name(ops_test)
229253
password = await get_password(ops_test, app, down_unit)
230-
for unit in ops_test.model.applications[app].units:
231-
if unit.name != down_unit:
232-
cluster = get_patroni_cluster(
233-
await (
234-
get_ip_from_inside_the_unit(ops_test, unit.name)
235-
if use_ip_from_inside
236-
else get_unit_ip(ops_test, unit.name)
237-
)
238-
)
239-
break
254+
members = []
255+
for model in [ops_test.model, extra_model]:
256+
if model is None:
257+
continue
258+
for unit in model.applications[app].units:
259+
if unit.name != down_unit:
260+
members_data = get_patroni_cluster(
261+
await (
262+
get_ip_from_inside_the_unit(ops_test, unit.name)
263+
if use_ip_from_inside
264+
else get_unit_ip(ops_test, unit.name)
265+
)
266+
)["members"]
267+
for index, member_data in enumerate(members_data):
268+
members_data[index]["model"] = model.info.name
269+
members.extend(members_data)
270+
break
240271
down_ips = []
241272
if down_unit:
242273
for unit in ops_test.model.applications[app].units:
243274
if unit.name == down_unit:
244275
down_ips.append(unit.public_address)
245276
down_ips.append(await get_unit_ip(ops_test, unit.name))
277+
return count_writes_on_members(members, password, down_ips)
278+
279+
280+
def count_writes_on_members(members, password, down_ips) -> Tuple[Dict[str, int], Dict[str, int]]:
246281
count = {}
247282
maximum = {}
248-
for member in cluster["members"]:
283+
for member in members:
249284
if member["role"] != "replica" and member["host"] not in down_ips:
250285
host = member["host"]
251286

@@ -254,12 +289,23 @@ async def count_writes(
254289
f" host='{host}' password='{password}' connect_timeout=10"
255290
)
256291

257-
with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor:
258-
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
259-
results = cursor.fetchone()
260-
count[member["name"]] = results[0]
261-
maximum[member["name"]] = results[1]
262-
connection.close()
292+
member_name = f'{member["model"]}.{member["name"]}'
293+
connection = None
294+
try:
295+
with psycopg2.connect(
296+
connection_string
297+
) as connection, connection.cursor() as cursor:
298+
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
299+
results = cursor.fetchone()
300+
count[member_name] = results[0]
301+
maximum[member_name] = results[1]
302+
except psycopg2.Error:
303+
# Error raised when the connection is not possible.
304+
count[member_name] = -1
305+
maximum[member_name] = -1
306+
finally:
307+
if connection is not None:
308+
connection.close()
263309
return count, maximum
264310

265311

@@ -401,6 +447,42 @@ def get_random_unit(ops_test: OpsTest, app: str) -> str:
401447
return random.choice(ops_test.model.applications[app].units).name
402448

403449

450+
async def get_standby_leader(model: Model, application_name: str) -> str:
451+
"""Get the standby leader name.
452+
453+
Args:
454+
model: the model instance.
455+
application_name: the name of the application to get the value for.
456+
457+
Returns:
458+
the name of the standby leader.
459+
"""
460+
first_unit_ip = model.applications[application_name].units[0].public_address
461+
cluster = get_patroni_cluster(first_unit_ip)
462+
for member in cluster["members"]:
463+
if member["role"] == "standby_leader":
464+
return member["name"]
465+
466+
467+
async def get_sync_standby(ops_test: OpsTest, model: Model, application_name: str) -> str:
468+
"""Get the sync_standby name.
469+
470+
Args:
471+
ops_test: the ops test instance.
472+
model: the model instance.
473+
application_name: the name of the application to get the value for.
474+
475+
Returns:
476+
the name of the sync standby.
477+
"""
478+
any_unit = model.applications[application_name].units[0].name
479+
first_unit_ip = await get_unit_ip(ops_test, any_unit, model)
480+
cluster = get_patroni_cluster(first_unit_ip)
481+
for member in cluster["members"]:
482+
if member["role"] == "sync_standby":
483+
return member["name"]
484+
485+
404486
async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str:
405487
"""Use the charm action to retrieve the password from provided application.
406488
@@ -417,20 +499,24 @@ async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> st
417499
return action.results["password"]
418500

419501

420-
async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str:
502+
async def get_unit_ip(ops_test: OpsTest, unit_name: str, model: Model = None) -> str:
421503
"""Wrapper for getting unit ip.
422504
423505
Args:
424506
ops_test: The ops test object passed into every test case
425507
unit_name: The name of the unit to get the address
508+
model: Optional model instance to use
426509
Returns:
427510
The (str) ip of the unit
428511
"""
429-
application = unit_name.split("/")[0]
430-
for unit in ops_test.model.applications[application].units:
431-
if unit.name == unit_name:
432-
break
433-
return await instance_ip(ops_test, unit.machine.hostname)
512+
if model is None:
513+
application = unit_name.split("/")[0]
514+
for unit in ops_test.model.applications[application].units:
515+
if unit.name == unit_name:
516+
break
517+
return await instance_ip(ops_test, unit.machine.hostname)
518+
else:
519+
return get_unit_address(ops_test, unit_name)
434520

435521

436522
@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
@@ -673,24 +759,26 @@ async def is_secondary_up_to_date(
673759
return True
674760

675761

676-
async def start_continuous_writes(ops_test: OpsTest, app: str) -> None:
762+
async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None:
677763
"""Start continuous writes to PostgreSQL."""
678764
# Start the process by relating the application to the database or
679765
# by calling the action if the relation already exists.
766+
if model is None:
767+
model = ops_test.model
680768
relations = [
681769
relation
682-
for relation in ops_test.model.applications[app].relations
770+
for relation in model.applications[app].relations
683771
if not relation.is_peer
684772
and f"{relation.requires.application_name}:{relation.requires.name}"
685773
== f"{APPLICATION_NAME}:first-database"
686774
]
687775
if not relations:
688-
await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database")
689-
await ops_test.model.wait_for_idle(status="active", timeout=1000)
776+
await model.relate(app, f"{APPLICATION_NAME}:first-database")
777+
await model.wait_for_idle(status="active", timeout=1000)
690778
for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True):
691779
with attempt:
692780
action = (
693-
await ops_test.model.applications[APPLICATION_NAME]
781+
await model.applications[APPLICATION_NAME]
694782
.units[0]
695783
.run_action("start-continuous-writes")
696784
)

0 commit comments

Comments
 (0)