Skip to content

Commit 242cc1c

Browse files
committed
parallel-workload: Introduce bad queries
1 parent 3cd5592 commit 242cc1c

File tree

2 files changed

+67
-4
lines changed

2 files changed

+67
-4
lines changed

misc/python/materialize/parallel_workload/executor.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class Executor:
5353
last_status: str
5454
action_run_since_last_commit_rollback: bool
5555
autocommit: bool
56+
allow_bad: bool
5657

5758
def __init__(
5859
self,
@@ -75,6 +76,51 @@ def __init__(
7576
self.use_ws = self.rng.choice([True, False]) if self.ws else False
7677
self.autocommit = cur.connection.autocommit
7778
self.mz_service = "materialized"
79+
self.allow_bad = False
80+
81+
self.keywords = [
82+
"+",
83+
"-",
84+
"*",
85+
"/",
86+
"%",
87+
"+",
88+
"-",
89+
"~",
90+
"=",
91+
"<>",
92+
"!=",
93+
"<",
94+
"<=",
95+
">",
96+
">=",
97+
"&",
98+
"|",
99+
"#",
100+
"^",
101+
"<<",
102+
">>",
103+
"||",
104+
"~",
105+
"!~",
106+
"~*",
107+
"!~*",
108+
"@>",
109+
"<@",
110+
"&&",
111+
"(",
112+
")",
113+
";",
114+
"0",
115+
'""',
116+
"NULL",
117+
]
118+
with open("src/sql-lexer/src/keywords.txt") as f:
119+
for line in f:
120+
line = line.strip()
121+
if not line or line.startswith("#"):
122+
continue
123+
self.keywords.append(line.upper())
78124

79125
def set_isolation(self, level: str) -> None:
80126
self.execute(f"SET TRANSACTION_ISOLATION TO '{level}'")
@@ -161,6 +207,19 @@ def execute(
161207
http_str = " [HTTP]" if is_http else " [WS]" if use_ws and self.ws else ""
162208
self.log(f"{query}{extra_info_str}{http_str}")
163209
self.last_status = "running"
210+
211+
if self.allow_bad and self.rng.choice([True, False]):
212+
parts = query.split(" ")
213+
idx = self.rng.randrange(len(parts))
214+
parts[idx] = str(
215+
self.rng.choice(
216+
self.keywords
217+
if self.rng.choice([True, False])
218+
else self.db.db_objects()
219+
)
220+
)
221+
query = " ".join(parts)
222+
164223
try:
165224
if not is_http:
166225
if use_ws and self.ws:

misc/python/materialize/parallel_workload/worker.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def run(
8484
cur.execute("SELECT pg_backend_pid()")
8585
self.exe.pg_pid = cur.fetchall()[0][0]
8686

87+
self.exe.allow_bad = True
8788
while time.time() < self.end_time:
8889
action = self.rng.choices(self.actions, self.weights)[0]
8990
try:
@@ -128,10 +129,13 @@ def run(
128129
self.exe.rollback_next = True
129130
break
130131
else:
131-
thread_name = threading.current_thread().getName()
132-
self.occurred_exception = e
133-
print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
134-
raise
132+
if self.exe.allow_bad:
133+
self.exe.rollback_next = True
134+
else:
135+
thread_name = threading.current_thread().getName()
136+
self.occurred_exception = e
137+
print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
138+
raise
135139
except Exception as e:
136140
self.occurred_exception = e
137141
raise e

0 commit comments

Comments
 (0)