diff --git a/ydb/tests/stress/topic/workload/__init__.py b/ydb/tests/stress/topic/workload/__init__.py index 2118188e89e4..0ef9c245a1d4 100644 --- a/ydb/tests/stress/topic/workload/__init__.py +++ b/ydb/tests/stress/topic/workload/__init__.py @@ -40,14 +40,23 @@ def _unpack_resource(self, name): os.chmod(path_to_unpack, st.st_mode | stat.S_IEXEC) self.cli_path = path_to_unpack - def get_command_prefix(self, subcmds: list[str]) -> list[str]: + def _get_cli_common_args(self) -> list[str]: return [ self.cli_path, '--verbose', '--endpoint', self.endpoint, '--database={}'.format(self.database), + ] + + @property + def workload_topic_name(self) -> str: + return f'{self.table_prefix}' + + def get_command_prefix(self, subcmds: list[str]) -> list[str]: + return [ + *self._get_cli_common_args(), 'workload', 'topic' - ] + subcmds + ['--topic', f'{self.table_prefix}'] + ] + subcmds + ['--topic', self.workload_topic_name] def cmd_run(self, cmd): logger.debug(f"Running cmd {cmd}") @@ -60,6 +69,20 @@ def __loop(self): self.cmd_run( self.get_command_prefix(subcmds=['init', '-c', self.consumers, '-p', self.producers]) ) + # adjust + self.cmd_run([ + *self._get_cli_common_args(), + 'topic', 'alter', + '--retention-period=2s', + self.workload_topic_name, + ]) + self.cmd_run([ + *self._get_cli_common_args(), + 'topic', 'consumer', 'add', + f'--availability-period={int(self.duration) * 9 // 10}s', + '--consumer', 'data_holder', + self.workload_topic_name, + ]) # run run_cmd_args = ['run', 'full', '-s', self.duration, '--byte-rate', '100M', '--use-tx', '--tx-commit-interval', '2000', '-p', self.producers, '-c', self.consumers] if self.limit_memory_usage: