Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 111 additions & 2 deletions misc/python/materialize/data_ingest/data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:
return "string"


class Bytea(Text):
class Bytea(DataType):
@staticmethod
def name(backend: Backend = Backend.MATERIALIZE) -> str:
if backend == Backend.AVRO:
Expand All @@ -524,6 +524,44 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:
else:
return "bytea"

@staticmethod
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if rng.randrange(10) == 0:
result = rng.choice(
[
"NULL",
"0.0",
"True",
# "",
"表ポあA鷗ŒéB逍Üߪąñ丂㐀𠀀",
rng.randint(-100, 100),
]
)
# Fails: unterminated dollar-quoted string
# chars = string.printable
chars = string.ascii_letters + string.digits
if record_size == RecordSize.TINY:
result = rng.choice(("foo", "bar", "baz"))
elif record_size == RecordSize.SMALL:
result = "".join(rng.choice(chars) for _ in range(3))
elif record_size == RecordSize.MEDIUM:
result = "".join(rng.choice(chars) for _ in range(10))
elif record_size == RecordSize.LARGE:
result = "".join(rng.choice(chars) for _ in range(100))
else:
raise ValueError(f"Unexpected record size {record_size}")

return f"{literal(str(result))}::bytea" if in_query else str(result)

@staticmethod
def numeric_value(num: int, in_query: bool = False) -> Any:
result = f"key{num}"
return f"'{result}'::bytea" if in_query else str(result)


class UUID(DataType):
@staticmethod
Expand Down Expand Up @@ -705,6 +743,66 @@ def numeric_value(num: int, in_query: bool = False) -> Any:
return f"'{values_str}'::int list" if in_query else values_str


class RecordList(DataType):
@staticmethod
def name(backend: Backend = Backend.MATERIALIZE) -> str:
if backend == Backend.AVRO:
raise ValueError("Unsupported")
elif backend == Backend.JSON:
raise ValueError("Unsupported")
else:
return "record list"

@staticmethod
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if record_size == RecordSize.TINY:
key_range = 1
elif record_size == RecordSize.SMALL:
key_range = 5
elif record_size == RecordSize.MEDIUM:
key_range = 10
elif record_size == RecordSize.LARGE:
key_range = 20
else:
raise ValueError(f"Unexpected record size {record_size}")
values = [f"row({rng.randint(-100, 100)})" for i in range(0, key_range)]
return f"list[{', '.join(values)}]"

@staticmethod
def numeric_value(num: int, in_query: bool = False) -> Any:
values = [f"row({i})" for i in range(0, num)]
return f"list[{', '.join(values)}]"


class Record(DataType):
@staticmethod
def name(backend: Backend = Backend.MATERIALIZE) -> str:
if backend == Backend.AVRO:
raise ValueError("Unsupported")
elif backend == Backend.JSON:
raise ValueError("Unsupported")
else:
return "record"

@staticmethod
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
value = str(rng.choice(["null::integer", "1"]))
return f"row({value})"

@staticmethod
def numeric_value(num: int, in_query: bool = False) -> Any:
value = str(num)
return f"row({value})"


