diff --git a/misc/python/materialize/parallel_workload/executor.py b/misc/python/materialize/parallel_workload/executor.py index 2ff814f3804a1..f26c782899574 100644 --- a/misc/python/materialize/parallel_workload/executor.py +++ b/misc/python/materialize/parallel_workload/executor.py @@ -53,6 +53,7 @@ class Executor: last_status: str action_run_since_last_commit_rollback: bool autocommit: bool + allow_bad: bool def __init__( self, @@ -75,6 +76,51 @@ def __init__( self.use_ws = self.rng.choice([True, False]) if self.ws else False self.autocommit = cur.connection.autocommit self.mz_service = "materialized" + self.allow_bad = False + + self.keywords = [ + "+", + "-", + "*", + "/", + "%", + "+", + "-", + "~", + "=", + "<>", + "!=", + "<", + "<=", + ">", + ">=", + "&", + "|", + "#", + "^", + "<<", + ">>", + "||", + "~", + "!~", + "~*", + "!~*", + "@>", + "<@", + "&&", + "(", + ")", + ";", + "0", + '""', + "NULL", + ] + with open("src/sql-lexer/src/keywords.txt") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + self.keywords.append(line.upper()) def set_isolation(self, level: str) -> None: self.execute(f"SET TRANSACTION_ISOLATION TO '{level}'") @@ -161,6 +207,19 @@ def execute( http_str = " [HTTP]" if is_http else " [WS]" if use_ws and self.ws else "" self.log(f"{query}{extra_info_str}{http_str}") self.last_status = "running" + + if self.allow_bad and self.rng.choice([True, False]): + parts = query.split(" ") + idx = self.rng.randrange(len(parts)) + parts[idx] = str( + self.rng.choice( + self.keywords + if self.rng.choice([True, False]) + else self.db.db_objects() + ) + ) + query = " ".join(parts) + try: if not is_http: if use_ws and self.ws: diff --git a/misc/python/materialize/parallel_workload/worker.py b/misc/python/materialize/parallel_workload/worker.py index 45aeb8cebe664..5453af2bf9a67 100644 --- a/misc/python/materialize/parallel_workload/worker.py +++ b/misc/python/materialize/parallel_workload/worker.py @@ -84,6 +84,7 @@ def run( cur.execute("SELECT pg_backend_pid()") self.exe.pg_pid = cur.fetchall()[0][0] + self.exe.allow_bad = True while time.time() < self.end_time: action = self.rng.choices(self.actions, self.weights)[0] try: @@ -128,10 +129,13 @@ def run( self.exe.rollback_next = True break else: - thread_name = threading.current_thread().getName() - self.occurred_exception = e - print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}") - raise + if self.exe.allow_bad: + self.exe.rollback_next = True + else: + thread_name = threading.current_thread().getName() + self.occurred_exception = e + print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}") + raise except Exception as e: self.occurred_exception = e raise e