[DPE-4114] Test: Scale to zero units #347
base: main
File 1: the integration test helpers module (imported by the tests as .helpers).

@@ -807,7 +807,7 @@ def storage_id(ops_test, unit_name):
             return line.split()[1]
 
 
-async def add_unit_with_storage(ops_test, app, storage):
+async def add_unit_with_storage(ops_test, app, storage, is_blocked: bool = False):
     """Adds unit with storage.
 
     Note: this function exists as a temporary solution until this issue is resolved:
@@ -820,7 +820,14 @@ async def add_unit_with_storage(ops_test, app, storage):
     return_code, _, _ = await ops_test.juju(*add_unit_cmd)
     assert return_code == 0, "Failed to add unit with storage"
     async with ops_test.fast_forward():
-        await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=2000)
+        if is_blocked:
+            application = ops_test.model.applications[app]
+            await ops_test.model.block_until(
+                lambda: "blocked" in {unit.workload_status for unit in application.units},
+                timeout=1500,
+            )
+        else:
+            await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1500)
     assert (
         len(ops_test.model.applications[app].units) == expected_units
     ), "New unit not added to model"
@@ -862,3 +869,50 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) ->
         "/var/snap/charmed-postgresql/common/var/log/patroni/patroni.log*",
     )
     return True
+
+
+async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name=""):
+    unit_name = await get_primary(ops_test, APP_NAME)
+    password = await get_password(ops_test, APP_NAME)
+    address = get_unit_address(ops_test, unit_name)
+    if not is_primary and unit_name != "":
+        unit_name = replica_unit_name
+        address = ops_test.model.applications[APP_NAME].units[unit_name].public_address
+    connection_string = (
+        f"dbname='{dbname}' user='operator'"
+        f" host='{address}' password='{password}' connect_timeout=10"
+    )
+    return connection_string, unit_name
+
+
+async def validate_test_data(connection_string):
+    with psycopg2.connect(connection_string) as connection:
+        connection.autocommit = True
+        with connection.cursor() as cursor:
+            cursor.execute("SELECT data FROM test;")
+            data = cursor.fetchone()
+            assert data[0] == "some data"
+    connection.close()
+
+
+async def create_test_data(connection_string):
+    with psycopg2.connect(connection_string) as connection:
+        connection.autocommit = True
+        with connection.cursor() as cursor:
+            # Check that it's possible to write and read data from the database that
+            # was created for the application.
+            cursor.execute("DROP TABLE IF EXISTS test;")
+            cursor.execute("CREATE TABLE test(data TEXT);")
+            cursor.execute("INSERT INTO test(data) VALUES('some data');")
+            cursor.execute("SELECT data FROM test;")
+            data = cursor.fetchone()
+            assert data[0] == "some data"
+    connection.close()
+
+
+async def get_last_added_unit(ops_test, app, prev_units):
+    curr_units = [unit.name for unit in ops_test.model.applications[app].units]
+    new_unit = list(set(curr_units) - set(prev_units))[0]
+    for unit in ops_test.model.applications[app].units:
+        if new_unit == unit.name:
+            return unit

Review comment on get_db_connection: You may add the type hint for the returned values and a docstring to make the output even easier to understand. The same also applies to the other functions you created in this file.
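As an illustration only (not something in the PR diff), one of these helpers with the suggested type hint and docstring could look roughly like this; psycopg2 is assumed to be available since the helper already calls psycopg2.connect:

import psycopg2  # assumed to be imported by the helpers module already


async def validate_test_data(connection_string: str) -> None:
    """Assert that the test table still contains the row written by create_test_data.

    Args:
        connection_string: libpq-style connection string, e.g. from get_db_connection.
    """
    with psycopg2.connect(connection_string) as connection:
        connection.autocommit = True
        with connection.cursor() as cursor:
            cursor.execute("SELECT data FROM test;")
            data = cursor.fetchone()
            assert data[0] == "some data"
    connection.close()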
File 2: the integration test module.

@@ -5,6 +5,7 @@
 import logging
 
+import pytest
 from pip._vendor import requests
 from pytest_operator.plugin import OpsTest
 from tenacity import Retrying, stop_after_delay, wait_fixed
 
@@ -15,6 +16,7 @@
     get_password,
     get_unit_address,
     run_command_on_unit,
+    scale_application,
 )
 from .conftest import APPLICATION_NAME
 from .helpers import (
@@ -27,10 +29,13 @@
     change_patroni_setting,
     change_wal_settings,
     check_writes,
+    create_test_data,
     cut_network_from_unit,
     cut_network_from_unit_without_ip_change,
     fetch_cluster_members,
     get_controller_machine,
+    get_db_connection,
+    get_last_added_unit,
     get_patroni_setting,
     get_primary,
     get_unit_ip,
@@ -49,8 +54,10 @@
     storage_id,
     storage_type,
     update_restart_condition,
+    validate_test_data,
     wait_network_restore,
 )
+from .test_restore_cluster import SECOND_APPLICATION
 
 
 logger = logging.getLogger(__name__)
@@ -78,6 +85,14 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
         storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
         config={"profile": "testing"},
     )
+    await ops_test.model.deploy(
+        charm,
+        num_units=1,
+        application_name=SECOND_APPLICATION,
+        series=CHARM_SERIES,
+        storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
+        config={"profile": "testing"},
+    )
     # Deploy the continuous writes application charm if it wasn't already deployed.
     if not await app_name(ops_test, APPLICATION_NAME):
         wait_for_apps = True
@@ -543,3 +558,119 @@ async def test_network_cut_without_ip_change(
     ), "Connection is not possible after network restore"
 
     await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)
+
+
+@pytest.mark.group(1)
+async def test_deploy_zero_units(ops_test: OpsTest, charm):
+    """Scale the database to zero units and scale up again."""
+    app = await app_name(ops_test)
+    dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database"
+    connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
+
+    # Start an application that continuously writes data to the database.
+    await start_continuous_writes(ops_test, app)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    # Connect to the database.
+    # Create test data.
+    logger.info("connect to DB and create test table")
+    await create_test_data(connection_string)
+
+    # TODO: when a new charm release ships a different PostgreSQL version, add a test
+    # that checks reusing the storage across different PostgreSQL versions.
+
+    unit_ip_addresses = []
+    primary_storage = ""
+    for unit in ops_test.model.applications[app].units:
+        # Save IP addresses of units
+        unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name))
+
+        # Save detached storage ID
+        if await unit.is_leader_from_status():
+            primary_storage = storage_id(ops_test, unit.name)
+
+    logger.info(f"get storage id app: {SECOND_APPLICATION}")
+    second_storage = ""
+    for unit in ops_test.model.applications[SECOND_APPLICATION].units:
+        if await unit.is_leader_from_status():
+            second_storage = storage_id(ops_test, unit.name)
+            break
+
+    # Scale the database to zero units.
+    logger.info("scaling database to zero units")
+    await scale_application(ops_test, app, 0)
+    await scale_application(ops_test, SECOND_APPLICATION, 0)
+
+    # Checking shutdown units.
+    for unit_ip in unit_ip_addresses:
+        try:
+            resp = requests.get(f"http://{unit_ip}:8008")
+            assert (
+                resp.status_code != 200
+            ), f"status code = {resp.status_code}, message = {resp.text}"
+        except requests.exceptions.ConnectionError:
+            assert True, f"unit host = http://{unit_ip}:8008, all units shutdown"
+        except Exception as e:
+            assert False, f"{e} unit host = http://{unit_ip}:8008, something went wrong"
+
+    # Scale up to one unit.
+    logger.info("scaling database to one unit")
+    await add_unit_with_storage(ops_test, app=app, storage=primary_storage)
+    await ops_test.model.wait_for_idle(
+        apps=[APP_NAME, APPLICATION_NAME], status="active", timeout=1500
+    )
+
+    connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    logger.info("check test database data")
+    await validate_test_data(connection_string)
+
+    logger.info("database scaling up to two units using third-party cluster storage")
+    new_unit = await add_unit_with_storage(
+        ops_test, app=app, storage=second_storage, is_blocked=True
+    )
Review comment on the blocked scale-up: nit: IMHO, it's worth checking that we are blocked with the right message (foreign disk).
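A minimal sketch of that check, meant to sit right after the add_unit_with_storage call above; the expected message here is a placeholder, since the exact blocked message the charm sets for a reused foreign disk would have to be taken from the charm code:

# Placeholder value: the real blocked message for storage taken from another
# cluster must be looked up in the charm; this string is illustrative only.
FOREIGN_DISK_BLOCKED_MESSAGE = "<blocked message for a foreign disk>"

blocked_units = [
    unit
    for unit in ops_test.model.applications[app].units
    if unit.workload_status == "blocked"
]
assert blocked_units, "expected at least one unit to be in blocked state"
assert any(
    unit.workload_status_message == FOREIGN_DISK_BLOCKED_MESSAGE for unit in blocked_units
), f"unexpected blocked message(s): {[u.workload_status_message for u in blocked_units]}"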
+
+    logger.info(f"remove unit {new_unit.name} with storage from application {SECOND_APPLICATION}")
+    await ops_test.model.destroy_units(new_unit.name)
+
+    await are_writes_increasing(ops_test)
+
+    logger.info("check test database data")
+    await validate_test_data(connection_string)
+
+    # Scale up to two units.
+    logger.info("scaling database to two units")
+    prev_units = [unit.name for unit in ops_test.model.applications[app].units]
+    await scale_application(ops_test, application_name=app, count=2)
+    unit = await get_last_added_unit(ops_test, app, prev_units)
+
+    logger.info(f"check test database data of unit name {unit.name}")
+    connection_string, _ = await get_db_connection(
+        ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name
+    )
+    await validate_test_data(connection_string)
+    assert await reused_replica_storage(
+        ops_test, unit_name=unit.name
+    ), "attached storage not properly re-used by Postgresql."
+
+    # Scale up to three units.
+    logger.info("scaling database to three units")
+    prev_units = [unit.name for unit in ops_test.model.applications[app].units]
+    await scale_application(ops_test, application_name=app, count=3)
+    unit = await get_last_added_unit(ops_test, app, prev_units)
+
+    logger.info(f"check test database data of unit name {unit.name}")
+    connection_string, _ = await get_db_connection(
+        ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name
+    )
+    await validate_test_data(connection_string)
+    assert await reused_replica_storage(
+        ops_test, unit_name=unit.name
+    ), "attached storage not properly re-used by Postgresql."
+
+    await check_writes(ops_test)