class Timestamp(DataType):
@staticmethod
def random_value(
Expand Down Expand Up @@ -889,6 +987,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:
# Sort to keep determinism for reproducible runs with specific seed
DATA_TYPES = sorted(list(all_subclasses(DataType)), key=repr)

DATA_TYPES_FOR_COLUMNS = sorted(list(set(DATA_TYPES) - {Record, RecordList}), key=repr)

# fastavro._schema_common.UnknownType: record
# bytea requires Python bytes type instead of str
DATA_TYPES_FOR_AVRO = sorted(
Expand All @@ -902,6 +1002,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:
UUID,
Interval,
IntList,
RecordList,
Record,
IntArray,
Time,
Date,
Expand All @@ -925,6 +1027,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:
set(DATA_TYPES)
- {
IntList,
RecordList,
Record,
IntArray,
UUID,
TextTextMap,
Expand All @@ -948,6 +1052,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:
set(DATA_TYPES)
- {
IntList,
RecordList,
Record,
IntArray,
UUID,
TextTextMap,
Expand All @@ -974,7 +1080,10 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str:

# MySQL doesn't support keys of unlimited size
DATA_TYPES_FOR_KEY = sorted(
list(set(DATA_TYPES_FOR_AVRO) - {Text, Bytea, IntList, IntArray, Float, Double}),
list(
set(DATA_TYPES_FOR_AVRO)
- {Text, Bytea, IntList, RecordList, Record, IntArray, Float, Double}
),
key=repr,
)

Expand Down
58 changes: 43 additions & 15 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import materialize.parallel_workload.database
from materialize.data_ingest.data_type import (
DATA_TYPES,
NUMBER_TYPES,
Boolean,
Text,
Expand All @@ -42,7 +43,6 @@
)
from materialize.mzcompose.services.minio import minio_blob_uri
from materialize.parallel_workload.database import (
DATA_TYPES,
DB,
MAX_CLUSTERS,
MAX_COLUMNS,
Expand Down Expand Up @@ -239,24 +239,27 @@ def generate_select_query(self, exe: Executor, expr_kind: ExprKind) -> str:

join = obj_name != obj2_name and obj not in exe.db.views and columns

if join:
all_columns = list(obj.columns) + list(obj2.columns)
else:
all_columns = obj.columns
all_columns = list(obj.columns) + list(obj2.columns) if join else obj.columns

column_types = []
if self.rng.random() < 0.9:
column_types = [
self.rng.choice(list(DATA_TYPES))
for i in range(self.rng.randint(1, 10))
]
expressions = ", ".join(
[
expression(
self.rng.choice(list(DATA_TYPES)),
column_type,
all_columns,
self.rng,
expr_kind,
)
for i in range(self.rng.randint(1, 10))
for column_type in column_types
]
)
if self.rng.choice([True, False]):
column_types = []
column1 = self.rng.choice(all_columns)
column2 = self.rng.choice(all_columns)
column3 = self.rng.choice(all_columns)
Expand Down Expand Up @@ -299,17 +302,42 @@ def generate_select_query(self, exe: Executor, expr_kind: ExprKind) -> str:
if self.rng.choice([True, False]):
query += f" WHERE {expression(Boolean, all_columns, self.rng, expr_kind)}"

if self.rng.choice([True, False]):
query += f" UNION ALL SELECT {expressions} FROM {obj_name}"
if bool(column_types) and self.rng.choice([True, False]):
obj3 = self.rng.choice(exe.db.db_objects())
obj3_name = str(obj3)
column3 = self.rng.choice(obj3.columns)
obj4 = self.rng.choice(exe.db.db_objects())
obj4_name = str(obj4)
columns_union = [
c
for c in obj4.columns
if c.data_type == column3.data_type and c.data_type != TextTextMap
]
join_union = (
obj3_name != obj4_name and obj3 not in exe.db.views and columns_union
)
all_columns_union = (
list(obj3.columns) + list(obj4.columns) if join_union else obj3.columns
)
expressions3 = ", ".join(
[
expression(
column_type,
all_columns_union,
self.rng,
expr_kind,
)
for column_type in column_types
]
)
query += f" UNION ALL SELECT {expressions3} FROM {obj3_name}"

if join:
column2 = self.rng.choice(columns)
query += f" JOIN {obj2_name} ON {column} = {column2}"
if join_union:
column4 = self.rng.choice(columns_union)
query += f" JOIN {obj4_name} ON {column3} = {column4}"

if self.rng.choice([True, False]):
query += (
f" WHERE {expression(Boolean, all_columns, self.rng, expr_kind)}"
)
query += f" WHERE {expression(Boolean, all_columns_union, self.rng, expr_kind)}"

query += f" LIMIT {self.rng.randint(0, 100)}"
return query
Expand Down
7 changes: 4 additions & 3 deletions misc/python/materialize/parallel_workload/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from pg8000.native import identifier, literal

from materialize.data_ingest.data_type import (
DATA_TYPES,
DATA_TYPES_FOR_AVRO,
DATA_TYPES_FOR_COLUMNS,
DATA_TYPES_FOR_KEY,
DATA_TYPES_FOR_MYSQL,
DATA_TYPES_FOR_SQL_SERVER,
Expand Down Expand Up @@ -174,7 +174,7 @@ def __init__(self, rng: random.Random, table_id: int, schema: Schema):
self.table_id = table_id
self.schema = schema
self.columns = [
Column(rng, i, rng.choice(DATA_TYPES), self)
Column(rng, i, rng.choice(DATA_TYPES_FOR_COLUMNS), self)
for i in range(rng.randint(2, MAX_COLUMNS))
]
self.num_rows = 0
Expand Down Expand Up @@ -225,7 +225,8 @@ def __init__(
list(base_object2.columns) if base_object2 else []
)
self.data_types = [
rng.choice(list(DATA_TYPES)) for i in range(rng.randint(1, MAX_COLUMNS))
rng.choice(list(DATA_TYPES_FOR_COLUMNS))
for i in range(rng.randint(1, MAX_COLUMNS))
]
self.expressions = [
expression(data_type, all_columns, rng, kind=ExprKind.MATERIALIZABLE)
Expand Down
20 changes: 18 additions & 2 deletions misc/python/materialize/parallel_workload/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
MzTimestamp,
Numeric,
Numeric383,
Record,
RecordList,
RecordSize,
Text,
TextTextMap,
Expand Down Expand Up @@ -74,13 +76,22 @@ def __init__(self, text: str, params: list, unsupported: ExprKind = ExprKind.ALL
if dt != Bytea:
FUNC_OPS[Text] += [FuncOp("cast({} as text)", [dt])]

if dt not in (IntList, IntArray, TextTextMap, Bytea, Jsonb, Text):
if dt not in (
IntList,
RecordList,
Record,
IntArray,
TextTextMap,
Bytea,
Jsonb,
Text,
):
FUNC_OPS[Text] += [
FuncOp("{} || {}", [dt, Text]),
FuncOp("{} || {}", [Text, dt]),
]

if dt not in (IntList, IntArray, TextTextMap, Bytea, Jsonb):
if dt not in (IntList, RecordList, Record, IntArray, TextTextMap, Bytea, Jsonb):
FUNC_OPS[Boolean] += [
FuncOp("{} > {}", [dt, dt]),
FuncOp("{} < {}", [dt, dt]),
Expand Down Expand Up @@ -182,6 +193,10 @@ def __init__(self, text: str, params: list, unsupported: ExprKind = ExprKind.ALL
FuncOp("mz_is_superuser()", [], unsupported=ExprKind.MATERIALIZABLE),
]

FUNC_OPS[Record] += [FuncOp("row{}", [Int])]

FUNC_OPS[RecordList] += [FuncOp("list[{}]", [Record])]

FUNC_OPS[Text] += [
FuncOp("lower{}", [Text]),
FuncOp("upper{}", [Text]),
Expand Down Expand Up @@ -213,6 +228,7 @@ def __init__(self, text: str, params: list, unsupported: ExprKind = ExprKind.ALL
FuncOp("char_length{}", [Text]),
FuncOp("map_length{}", [TextTextMap]),
FuncOp("list_length{}", [IntList]),
FuncOp("list_length{}", [RecordList]),
FuncOp("mz_version_num()", [], unsupported=ExprKind.MATERIALIZABLE),
FuncOp("pg_backend_pid()", [], unsupported=ExprKind.MATERIALIZABLE),
FuncOp("{} - {}", [Date, Date]),
Expand Down
Loading