144 changes: 85 additions & 59 deletions tests/rptest/scale_tests/many_partitions_test.py
@@ -482,6 +482,67 @@ def _restart_stress(self, scale: ScaleParameters, topic_names: list,
f"Open files after {i} restarts on {node_name}: {file_count}"
)

def _restart_repeater_stress(self, repeater, repeater_msg_size,
scale: ScaleParameters, topic_names: list,
n_partitions: int, expect_bandwidth):
repeater_await_bytes = 1E9
repeater_await_msgs = int(repeater_await_bytes / repeater_msg_size)

def progress_check():
# Explicit wait for consumer group, because we might have e.g.
# just restarted the cluster, and don't want to include that
# delay in our throughput-driven timeout expectations
self.logger.info(f"Checking repeater group is ready...")
repeater.await_group_ready()

t = repeater_await_bytes / expect_bandwidth
self.logger.info(
f"Waiting for {repeater_await_msgs} messages in {t} seconds")
t1 = time.time()
repeater.await_progress(repeater_await_msgs, t)
t2 = time.time()

# This is approximate, because await_progress doesn't return at the
# very instant the workers hit their collective target.
self.logger.info(
f"Wait complete, approx bandwidth {(repeater_await_bytes / (t2-t1))/(1024*1024.0)}MB/s"
)
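# Worked example with illustrative numbers (the real expect_bandwidth
# comes from ScaleParameters): with repeater_msg_size = 16384,
# repeater_await_msgs = int(1E9 / 16384) = 61035, and at an
# expect_bandwidth of 100 MiB/s (104857600 B/s) the timeout t is
# roughly 1E9 / 104857600 ≈ 9.5 seconds.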

progress_check()

self.logger.info(f"Entering single node restart phase")
self._single_node_restart(scale, topic_names, n_partitions)
progress_check()

self.logger.info(f"Entering restart stress test phase")
self._restart_stress(scale, topic_names, n_partitions, progress_check)

self.logger.info(f"Post-restarts: checking repeater group is ready...")
repeater.await_group_ready()

# Done with restarts, now do a longer traffic soak
self.logger.info(f"Entering traffic soak phase")
soak_await_bytes = 100E9
if not self.redpanda.dedicated_nodes:
soak_await_bytes = 10E9
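# Assumption (not stated in the diff): non-dedicated environments also
# run with a proportionally lower expect_bandwidth, so the smaller
# 10E9-byte target keeps the soak duration roughly comparable.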

soak_await_msgs = int(soak_await_bytes / repeater_msg_size)
t1 = time.time()
initial_p, _ = repeater.total_messages()
try:
repeater.await_progress(soak_await_msgs,
soak_await_bytes / expect_bandwidth)
except TimeoutError:
t2 = time.time()
final_p, _ = repeater.total_messages()
bytes_sent = (final_p - initial_p) * repeater_msg_size
expect_mbps = expect_bandwidth / (1024 * 1024.0)
actual_mbps = (bytes_sent / (t2 - t1)) / (1024 * 1024.0)
self.logger.error(
f"Expected throughput {expect_mbps:.2f}, got throughput {actual_mbps:.2f}MB/s"
)
raise

def _write_and_random_read(self, scale: ScaleParameters, topic_names):
"""
This is a relatively low intensity test, that covers random
@@ -825,6 +886,10 @@ def _test_many_partitions(self, compacted):

# Main test phase: with continuous background traffic, exercise restarts and
# any other cluster changes that might trip up at scale.
# We run this test phase twice: once with transactional background
# traffic, and once with non-transactional background traffic.
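# The transactional run is held to a much lower throughput expectation
# (scale.expect_bandwidth / 60) than the non-transactional run, which
# is expected to sustain the full scale.expect_bandwidth.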

# txn background traffic
repeater_msg_size = 16384
with repeater_traffic(context=self._ctx,
redpanda=self.redpanda,
@@ -833,65 +898,26 @@
msg_size=repeater_msg_size,
workers=workers,
max_buffered_records=64,
use_transactions=True,
transaction_abort_rate=0.1,
msgs_per_transaction=50,
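# With transaction_abort_rate=0.1 and msgs_per_transaction=50,
# roughly one in ten 50-message transactions is deliberately
# aborted, so the restart stress exercises the abort path as well.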
cleanup=lambda: self.free_preallocated_nodes(),
**repeater_kwargs) as repeater:
repeater_await_bytes = 1E9
repeater_await_msgs = int(repeater_await_bytes / repeater_msg_size)
self._restart_repeater_stress(repeater, repeater_msg_size, scale,
topic_names, n_partitions,
scale.expect_bandwidth / 60)

def progress_check():
# Explicit wait for consumer group, because we might have e.g.
# just restarted the cluster, and don't want to include that
# delay in our throughput-driven timeout expectations
self.logger.info(f"Checking repeater group is ready...")
repeater.await_group_ready()

t = repeater_await_bytes / scale.expect_bandwidth
self.logger.info(
f"Waiting for {repeater_await_msgs} messages in {t} seconds"
)
t1 = time.time()
repeater.await_progress(repeater_await_msgs, t)
t2 = time.time()

# This is approximate, because await_progress isn't returning the very
# instant the workers hit their collective target.
self.logger.info(
f"Wait complete, approx bandwidth {(repeater_await_bytes / (t2-t1))/(1024*1024.0)}MB/s"
)

progress_check()

self.logger.info(f"Entering single node restart phase")
self._single_node_restart(scale, topic_names, n_partitions)
progress_check()

self.logger.info(f"Entering restart stress test phase")
self._restart_stress(scale, topic_names, n_partitions,
progress_check)

self.logger.info(
f"Post-restarts: checking repeater group is ready...")
repeater.await_group_ready()

# Done with restarts, now do a longer traffic soak
self.logger.info(f"Entering traffic soak phase")
soak_await_bytes = 100E9
if not self.redpanda.dedicated_nodes:
soak_await_bytes = 10E9

soak_await_msgs = soak_await_bytes / repeater_msg_size
t1 = time.time()
initial_p, _ = repeater.total_messages()
try:
repeater.await_progress(
soak_await_msgs, soak_await_bytes / scale.expect_bandwidth)
except TimeoutError:
t2 = time.time()
final_p, _ = repeater.total_messages()
bytes_sent = (final_p - initial_p) * repeater_msg_size
expect_mbps = scale.expect_bandwidth / (1024 * 1024.0)
actual_mbps = (bytes_sent / (t2 - t1)) / (1024 * 1024.0)
self.logger.error(
f"Expected throughput {expect_mbps:.2f}, got throughput {actual_mbps:.2f}MB/s"
)
raise
# non-txn background traffic
repeater_msg_size = 16384
with repeater_traffic(context=self._ctx,
Contributor:
It's a bit odd to pick out the restart stress portion of the test and only run that bit with transactions.

The approach for compaction is to have a top-level set of repeater_kwargs and two different test cases (compacted vs. non) -- can we do the same for transactions? Maybe rather than toggling each feature independently, we should have just two variants of the test: vanilla (no compaction or transactions) and "fully loaded" (compaction and transactions enabled).

I think that'll be useful in identifying which failures result from issues in the primary data path vs. which ones relate to particular optional features.
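A rough sketch of what that could look like, assuming ducktape's @matrix decorator as used elsewhere in rptest (the fully_loaded knob and the repeater_kwargs plumbing are illustrative, not this PR's actual code):

```python
from ducktape.mark import matrix

@matrix(fully_loaded=[False, True])
def test_many_partitions(self, fully_loaded):
    repeater_kwargs = {}
    if fully_loaded:
        # "Fully loaded": enable every optional feature at once.
        repeater_kwargs.update(use_transactions=True,
                               transaction_abort_rate=0.1,
                               msgs_per_transaction=50)
    self._test_many_partitions(compacted=fully_loaded,
                               repeater_kwargs=repeater_kwargs)
```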

redpanda=self.redpanda,
nodes=self.preallocated_nodes,
topic=topic_names[0],
msg_size=repeater_msg_size,
workers=workers,
max_buffered_records=64,
cleanup=lambda: self.free_preallocated_nodes(),
**repeater_kwargs) as repeater:
self._restart_repeater_stress(repeater, repeater_msg_size, scale,
topic_names, n_partitions,
scale.expect_bandwidth)